diff --git a/.gitignore b/.gitignore index 56688635b..b19855ab7 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,8 @@ target log.xml .externalToolBuilders *.pyc +##### +# Intellij +#### +.idea/ +*.iml diff --git a/kurento-jsonrpc/kurento-jsonrpc-client/src/main/java/org/kurento/jsonrpc/client/JsonRpcClientNettyWebSocket.java b/kurento-jsonrpc/kurento-jsonrpc-client/src/main/java/org/kurento/jsonrpc/client/JsonRpcClientNettyWebSocket.java index a7e280b85..9655a2918 100644 --- a/kurento-jsonrpc/kurento-jsonrpc-client/src/main/java/org/kurento/jsonrpc/client/JsonRpcClientNettyWebSocket.java +++ b/kurento-jsonrpc/kurento-jsonrpc-client/src/main/java/org/kurento/jsonrpc/client/JsonRpcClientNettyWebSocket.java @@ -16,14 +16,10 @@ package org.kurento.jsonrpc.client; +import javax.net.ssl.SSLException; import java.io.IOException; import java.util.concurrent.TimeoutException; -import javax.net.ssl.SSLException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -55,6 +51,8 @@ import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.CharsetUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class JsonRpcClientNettyWebSocket extends AbstractJsonRpcClientWebSocket { @@ -154,13 +152,30 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { private volatile Channel channel; private volatile EventLoopGroup group; private volatile JsonRpcWebSocketClientHandler handler; + private final boolean channelTimeout; + private String host = ""; + private int port; public JsonRpcClientNettyWebSocket(String url) { - this(url, null); + this(url, null, false); } public JsonRpcClientNettyWebSocket(String url, JsonRpcWSConnectionListener connectionListener) { + this(url, connectionListener, false); + log.debug("{} Creating JsonRPC NETTY Websocket client", label); + } + + /** + * Create json rpc client netty websocket connection + * + * @param url - url of websocket connection + * @param connectionListener - listener + * @param channelTimeout - should use future timeout for channel creation to prevent this thread from blocking forever. + */ + public JsonRpcClientNettyWebSocket(String url, JsonRpcWSConnectionListener connectionListener, + boolean channelTimeout) { super(url, connectionListener); + this.channelTimeout = channelTimeout; log.debug("{} Creating JsonRPC NETTY Websocket client", label); } @@ -185,20 +200,18 @@ protected boolean isNativeClientConnected() { @Override protected void connectNativeClient() throws TimeoutException, Exception { - if (channel == null || !channel.isActive() || group == null || group.isShuttingDown() - || group.isShutdown()) { - + if (channel == null || !channel.isActive() || group == null || group.isShuttingDown() || group.isShutdown()) { log.info("{} Connecting native client", label); final boolean ssl = "wss".equalsIgnoreCase(this.uri.getScheme()); final SslContext sslCtx; try { sslCtx = ssl ? SslContextBuilder.forClient() - .trustManager(InsecureTrustManagerFactory.INSTANCE).build() : null; + .trustManager(InsecureTrustManagerFactory.INSTANCE).build() : null; } catch (SSLException e) { log.error("{} Could not create SSL Context", label, e); throw new IllegalArgumentException( - "Could not create SSL context. See logs for more details", e); + "Could not create SSL context. See logs for more details", e); } final String scheme = uri.getScheme() == null ? "ws" : uri.getScheme(); @@ -215,7 +228,8 @@ protected void connectNativeClient() throws TimeoutException, Exception { } else { port = uri.getPort(); } - + this.port = port; + this.host = host; if (group == null || group.isShuttingDown() || group.isShutdown() || group.isTerminated()) { log.info("{} Creating new NioEventLoopGroup", label); group = new NioEventLoopGroup(); @@ -228,38 +242,52 @@ protected void connectNativeClient() throws TimeoutException, Exception { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) { - log.info("{} Initiating new Netty channel. Will create new handler too!", label); - handler = new JsonRpcWebSocketClientHandler( - WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, - true, new DefaultHttpHeaders(), maxPacketSize)); - - ChannelPipeline p = ch.pipeline(); - p.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTimeout / 1000)); - if (sslCtx != null) { - p.addLast(sslCtx.newHandler(ch.alloc(), host, port)); - } - p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), - WebSocketClientCompressionHandler.INSTANCE, handler); - } - }).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectionTimeout); + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + log.info("{} Initiating new Netty channel. Will create new handler too!", label); + handler = new JsonRpcWebSocketClientHandler( + WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, + true, new DefaultHttpHeaders(), maxPacketSize)); + + ChannelPipeline p = ch.pipeline(); + p.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTimeout / 1000)); + if (sslCtx != null) { + p.addLast(sslCtx.newHandler(ch.alloc(), host, port)); + } + p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), + WebSocketClientCompressionHandler.INSTANCE, handler); + } + }).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectionTimeout); int numRetries = 0; final int maxRetries = 5; while (channel == null || !channel.isOpen()) { try { - channel = b.connect(host, port).sync().channel(); - handler.handshakeFuture().sync(); + ChannelFuture future = b.connect(host, port); + if (channelTimeout) { + handleTimeout(future); + this.channel = future.channel(); + } else { + channel = future.sync() + .channel(); + } + future = handler.handshakeFuture(); + if (channelTimeout) { + handleTimeout(future); + } else { + future.sync(); + } } catch (InterruptedException e) { // This should never happen log.warn("{} ERROR connecting WS Netty client, opening channel", label, e); } catch (Exception e) { - if (e.getCause() instanceof WebSocketHandshakeException && numRetries < maxRetries) { + if (e.getCause() instanceof WebSocketHandshakeException + || e.getCause() instanceof TimeoutException + && numRetries < maxRetries) { log.warn( - "{} Upgrade exception when trying to connect to {}. Try {} of {}. Retrying in 200ms ", - label, uri, numRetries + 1, maxRetries); + "{} Upgrade exception when trying to connect to {}. Try {} of {}. Retrying in 200ms ", + label, uri, numRetries + 1, maxRetries); Thread.sleep(200); numRetries++; } else { @@ -281,6 +309,23 @@ public void operationComplete(ChannelFuture future) throws Exception { } + private void handleTimeout(ChannelFuture future) throws Exception { + // increase timeout plus 500 milliseconds to add buffer between actual connection timeout + final int timeoutMillis = this.connectionTimeout + 500; + future.await(timeoutMillis); + if (!future.isDone()) { + future.cancel(true); + throw new TimeoutException("Connection to " + host + ":" + port + " with future timeout of " + timeoutMillis); + } + if (future.isCancelled()) { + throw new Exception("Connection to " + host + ":" + port + " cancelled by user!"); + } + if (!future.isSuccess()) { + final Throwable cause = future.cause(); + throw new Exception("Create connection to " + host + ":" + port + " error", cause); + } + } + @Override public void closeNativeClient() { closeChannel(); @@ -298,10 +343,14 @@ private void closeChannel() { if (channel != null) { log.debug("{} Closing client", label); try { - channel.close().sync(); + ChannelFuture channelFuture = channel.close(); + if (channelTimeout) { + handleTimeout(channelFuture); + } else { + channelFuture.sync(); + } } catch (Exception e) { - log.debug("{} Could not properly close websocket client. Reason: {}", label, e.getMessage(), - e); + log.debug("{} Could not properly close websocket client. Reason: {}", label, e.getMessage(), e); } channel = null; } else {