diff --git a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java index c7cf0cce2..f53fbae61 100644 --- a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java +++ b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java @@ -42,9 +42,11 @@ public class Mqtt311ConnectionImpl implements MqttConnection { private static final Logger logger = LogManager.getLogger(Mqtt311ConnectionImpl.class); private static final String EXCEPTION_WHEN_CONNECTING = "Exception occurred during connect"; private static final String EXCEPTION_WHEN_CONFIGURE_SSL_CA = "Exception occurred during SSL configuration"; + private static final String EXCEPTION_WHEN_DISCONNECTING = "Exception occurred during disconnect"; private static final int REASON_CODE_SUCCESS = 0; private final AtomicBoolean isClosing = new AtomicBoolean(); + private final AtomicBoolean isConnected = new AtomicBoolean(); private final IMqttAsyncClient mqttClient; private final GRPCClient grpcClient; private int connectionId = 0; @@ -76,6 +78,7 @@ public ConnectResult start(MqttLib.ConnectionParams connectionParams, int connec mqttClient.setCallback(new MqttCallbackImpl()); token.waitForCompletion(TimeUnit.SECONDS.toMillis(connectionParams.getConnectionTimeout())); + isConnected.set(true); logger.atInfo().log("MQTT 3.1.1 connection {} is establisted", connectionId); return buildConnectResult(true, token.isComplete()); } catch (org.eclipse.paho.client.mqttv3.MqttException e) { @@ -89,7 +92,9 @@ public ConnectResult start(MqttLib.ConnectionParams connectionParams, int connec @Override public MqttSubscribeReply subscribe(long timeout, @NonNull List subscriptions, - List userProperties) { + List userProperties) throws MqttException { + stateCheck(); + checkUserProperties(userProperties); String[] filters = new String[subscriptions.size()]; int[] qos = new int[subscriptions.size()]; @@ -109,7 +114,7 @@ public MqttSubscribeReply subscribe(long timeout, @NonNull List su logger.atError().withThrowable(e).log("Exception occurred during subscribe, reason code {}", e.getReasonCode()); - builder.addAllReasonCodes(Collections.nCopies(subscriptions.size(), (int)e.getReasonCode())); + throw new MqttException("Could not subscribe", e); } return builder.build(); } @@ -127,7 +132,9 @@ public void disconnect(long timeout, int reasonCode, List userP } @Override - public MqttPublishReply publish(long timeout, @NonNull Message message) { + public MqttPublishReply publish(long timeout, @NonNull Message message) throws MqttException { + stateCheck(); + checkUserProperties(message.getUserProperties()); checkContentType(message.getContentType()); checkPayloadFormatIndicator(message.getPayloadFormatIndicator()); @@ -148,14 +155,16 @@ public MqttPublishReply publish(long timeout, @NonNull Message message) { logger.atError().withThrowable(ex) .log("Failed during publishing message with reasonCode {} and reasonString {}", ex.getReasonCode(), ex.getMessage()); - builder.setReasonCode(ex.getReasonCode()); + throw new MqttException("Could not publish", ex); } return builder.build(); } @Override public MqttSubscribeReply unsubscribe(long timeout, @NonNull List filters, - List userProperties) { + List userProperties) throws MqttException { + stateCheck(); + checkUserProperties(userProperties); MqttSubscribeReply.Builder builder = MqttSubscribeReply.newBuilder(); @@ -166,7 +175,7 @@ public MqttSubscribeReply unsubscribe(long timeout, @NonNull List filter } catch (org.eclipse.paho.client.mqttv3.MqttException e) { logger.atError().withThrowable(e).log("Exception occurred during unsubscribe, reason code {}", e.getReasonCode()); - builder.addAllReasonCodes(Collections.nCopies(filters.size(), (int)e.getReasonCode())); + throw new MqttException("Could not unsubscribe", e); } return builder.build(); } @@ -210,10 +219,31 @@ private MqttConnectOptions convertParams(MqttLib.ConnectionParams connectionPara return connectionOptions; } - private void disconnectAndClose(long timeout) throws org.eclipse.paho.client.mqttv3.MqttException { + private void disconnectAndClose(long timeout) throws org.eclipse.paho.client.mqttv3.MqttException, MqttException { try { - mqttClient.disconnectForcibly(timeout); + final long deadline = System.nanoTime() + timeout * 1_000_000_000; + + if (isConnected.compareAndSet(true, false)) { + mqttClient.disconnectForcibly(timeout); + } else { + logger.atWarn().log("DISCONNECT was not sent on the dead connection"); + } + + long remaining = deadline - System.nanoTime(); + if (remaining < MIN_SHUTDOWN_NS) { + remaining = MIN_SHUTDOWN_NS; + } + + executorService.shutdown(); + if (!executorService.awaitTermination(remaining, TimeUnit.NANOSECONDS)) { + executorService.shutdownNow(); + } + logger.atInfo().log("MQTT 3.1.1 connection {} has been disconnected", connectionId); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.atError().withThrowable(e).log(EXCEPTION_WHEN_DISCONNECTING); + throw new MqttException(EXCEPTION_WHEN_DISCONNECTING, e); } finally { mqttClient.close(); } @@ -273,6 +303,7 @@ class MqttCallbackImpl implements MqttCallback { @SuppressWarnings("PMD.AvoidCatchingGenericException") @Override public void connectionLost(Throwable throwable) { + isConnected.set(false); // only unsolicited disconnect if (isClosing.get()) { logger.atWarn().log("DISCONNECT event ignored due to shutdown initiated"); @@ -322,4 +353,19 @@ private void processMessage(String topic, MqttMessage mqttMessage) { connectionId, topic, mqttMessage.getQos(), mqttMessage.isRetained()); } } + + /** + * Checks connection state. + * + * @throws MqttException when connection state is not allow opertation + */ + private void stateCheck() throws MqttException { + if (!isConnected.get()) { + throw new MqttException("MQTT client is not in connected state"); + } + + if (isClosing.get()) { + throw new MqttException("MQTT connection is closing"); + } + } } diff --git a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/MqttConnection.java b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/MqttConnection.java index d48bef89c..a8942c12c 100644 --- a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/MqttConnection.java +++ b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/MqttConnection.java @@ -151,9 +151,10 @@ class Message { * @param subscriptions list of subscriptions * @param userProperties list of user's properties MQTT v5.0 * @return useful information from SUBACK packet + * @throws MqttException on errors */ MqttSubscribeReply subscribe(long timeout, @NonNull List subscriptions, - List userProperties); + List userProperties) throws MqttException; /** * Closes MQTT connection. @@ -171,8 +172,9 @@ MqttSubscribeReply subscribe(long timeout, @NonNull List subscript * @param timeout publish operation timeout in seconds * @param message message to publish * @return useful information from PUBACK packet or null of no PUBACK has been received (as for QoS 0) + * @throws MqttException on errors */ - MqttPublishReply publish(long timeout, @NonNull Message message); + MqttPublishReply publish(long timeout, @NonNull Message message) throws MqttException; /** * Unsubscribes from topics. @@ -181,9 +183,10 @@ MqttSubscribeReply subscribe(long timeout, @NonNull List subscript * @param filters list of topic filter to unsubscribe * @param userProperties list of user's properties MQTT v5.0 * @return useful information from UNSUBACK packet + * @throws MqttException on errors */ MqttSubscribeReply unsubscribe(long timeout, @NonNull List filters, - List userProperties); + List userProperties) throws MqttException; /** diff --git a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCControlServer.java b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCControlServer.java index 20dce6fa4..3cfad7ca2 100644 --- a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCControlServer.java +++ b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCControlServer.java @@ -314,11 +314,19 @@ public void publishMqtt(MqttPublishRequest request, StreamObserver userProperties = request.getPropertiesList(); - MqttSubscribeReply subscribeReply = connection.subscribe(timeout, outSubscriptions, userProperties); - if (subscribeReply != null) { - logger.atInfo().log("Subscribe response: connectionId {} reason codes {} reason string '{}'", - connectionId, subscribeReply.getReasonCodesList(), subscribeReply.getReasonString()); + MqttSubscribeReply subscribeReply; + try { + subscribeReply = connection.subscribe(timeout, outSubscriptions, userProperties); + if (subscribeReply != null) { + logger.atInfo().log("Subscribe response: connectionId {} reason codes {} reason string '{}'", + connectionId, subscribeReply.getReasonCodesList(), subscribeReply.getReasonString()); + } + } catch (MqttException e) { + logger.atError().withThrowable(e).log("exception during subscribe"); + responseObserver.onError(e); + return; } + responseObserver.onNext(subscribeReply); responseObserver.onCompleted(); } @@ -487,12 +503,19 @@ public void unsubscribeMqtt(MqttUnsubscribeRequest request, logger.atInfo().log("Unsubscribe: connectionId {} for {} filters", connectionId, filters); List userProperties = request.getPropertiesList(); - MqttSubscribeReply unsubscribeReply = connection.unsubscribe(timeout, filters, userProperties); - - if (unsubscribeReply != null) { - logger.atInfo().log("Unsubscribe response: connectionId {} reason codes {} reason string '{}'", - connectionId, unsubscribeReply.getReasonCodesList(), unsubscribeReply.getReasonString()); + MqttSubscribeReply unsubscribeReply; + try { + unsubscribeReply = connection.unsubscribe(timeout, filters, userProperties); + if (unsubscribeReply != null) { + logger.atInfo().log("Unsubscribe response: connectionId {} reason codes {} reason string '{}'", + connectionId, unsubscribeReply.getReasonCodesList(), unsubscribeReply.getReasonString()); + } + } catch (MqttException e) { + logger.atError().withThrowable(e).log("exception during unsubscribe"); + responseObserver.onError(e); + return; } + responseObserver.onNext(unsubscribeReply); responseObserver.onCompleted(); } diff --git a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java index 6a3f28704..be31f1153 100644 --- a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java +++ b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java @@ -48,6 +48,7 @@ public class MqttConnectionImpl implements MqttConnection { private static final Logger logger = LogManager.getLogger(MqttConnectionImpl.class); private final AtomicBoolean isClosing = new AtomicBoolean(); + private final AtomicBoolean isConnected = new AtomicBoolean(); private final GRPCClient grpcClient; private final IMqttAsyncClient client; private final ExecutorService executorService = Executors.newSingleThreadExecutor(); @@ -80,6 +81,7 @@ public ConnectResult start(MqttLib.ConnectionParams connectionParams, int connec client.setCallback(new MqttCallbackImpl()); token.waitForCompletion(TimeUnit.SECONDS.toMillis(connectionParams.getConnectionTimeout())); success = true; + isConnected.set(true); return buildConnectResult(true, token, null); } catch (org.eclipse.paho.mqttv5.common.MqttException ex) { logger.atError().withThrowable(ex).log("Exception occurred during connect reason code {}", @@ -101,7 +103,9 @@ public ConnectResult start(MqttLib.ConnectionParams connectionParams, int connec @Override public MqttSubscribeReply subscribe(long timeout, @NonNull List subscriptions, - List userProperties) { + List userProperties) throws MqttException { + stateCheck(); + MqttSubscription[] mqttSubscriptions = new MqttSubscription[subscriptions.size()]; MqttMessageListener[] listeners = new MqttMessageListener[subscriptions.size()]; for (int i = 0; i < subscriptions.size(); i++) { @@ -142,8 +146,8 @@ public MqttSubscribeReply subscribe(long timeout, @NonNull List su } } } catch (org.eclipse.paho.mqttv5.common.MqttException e) { - builder.addReasonCodes(e.getReasonCode()); - builder.setReasonString(e.getMessage()); + logger.atError().withThrowable(e).log("Failed during subscribing"); + throw new MqttException("Could not subscribe", e); } return builder.build(); } @@ -161,7 +165,9 @@ public void disconnect(long timeout, int reasonCode, List userP } @Override - public MqttPublishReply publish(long timeout, @NonNull Message message) { + public MqttPublishReply publish(long timeout, @NonNull Message message) throws MqttException { + stateCheck(); + MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(message.getQos()); mqttMessage.setPayload(message.getPayload()); @@ -197,15 +203,16 @@ public MqttPublishReply publish(long timeout, @NonNull Message message) { logger.atError().withThrowable(ex) .log("Failed during publishing message with reasonCode {} and reasonString {}", ex.getReasonCode(), ex.getMessage()); - builder.setReasonCode(ex.getReasonCode()); - builder.setReasonString(ex.getMessage()); + throw new MqttException("Could not publish", ex); } return builder.build(); } @Override public MqttSubscribeReply unsubscribe(long timeout, @NonNull List filters, - List userProperties) { + List userProperties) throws MqttException { + stateCheck(); + String[] filterArray = new String[filters.size()]; for (int i = 0; i < filters.size(); i++) { filterArray[i] = filters.get(i); @@ -245,8 +252,7 @@ public MqttSubscribeReply unsubscribe(long timeout, @NonNull List filter } } catch (org.eclipse.paho.mqttv5.common.MqttException e) { logger.atError().withThrowable(e).log("Failed during unsubscribe"); - builder.addReasonCodes(e.getReasonCode()); - builder.setReasonString(e.getMessage()); + throw new MqttException("Could not unsubscribe", e); } return builder.build(); } @@ -266,13 +272,34 @@ private IMqttAsyncClient createAsyncClient(MqttLib.ConnectionParams connectionPa private void disconnectAndClose(long timeout, int reasonCode, List userProperties) throws org.eclipse.paho.mqttv5.common.MqttException { MqttProperties properties = new MqttProperties(); + if (userProperties != null && !userProperties.isEmpty()) { properties.setUserProperties(convertToUserProperties(userProperties)); userProperties.forEach(p -> logger.atInfo() .log("Disconnect MQTT userProperties: {}, {}", p.getKey(), p.getValue())); } + try { - client.disconnectForcibly(QUIESCE_TIMEOUT, timeout, reasonCode, properties); + if (isConnected.compareAndSet(true, false)) { + client.disconnectForcibly(QUIESCE_TIMEOUT, timeout, reasonCode, properties); + } else { + logger.atWarn().log("DISCONNECT was not sent on the dead connection"); + } + + final long deadline = System.nanoTime() + timeout * 1_000_000_000; + + long remaining = deadline - System.nanoTime(); + if (remaining < MIN_SHUTDOWN_NS) { + remaining = MIN_SHUTDOWN_NS; + } + + executorService.shutdown(); + if (!executorService.awaitTermination(remaining, TimeUnit.NANOSECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); } finally { client.close(); } @@ -459,6 +486,7 @@ class MqttCallbackImpl implements MqttCallback { @Override @SuppressWarnings("PMD.AvoidCatchingGenericException") public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) { + isConnected.set(false); GRPCClient.DisconnectInfo disconnectInfo = convertDisconnectPacket(mqttDisconnectResponse); final String errorString = mqttDisconnectResponse.getException() == null @@ -587,4 +615,19 @@ private GRPCClient.DisconnectInfo convertDisconnectPacket(MqttDisconnectResponse userProperties ); } + + /** + * Checks connection state. + * + * @throws MqttException when connection state is not allow opertation + */ + private void stateCheck() throws MqttException { + if (!isConnected.get()) { + throw new MqttException("MQTT client is not in connected state"); + } + + if (isClosing.get()) { + throw new MqttException("MQTT connection is closing"); + } + } }