Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Commit

Permalink
Add channel future timeout to prevent locking and keep old sync impl
Browse files Browse the repository at this point in the history
  • Loading branch information
mitch2na committed Aug 1, 2019
1 parent 36d82ac commit 367620d
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 37 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ target
log.xml
.externalToolBuilders
*.pyc
#####
# Intellij
####
.idea/
*.iml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,10 @@

package org.kurento.jsonrpc.client;

import javax.net.ssl.SSLException;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import javax.net.ssl.SSLException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -55,6 +51,8 @@
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonRpcClientNettyWebSocket extends AbstractJsonRpcClientWebSocket {

Expand Down Expand Up @@ -154,13 +152,30 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
private volatile Channel channel;
private volatile EventLoopGroup group;
private volatile JsonRpcWebSocketClientHandler handler;
private final boolean channelTimeout;
private String host = "";
private int port;

public JsonRpcClientNettyWebSocket(String url) {
this(url, null);
this(url, null, false);
}

public JsonRpcClientNettyWebSocket(String url, JsonRpcWSConnectionListener connectionListener) {
this(url, connectionListener, false);
log.debug("{} Creating JsonRPC NETTY Websocket client", label);
}

/**
* Create json rpc client netty websocket connection
*
* @param url - url of websocket connection
* @param connectionListener - listener
* @param channelTimeout - should use future timeout for channel creation to prevent this thread from blocking forever.
*/
public JsonRpcClientNettyWebSocket(String url, JsonRpcWSConnectionListener connectionListener,
boolean channelTimeout) {
super(url, connectionListener);
this.channelTimeout = channelTimeout;
log.debug("{} Creating JsonRPC NETTY Websocket client", label);
}

Expand All @@ -185,20 +200,18 @@ protected boolean isNativeClientConnected() {
@Override
protected void connectNativeClient() throws TimeoutException, Exception {

if (channel == null || !channel.isActive() || group == null || group.isShuttingDown()
|| group.isShutdown()) {

if (channel == null || !channel.isActive() || group == null || group.isShuttingDown() || group.isShutdown()) {
log.info("{} Connecting native client", label);

final boolean ssl = "wss".equalsIgnoreCase(this.uri.getScheme());
final SslContext sslCtx;
try {
sslCtx = ssl ? SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build() : null;
.trustManager(InsecureTrustManagerFactory.INSTANCE).build() : null;
} catch (SSLException e) {
log.error("{} Could not create SSL Context", label, e);
throw new IllegalArgumentException(
"Could not create SSL context. See logs for more details", e);
"Could not create SSL context. See logs for more details", e);
}

final String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
Expand All @@ -215,7 +228,8 @@ protected void connectNativeClient() throws TimeoutException, Exception {
} else {
port = uri.getPort();
}

this.port = port;
this.host = host;
if (group == null || group.isShuttingDown() || group.isShutdown() || group.isTerminated()) {
log.info("{} Creating new NioEventLoopGroup", label);
group = new NioEventLoopGroup();
Expand All @@ -228,38 +242,52 @@ protected void connectNativeClient() throws TimeoutException, Exception {

Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
log.info("{} Initiating new Netty channel. Will create new handler too!", label);
handler = new JsonRpcWebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null,
true, new DefaultHttpHeaders(), maxPacketSize));

ChannelPipeline p = ch.pipeline();
p.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTimeout / 1000));
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
}
p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192),
WebSocketClientCompressionHandler.INSTANCE, handler);
}
}).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectionTimeout);
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
log.info("{} Initiating new Netty channel. Will create new handler too!", label);
handler = new JsonRpcWebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null,
true, new DefaultHttpHeaders(), maxPacketSize));

ChannelPipeline p = ch.pipeline();
p.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTimeout / 1000));
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
}
p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192),
WebSocketClientCompressionHandler.INSTANCE, handler);
}
}).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectionTimeout);

int numRetries = 0;
final int maxRetries = 5;
while (channel == null || !channel.isOpen()) {
try {
channel = b.connect(host, port).sync().channel();
handler.handshakeFuture().sync();
ChannelFuture future = b.connect(host, port);
if (channelTimeout) {
handleTimeout(future);
this.channel = future.channel();
} else {
channel = future.sync()
.channel();
}
future = handler.handshakeFuture();
if (channelTimeout) {
handleTimeout(future);
} else {
future.sync();
}
} catch (InterruptedException e) {
// This should never happen
log.warn("{} ERROR connecting WS Netty client, opening channel", label, e);
} catch (Exception e) {
if (e.getCause() instanceof WebSocketHandshakeException && numRetries < maxRetries) {
if (e.getCause() instanceof WebSocketHandshakeException
|| e.getCause() instanceof TimeoutException
&& numRetries < maxRetries) {
log.warn(
"{} Upgrade exception when trying to connect to {}. Try {} of {}. Retrying in 200ms ",
label, uri, numRetries + 1, maxRetries);
"{} Upgrade exception when trying to connect to {}. Try {} of {}. Retrying in 200ms ",
label, uri, numRetries + 1, maxRetries);
Thread.sleep(200);
numRetries++;
} else {
Expand All @@ -281,6 +309,23 @@ public void operationComplete(ChannelFuture future) throws Exception {

}

private void handleTimeout(ChannelFuture future) throws Exception {
// increase timeout plus 500 milliseconds to add buffer between actual connection timeout
final int timeoutMillis = this.connectionTimeout + 500;
future.await(timeoutMillis);
if (!future.isDone()) {
future.cancel(true);
throw new TimeoutException("Connection to " + host + ":" + port + " with future timeout of " + timeoutMillis);
}
if (future.isCancelled()) {
throw new Exception("Connection to " + host + ":" + port + " cancelled by user!");
}
if (!future.isSuccess()) {
final Throwable cause = future.cause();
throw new Exception("Create connection to " + host + ":" + port + " error", cause);
}
}

@Override
public void closeNativeClient() {
closeChannel();
Expand All @@ -298,10 +343,14 @@ private void closeChannel() {
if (channel != null) {
log.debug("{} Closing client", label);
try {
channel.close().sync();
ChannelFuture channelFuture = channel.close();
if (channelTimeout) {
handleTimeout(channelFuture);
} else {
channelFuture.sync();
}
} catch (Exception e) {
log.debug("{} Could not properly close websocket client. Reason: {}", label, e.getMessage(),
e);
log.debug("{} Could not properly close websocket client. Reason: {}", label, e.getMessage(), e);
}
channel = null;
} else {
Expand Down

0 comments on commit 367620d

Please sign in to comment.