Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Add channel future timeout to be backwards compatible with older clients #18

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ target
log.xml
.externalToolBuilders
*.pyc
#####
# Intellij
####
.idea/
*.iml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,10 @@

package org.kurento.jsonrpc.client;

import javax.net.ssl.SSLException;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import javax.net.ssl.SSLException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -55,6 +51,8 @@
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonRpcClientNettyWebSocket extends AbstractJsonRpcClientWebSocket {

Expand Down Expand Up @@ -154,13 +152,30 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
private volatile Channel channel;
private volatile EventLoopGroup group;
private volatile JsonRpcWebSocketClientHandler handler;
private final boolean channelTimeout;
private String host = "";
private int port;

public JsonRpcClientNettyWebSocket(String url) {
this(url, null);
this(url, null, false);
}

public JsonRpcClientNettyWebSocket(String url, JsonRpcWSConnectionListener connectionListener) {
this(url, connectionListener, false);
log.debug("{} Creating JsonRPC NETTY Websocket client", label);
}

/**
* Create json rpc client netty websocket connection
*
* @param url - url of websocket connection
* @param connectionListener - listener
* @param channelTimeout - should use future timeout for channel creation to prevent this thread from blocking forever.
*/
public JsonRpcClientNettyWebSocket(String url, JsonRpcWSConnectionListener connectionListener,
boolean channelTimeout) {
super(url, connectionListener);
this.channelTimeout = channelTimeout;
log.debug("{} Creating JsonRPC NETTY Websocket client", label);
}

Expand All @@ -185,20 +200,18 @@ protected boolean isNativeClientConnected() {
@Override
protected void connectNativeClient() throws TimeoutException, Exception {

if (channel == null || !channel.isActive() || group == null || group.isShuttingDown()
|| group.isShutdown()) {

if (channel == null || !channel.isActive() || group == null || group.isShuttingDown() || group.isShutdown()) {
log.info("{} Connecting native client", label);

final boolean ssl = "wss".equalsIgnoreCase(this.uri.getScheme());
final SslContext sslCtx;
try {
sslCtx = ssl ? SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build() : null;
.trustManager(InsecureTrustManagerFactory.INSTANCE).build() : null;
} catch (SSLException e) {
log.error("{} Could not create SSL Context", label, e);
throw new IllegalArgumentException(
"Could not create SSL context. See logs for more details", e);
"Could not create SSL context. See logs for more details", e);
}

final String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
Expand All @@ -215,7 +228,8 @@ protected void connectNativeClient() throws TimeoutException, Exception {
} else {
port = uri.getPort();
}

this.port = port;
this.host = host;
if (group == null || group.isShuttingDown() || group.isShutdown() || group.isTerminated()) {
log.info("{} Creating new NioEventLoopGroup", label);
group = new NioEventLoopGroup();
Expand Down Expand Up @@ -250,13 +264,26 @@ protected void initChannel(SocketChannel ch) {
final int maxRetries = 5;
while (channel == null || !channel.isOpen()) {
try {
channel = b.connect(host, port).sync().channel();
handler.handshakeFuture().sync();
ChannelFuture future = b.connect(host, port);
if (channelTimeout) {
handleTimeout(future);
this.channel = future.channel();
} else {
channel = future.sync().channel();
}
future = handler.handshakeFuture();
if (channelTimeout) {
handleTimeout(future);
} else {
future.sync();
}
} catch (InterruptedException e) {
// This should never happen
log.warn("{} ERROR connecting WS Netty client, opening channel", label, e);
} catch (Exception e) {
if (e.getCause() instanceof WebSocketHandshakeException && numRetries < maxRetries) {
if (e.getCause() instanceof WebSocketHandshakeException
|| e.getCause() instanceof TimeoutException
&& numRetries < maxRetries) {
log.warn(
"{} Upgrade exception when trying to connect to {}. Try {} of {}. Retrying in 200ms ",
label, uri, numRetries + 1, maxRetries);
Expand All @@ -281,6 +308,23 @@ public void operationComplete(ChannelFuture future) throws Exception {

}

private void handleTimeout(ChannelFuture future) throws Exception {
// increase timeout plus 500 milliseconds to add buffer between actual connection timeout
final int timeoutMillis = this.connectionTimeout + 500;
future.await(timeoutMillis);
if (!future.isDone()) {
future.cancel(true);
throw new TimeoutException("Connection to " + host + ":" + port + " with future timeout of " + timeoutMillis);
}
if (future.isCancelled()) {
throw new Exception("Connection to " + host + ":" + port + " cancelled by user!");
}
if (!future.isSuccess()) {
final Throwable cause = future.cause();
throw new Exception("Create connection to " + host + ":" + port + " error", cause);
}
}

@Override
public void closeNativeClient() {
closeChannel();
Expand All @@ -298,10 +342,14 @@ private void closeChannel() {
if (channel != null) {
log.debug("{} Closing client", label);
try {
channel.close().sync();
ChannelFuture channelFuture = channel.close();
if (channelTimeout) {
handleTimeout(channelFuture);
} else {
channelFuture.sync();
}
} catch (Exception e) {
log.debug("{} Could not properly close websocket client. Reason: {}", label, e.getMessage(),
e);
log.debug("{} Could not properly close websocket client. Reason: {}", label, e.getMessage(), e);
}
channel = null;
} else {
Expand Down