Skip to content

Commit

Permalink
fix(uat): change error processing in paho-java-client (#381)
Browse files Browse the repository at this point in the history
  • Loading branch information
auarbekov authored Jul 27, 2023
1 parent d87a777 commit f3f2d38
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ public class Mqtt311ConnectionImpl implements MqttConnection {
private static final Logger logger = LogManager.getLogger(Mqtt311ConnectionImpl.class);
private static final String EXCEPTION_WHEN_CONNECTING = "Exception occurred during connect";
private static final String EXCEPTION_WHEN_CONFIGURE_SSL_CA = "Exception occurred during SSL configuration";
private static final String EXCEPTION_WHEN_DISCONNECTING = "Exception occurred during disconnect";
private static final int REASON_CODE_SUCCESS = 0;

private final AtomicBoolean isClosing = new AtomicBoolean();
private final AtomicBoolean isConnected = new AtomicBoolean();
private final IMqttAsyncClient mqttClient;
private final GRPCClient grpcClient;
private int connectionId = 0;
Expand Down Expand Up @@ -76,6 +78,7 @@ public ConnectResult start(MqttLib.ConnectionParams connectionParams, int connec
mqttClient.setCallback(new MqttCallbackImpl());
token.waitForCompletion(TimeUnit.SECONDS.toMillis(connectionParams.getConnectionTimeout()));

isConnected.set(true);
logger.atInfo().log("MQTT 3.1.1 connection {} is establisted", connectionId);
return buildConnectResult(true, token.isComplete());
} catch (org.eclipse.paho.client.mqttv3.MqttException e) {
Expand All @@ -89,7 +92,9 @@ public ConnectResult start(MqttLib.ConnectionParams connectionParams, int connec

@Override
public MqttSubscribeReply subscribe(long timeout, @NonNull List<Subscription> subscriptions,
List<Mqtt5Properties> userProperties) {
List<Mqtt5Properties> userProperties) throws MqttException {
stateCheck();

checkUserProperties(userProperties);
String[] filters = new String[subscriptions.size()];
int[] qos = new int[subscriptions.size()];
Expand All @@ -109,7 +114,7 @@ public MqttSubscribeReply subscribe(long timeout, @NonNull List<Subscription> su
logger.atError().withThrowable(e).log("Exception occurred during subscribe, reason code {}",
e.getReasonCode());

builder.addAllReasonCodes(Collections.nCopies(subscriptions.size(), (int)e.getReasonCode()));
throw new MqttException("Could not subscribe", e);
}
return builder.build();
}
Expand All @@ -127,7 +132,9 @@ public void disconnect(long timeout, int reasonCode, List<Mqtt5Properties> userP
}

@Override
public MqttPublishReply publish(long timeout, @NonNull Message message) {
public MqttPublishReply publish(long timeout, @NonNull Message message) throws MqttException {
stateCheck();

checkUserProperties(message.getUserProperties());
checkContentType(message.getContentType());
checkPayloadFormatIndicator(message.getPayloadFormatIndicator());
Expand All @@ -148,14 +155,16 @@ public MqttPublishReply publish(long timeout, @NonNull Message message) {
logger.atError().withThrowable(ex)
.log("Failed during publishing message with reasonCode {} and reasonString {}",
ex.getReasonCode(), ex.getMessage());
builder.setReasonCode(ex.getReasonCode());
throw new MqttException("Could not publish", ex);
}
return builder.build();
}

@Override
public MqttSubscribeReply unsubscribe(long timeout, @NonNull List<String> filters,
List<Mqtt5Properties> userProperties) {
List<Mqtt5Properties> userProperties) throws MqttException {
stateCheck();

checkUserProperties(userProperties);

MqttSubscribeReply.Builder builder = MqttSubscribeReply.newBuilder();
Expand All @@ -166,7 +175,7 @@ public MqttSubscribeReply unsubscribe(long timeout, @NonNull List<String> filter
} catch (org.eclipse.paho.client.mqttv3.MqttException e) {
logger.atError().withThrowable(e).log("Exception occurred during unsubscribe, reason code {}",
e.getReasonCode());
builder.addAllReasonCodes(Collections.nCopies(filters.size(), (int)e.getReasonCode()));
throw new MqttException("Could not unsubscribe", e);
}
return builder.build();
}
Expand Down Expand Up @@ -210,10 +219,31 @@ private MqttConnectOptions convertParams(MqttLib.ConnectionParams connectionPara
return connectionOptions;
}

private void disconnectAndClose(long timeout) throws org.eclipse.paho.client.mqttv3.MqttException {
private void disconnectAndClose(long timeout) throws org.eclipse.paho.client.mqttv3.MqttException, MqttException {
try {
mqttClient.disconnectForcibly(timeout);
final long deadline = System.nanoTime() + timeout * 1_000_000_000;

if (isConnected.compareAndSet(true, false)) {
mqttClient.disconnectForcibly(timeout);
} else {
logger.atWarn().log("DISCONNECT was not sent on the dead connection");
}

long remaining = deadline - System.nanoTime();
if (remaining < MIN_SHUTDOWN_NS) {
remaining = MIN_SHUTDOWN_NS;
}

executorService.shutdown();
if (!executorService.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
executorService.shutdownNow();
}

logger.atInfo().log("MQTT 3.1.1 connection {} has been disconnected", connectionId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.atError().withThrowable(e).log(EXCEPTION_WHEN_DISCONNECTING);
throw new MqttException(EXCEPTION_WHEN_DISCONNECTING, e);
} finally {
mqttClient.close();
}
Expand Down Expand Up @@ -273,6 +303,7 @@ class MqttCallbackImpl implements MqttCallback {
@SuppressWarnings("PMD.AvoidCatchingGenericException")
@Override
public void connectionLost(Throwable throwable) {
isConnected.set(false);
// only unsolicited disconnect
if (isClosing.get()) {
logger.atWarn().log("DISCONNECT event ignored due to shutdown initiated");
Expand Down Expand Up @@ -322,4 +353,19 @@ private void processMessage(String topic, MqttMessage mqttMessage) {
connectionId, topic, mqttMessage.getQos(), mqttMessage.isRetained());
}
}

/**
* Checks connection state.
*
* @throws MqttException when connection state is not allow opertation
*/
private void stateCheck() throws MqttException {
if (!isConnected.get()) {
throw new MqttException("MQTT client is not in connected state");
}

if (isClosing.get()) {
throw new MqttException("MQTT connection is closing");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,10 @@ class Message {
* @param subscriptions list of subscriptions
* @param userProperties list of user's properties MQTT v5.0
* @return useful information from SUBACK packet
* @throws MqttException on errors
*/
MqttSubscribeReply subscribe(long timeout, @NonNull List<Subscription> subscriptions,
List<Mqtt5Properties> userProperties);
List<Mqtt5Properties> userProperties) throws MqttException;

/**
* Closes MQTT connection.
Expand All @@ -171,8 +172,9 @@ MqttSubscribeReply subscribe(long timeout, @NonNull List<Subscription> subscript
* @param timeout publish operation timeout in seconds
* @param message message to publish
* @return useful information from PUBACK packet or null of no PUBACK has been received (as for QoS 0)
* @throws MqttException on errors
*/
MqttPublishReply publish(long timeout, @NonNull Message message);
MqttPublishReply publish(long timeout, @NonNull Message message) throws MqttException;

/**
* Unsubscribes from topics.
Expand All @@ -181,9 +183,10 @@ MqttSubscribeReply subscribe(long timeout, @NonNull List<Subscription> subscript
* @param filters list of topic filter to unsubscribe
* @param userProperties list of user's properties MQTT v5.0
* @return useful information from UNSUBACK packet
* @throws MqttException on errors
*/
MqttSubscribeReply unsubscribe(long timeout, @NonNull List<String> filters,
List<Mqtt5Properties> userProperties);
List<Mqtt5Properties> userProperties) throws MqttException;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,19 @@ public void publishMqtt(MqttPublishRequest request, StreamObserver<MqttPublishRe
internalMessage.contentType(message.getContentType());
}

MqttPublishReply publishReply = connection.publish(timeout, internalMessage.build());
MqttPublishReply publishReply;
try {
publishReply = connection.publish(timeout, internalMessage.build());
if (publishReply != null) {
logger.atInfo().log("Publish response: connectionId {} reason code {} reason string '{}'",
connectionId, publishReply.getReasonCode(), publishReply.getReasonString());
}
} catch (MqttException e) {
logger.atError().withThrowable(e).log("exception during publish");
responseObserver.onError(e);
return;
}

responseObserver.onNext(publishReply);
responseObserver.onCompleted();
}
Expand Down Expand Up @@ -445,11 +453,19 @@ public void subscribeMqtt(MqttSubscribeRequest request, StreamObserver<MqttSubsc
logger.atInfo().log("Subscribe: connectionId {} for {} filters",
connectionId, outSubscriptions.size());
List<Mqtt5Properties> userProperties = request.getPropertiesList();
MqttSubscribeReply subscribeReply = connection.subscribe(timeout, outSubscriptions, userProperties);
if (subscribeReply != null) {
logger.atInfo().log("Subscribe response: connectionId {} reason codes {} reason string '{}'",
connectionId, subscribeReply.getReasonCodesList(), subscribeReply.getReasonString());
MqttSubscribeReply subscribeReply;
try {
subscribeReply = connection.subscribe(timeout, outSubscriptions, userProperties);
if (subscribeReply != null) {
logger.atInfo().log("Subscribe response: connectionId {} reason codes {} reason string '{}'",
connectionId, subscribeReply.getReasonCodesList(), subscribeReply.getReasonString());
}
} catch (MqttException e) {
logger.atError().withThrowable(e).log("exception during subscribe");
responseObserver.onError(e);
return;
}

responseObserver.onNext(subscribeReply);
responseObserver.onCompleted();
}
Expand Down Expand Up @@ -487,12 +503,19 @@ public void unsubscribeMqtt(MqttUnsubscribeRequest request,
logger.atInfo().log("Unsubscribe: connectionId {} for {} filters",
connectionId, filters);
List<Mqtt5Properties> userProperties = request.getPropertiesList();
MqttSubscribeReply unsubscribeReply = connection.unsubscribe(timeout, filters, userProperties);

if (unsubscribeReply != null) {
logger.atInfo().log("Unsubscribe response: connectionId {} reason codes {} reason string '{}'",
connectionId, unsubscribeReply.getReasonCodesList(), unsubscribeReply.getReasonString());
MqttSubscribeReply unsubscribeReply;
try {
unsubscribeReply = connection.unsubscribe(timeout, filters, userProperties);
if (unsubscribeReply != null) {
logger.atInfo().log("Unsubscribe response: connectionId {} reason codes {} reason string '{}'",
connectionId, unsubscribeReply.getReasonCodesList(), unsubscribeReply.getReasonString());
}
} catch (MqttException e) {
logger.atError().withThrowable(e).log("exception during unsubscribe");
responseObserver.onError(e);
return;
}

responseObserver.onNext(unsubscribeReply);
responseObserver.onCompleted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class MqttConnectionImpl implements MqttConnection {
private static final Logger logger = LogManager.getLogger(MqttConnectionImpl.class);

private final AtomicBoolean isClosing = new AtomicBoolean();
private final AtomicBoolean isConnected = new AtomicBoolean();
private final GRPCClient grpcClient;
private final IMqttAsyncClient client;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
Expand Down Expand Up @@ -80,6 +81,7 @@ public ConnectResult start(MqttLib.ConnectionParams connectionParams, int connec
client.setCallback(new MqttCallbackImpl());
token.waitForCompletion(TimeUnit.SECONDS.toMillis(connectionParams.getConnectionTimeout()));
success = true;
isConnected.set(true);
return buildConnectResult(true, token, null);
} catch (org.eclipse.paho.mqttv5.common.MqttException ex) {
logger.atError().withThrowable(ex).log("Exception occurred during connect reason code {}",
Expand All @@ -101,7 +103,9 @@ public ConnectResult start(MqttLib.ConnectionParams connectionParams, int connec

@Override
public MqttSubscribeReply subscribe(long timeout, @NonNull List<Subscription> subscriptions,
List<Mqtt5Properties> userProperties) {
List<Mqtt5Properties> userProperties) throws MqttException {
stateCheck();

MqttSubscription[] mqttSubscriptions = new MqttSubscription[subscriptions.size()];
MqttMessageListener[] listeners = new MqttMessageListener[subscriptions.size()];
for (int i = 0; i < subscriptions.size(); i++) {
Expand Down Expand Up @@ -142,8 +146,8 @@ public MqttSubscribeReply subscribe(long timeout, @NonNull List<Subscription> su
}
}
} catch (org.eclipse.paho.mqttv5.common.MqttException e) {
builder.addReasonCodes(e.getReasonCode());
builder.setReasonString(e.getMessage());
logger.atError().withThrowable(e).log("Failed during subscribing");
throw new MqttException("Could not subscribe", e);
}
return builder.build();
}
Expand All @@ -161,7 +165,9 @@ public void disconnect(long timeout, int reasonCode, List<Mqtt5Properties> userP
}

@Override
public MqttPublishReply publish(long timeout, @NonNull Message message) {
public MqttPublishReply publish(long timeout, @NonNull Message message) throws MqttException {
stateCheck();

MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(message.getQos());
mqttMessage.setPayload(message.getPayload());
Expand Down Expand Up @@ -197,15 +203,16 @@ public MqttPublishReply publish(long timeout, @NonNull Message message) {
logger.atError().withThrowable(ex)
.log("Failed during publishing message with reasonCode {} and reasonString {}",
ex.getReasonCode(), ex.getMessage());
builder.setReasonCode(ex.getReasonCode());
builder.setReasonString(ex.getMessage());
throw new MqttException("Could not publish", ex);
}
return builder.build();
}

@Override
public MqttSubscribeReply unsubscribe(long timeout, @NonNull List<String> filters,
List<Mqtt5Properties> userProperties) {
List<Mqtt5Properties> userProperties) throws MqttException {
stateCheck();

String[] filterArray = new String[filters.size()];
for (int i = 0; i < filters.size(); i++) {
filterArray[i] = filters.get(i);
Expand Down Expand Up @@ -245,8 +252,7 @@ public MqttSubscribeReply unsubscribe(long timeout, @NonNull List<String> filter
}
} catch (org.eclipse.paho.mqttv5.common.MqttException e) {
logger.atError().withThrowable(e).log("Failed during unsubscribe");
builder.addReasonCodes(e.getReasonCode());
builder.setReasonString(e.getMessage());
throw new MqttException("Could not unsubscribe", e);
}
return builder.build();
}
Expand All @@ -266,13 +272,34 @@ private IMqttAsyncClient createAsyncClient(MqttLib.ConnectionParams connectionPa
private void disconnectAndClose(long timeout, int reasonCode, List<Mqtt5Properties> userProperties)
throws org.eclipse.paho.mqttv5.common.MqttException {
MqttProperties properties = new MqttProperties();

if (userProperties != null && !userProperties.isEmpty()) {
properties.setUserProperties(convertToUserProperties(userProperties));
userProperties.forEach(p -> logger.atInfo()
.log("Disconnect MQTT userProperties: {}, {}", p.getKey(), p.getValue()));
}

try {
client.disconnectForcibly(QUIESCE_TIMEOUT, timeout, reasonCode, properties);
if (isConnected.compareAndSet(true, false)) {
client.disconnectForcibly(QUIESCE_TIMEOUT, timeout, reasonCode, properties);
} else {
logger.atWarn().log("DISCONNECT was not sent on the dead connection");
}

final long deadline = System.nanoTime() + timeout * 1_000_000_000;

long remaining = deadline - System.nanoTime();
if (remaining < MIN_SHUTDOWN_NS) {
remaining = MIN_SHUTDOWN_NS;
}

executorService.shutdown();
if (!executorService.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
} finally {
client.close();
}
Expand Down Expand Up @@ -459,6 +486,7 @@ class MqttCallbackImpl implements MqttCallback {
@Override
@SuppressWarnings("PMD.AvoidCatchingGenericException")
public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
isConnected.set(false);
GRPCClient.DisconnectInfo disconnectInfo = convertDisconnectPacket(mqttDisconnectResponse);

final String errorString = mqttDisconnectResponse.getException() == null
Expand Down Expand Up @@ -587,4 +615,19 @@ private GRPCClient.DisconnectInfo convertDisconnectPacket(MqttDisconnectResponse
userProperties
);
}

/**
* Checks connection state.
*
* @throws MqttException when connection state is not allow opertation
*/
private void stateCheck() throws MqttException {
if (!isConnected.get()) {
throw new MqttException("MQTT client is not in connected state");
}

if (isClosing.get()) {
throw new MqttException("MQTT connection is closing");
}
}
}

0 comments on commit f3f2d38

Please sign in to comment.