From 21ad45fafd64ee234280b16284349037f36580ff Mon Sep 17 00:00:00 2001 From: Uarbekov Date: Mon, 17 Jul 2023 13:40:44 +0600 Subject: [PATCH 1/6] fix(uat): add callback for disconnect --- .../client/grpc/GRPCDiscoveryClient.java | 41 +++- .../mqtt5/client/paho/MqttConnectionImpl.java | 193 ++++++++++++------ 2 files changed, 171 insertions(+), 63 deletions(-) diff --git a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCDiscoveryClient.java b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCDiscoveryClient.java index d29e643fe..bf99b5b2e 100644 --- a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCDiscoveryClient.java +++ b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCDiscoveryClient.java @@ -8,6 +8,7 @@ import com.aws.greengrass.testing.mqtt.client.DiscoveryRequest; import com.aws.greengrass.testing.mqtt.client.Mqtt5Disconnect; import com.aws.greengrass.testing.mqtt.client.Mqtt5Message; +import com.aws.greengrass.testing.mqtt.client.Mqtt5Properties; import com.aws.greengrass.testing.mqtt.client.MqttAgentDiscoveryGrpc; import com.aws.greengrass.testing.mqtt.client.MqttConnectionId; import com.aws.greengrass.testing.mqtt.client.MqttQoS; @@ -27,6 +28,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; + /** * Implementation of gRPC client used to discover agent. */ @@ -165,14 +168,39 @@ public void onReceiveMqttMessage(int connectionId, MqttReceivedMessage message) @Override public void onMqttDisconnect(int connectionId, DisconnectInfo disconnectInfo, String error) { OnMqttDisconnectRequest.Builder builder = OnMqttDisconnectRequest.newBuilder() - .setAgentId(agentId) - .setConnectionId(MqttConnectionId.newBuilder().setConnectionId(connectionId) - .build()); + .setAgentId(agentId) + .setConnectionId(MqttConnectionId.newBuilder().setConnectionId(connectionId).build()); if (disconnectInfo != null) { - // TODO: fill - Mqtt5Disconnect disconnect = Mqtt5Disconnect.newBuilder().build(); - builder.setDisconnect(disconnect); + Mqtt5Disconnect.Builder disconnectBuilder = Mqtt5Disconnect.newBuilder(); + + final Integer reasonCode = disconnectInfo.getReasonCode(); + if (reasonCode != null) { + disconnectBuilder.setReasonCode(reasonCode); + } + + final Integer sessionExpiryInterval = disconnectInfo.getSessionExpiryInterval(); + if (sessionExpiryInterval != null) { + disconnectBuilder.setSessionExpiryInterval(sessionExpiryInterval); + } + + final String reasonString = disconnectInfo.getReasonString(); + if (reasonString != null) { + disconnectBuilder.setReasonString(reasonString); + } + + final String serverReference = disconnectInfo.getServerReference(); + if (serverReference != null) { + disconnectBuilder.setServerReference(serverReference); + } + + final List userProperties = disconnectInfo.getUserProperties(); + if (userProperties != null && !userProperties.isEmpty()) { + disconnectBuilder.addAllProperties(userProperties); + } + + builder.setDisconnect(disconnectBuilder.build()); } + if (error != null) { builder.setError(error); } @@ -182,6 +210,7 @@ public void onMqttDisconnect(int connectionId, DisconnectInfo disconnectInfo, St } catch (StatusRuntimeException ex) { logger.atError().withThrowable(ex).log(GRPC_REQUEST_FAILED); } + } /** diff --git a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java index c327f2193..d4ce0cd00 100644 --- a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java +++ b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java @@ -20,7 +20,9 @@ import org.eclipse.paho.mqttv5.client.IMqttMessageListener; import org.eclipse.paho.mqttv5.client.IMqttToken; import org.eclipse.paho.mqttv5.client.MqttAsyncClient; +import org.eclipse.paho.mqttv5.client.MqttCallback; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.MqttSubscription; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; @@ -75,6 +77,7 @@ public ConnectResult start(MqttLib.ConnectionParams connectionParams, int connec try { MqttConnectionOptions connectionOptions = convertConnectParams(connectionParams); IMqttToken token = client.connect(connectionOptions); + client.setCallback(new MqttCallbackImpl()); token.waitForCompletion(TimeUnit.SECONDS.toMillis(connectionParams.getConnectionTimeout())); success = true; return buildConnectResult(true, token, null); @@ -417,63 +420,7 @@ private class MqttMessageListener implements IMqttMessageListener { @SuppressWarnings("PMD.AvoidCatchingGenericException") @Override public void messageArrived(String topic, MqttMessage mqttMessage) { - MqttProperties receivedProperties = mqttMessage.getProperties(); - - String contentType = null; - Boolean payloadFormatIndicator = null; - Integer messageExpiryInterval = null; - String responseTopic = null; - byte[] correlationData = null; - if (receivedProperties != null) { - contentType = receivedProperties.getContentType(); - payloadFormatIndicator = receivedProperties.getPayloadFormat(); - if (receivedProperties.getMessageExpiryInterval() != null) { - messageExpiryInterval = receivedProperties.getMessageExpiryInterval().intValue(); - } - responseTopic = receivedProperties.getResponseTopic(); - correlationData = receivedProperties.getCorrelationData(); - } - - List userProps = convertToMqtt5Properties(receivedProperties); - - if (isClosing.get()) { - logger.atWarn().log("PIBLISH event ignored due to shutdown initiated"); - } else { - GRPCClient.MqttReceivedMessage message = new GRPCClient.MqttReceivedMessage( - mqttMessage.getQos(), mqttMessage.isRetained(), topic, - mqttMessage.getPayload(), userProps, contentType, payloadFormatIndicator, - messageExpiryInterval, responseTopic, correlationData); - executorService.submit(() -> { - try { - grpcClient.onReceiveMqttMessage(connectionId, message); - } catch (Exception ex) { - logger.atError().withThrowable(ex).log("onReceiveMqttMessage failed"); - } - }); - } - - logger.atInfo().log("Received MQTT message: connectionId {} topic '{}' QoS {} retain {}", - connectionId, topic, mqttMessage.getQos(), mqttMessage.isRetained()); - - if (userProps != null) { - userProps.forEach(p -> logger.atInfo() - .log("Received MQTT userProperties: {}, {}", p.getKey(), p.getValue())); - } - if (contentType != null) { - logger.atInfo().log("Received MQTT message has content type '{}'", contentType); - } - if (payloadFormatIndicator != null) { - logger.atInfo().log("Received MQTT message has payload format indicator '{}'", payloadFormatIndicator); - } - if (messageExpiryInterval != null) { - logger.atInfo().log("Received MQTT message has message expiry interval {}", messageExpiryInterval); - } - if (responseTopic != null) { - logger.atInfo().log("Received MQTT message has response topic: {}", responseTopic); - } - if (correlationData != null) { - logger.atInfo().log("Received MQTT message has correlation data: {}", correlationData); - } + MqttConnectionImpl.this.messageArrived(topic, mqttMessage); } } @@ -507,4 +454,136 @@ private static List getAckUserProperties(List use } return userProperties; } + + class MqttCallbackImpl implements MqttCallback { + + @Override + @SuppressWarnings("PMD.AvoidCatchingGenericException") + public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) { + GRPCClient.DisconnectInfo disconnectInfo = convertDisconnectPacket(mqttDisconnectResponse); + + final String errorString = mqttDisconnectResponse.getException() == null + ? null : mqttDisconnectResponse.getException().getMessage(); + + // only unsolicited disconnect + if (isClosing.get()) { + logger.atWarn().log("DISCONNECT event ignored due to shutdown initiated"); + } else { + executorService.submit(() -> { + try { + grpcClient.onMqttDisconnect(connectionId, disconnectInfo, errorString); + } catch (Exception ex) { + logger.atError().withThrowable(ex).log("onMqttDisconnect failed"); + } + }); + } + } + + @Override + public void mqttErrorOccurred(org.eclipse.paho.mqttv5.common.MqttException e) { + logger.error("Client error with reason code {} and message '{}'", e.getReasonCode(), e.getMessage()); + } + + @Override + @SuppressWarnings("PMD.AvoidCatchingGenericException") + public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { + MqttConnectionImpl.this.messageArrived(topic, mqttMessage); + } + + @Override + public void deliveryComplete(IMqttToken token) { + logger.atInfo().log("Delivery completion is {}", token.isComplete()); + } + + @Override + public void connectComplete(boolean reconnect, String s) { + logger.atInfo().log("Connection completed"); + } + + @Override + public void authPacketArrived(int i, MqttProperties mqttProperties) { + logger.atInfo().log("Connection completed"); + } + + private GRPCClient.DisconnectInfo convertDisconnectPacket(MqttDisconnectResponse response) { + if (response == null) { + return null; + } + + + List properties = response.getUserProperties(); + final List userProperties = properties == null + ? null : convertToMqtt5Properties(properties); + + final int reasonCode = response.getReturnCode(); + + return new GRPCClient.DisconnectInfo(reasonCode, + null, + response.getReasonString(), + response.getServerReference(), + userProperties + ); + } + } + + @SuppressWarnings("PMD.AvoidCatchingGenericException") + private void messageArrived(String topic, MqttMessage mqttMessage) { + MqttProperties receivedProperties = mqttMessage.getProperties(); + + String contentType = null; + Boolean payloadFormatIndicator = null; + Integer messageExpiryInterval = null; + String responseTopic = null; + byte[] correlationData = null; + if (receivedProperties != null) { + contentType = receivedProperties.getContentType(); + payloadFormatIndicator = receivedProperties.getPayloadFormat(); + if (receivedProperties.getMessageExpiryInterval() != null) { + messageExpiryInterval = receivedProperties.getMessageExpiryInterval().intValue(); + } + responseTopic = receivedProperties.getResponseTopic(); + correlationData = receivedProperties.getCorrelationData(); + } + + List userProps = convertToMqtt5Properties(receivedProperties); + + if (isClosing.get()) { + logger.atWarn().log("PIBLISH event ignored due to shutdown initiated"); + } else { + GRPCClient.MqttReceivedMessage message = new GRPCClient.MqttReceivedMessage( + mqttMessage.getQos(), mqttMessage.isRetained(), topic, + mqttMessage.getPayload(), userProps, contentType, payloadFormatIndicator, + messageExpiryInterval, responseTopic, correlationData); + executorService.submit(() -> { + try { + grpcClient.onReceiveMqttMessage(connectionId, message); + } catch (Exception ex) { + logger.atError().withThrowable(ex).log("onReceiveMqttMessage failed"); + } + }); + } + + logger.atInfo().log("Received MQTT message: connectionId {} topic '{}' QoS {} retain {}", + connectionId, topic, mqttMessage.getQos(), mqttMessage.isRetained()); + + if (userProps != null) { + userProps.forEach(p -> logger.atInfo() + .log("Received MQTT userProperties: {}, {}", p.getKey(), p.getValue())); + } + if (contentType != null) { + logger.atInfo().log("Received MQTT message has content type '{}'", contentType); + } + if (payloadFormatIndicator != null) { + logger.atInfo().log("Received MQTT message has payload format indicator '{}'", payloadFormatIndicator); + } + if (messageExpiryInterval != null) { + logger.atInfo().log("Received MQTT message has message expiry interval {}", messageExpiryInterval); + } + if (responseTopic != null) { + logger.atInfo().log("Received MQTT message has response topic: {}", responseTopic); + } + if (correlationData != null) { + logger.atInfo().log("Received MQTT message has correlation data: {}", correlationData); + } + } } From d7d56f70d2c51c272e464f48ba3dfa0c6d5019d6 Mon Sep 17 00:00:00 2001 From: Uarbekov Date: Mon, 17 Jul 2023 15:53:11 +0600 Subject: [PATCH 2/6] fix(uat): add callback for paho v3 --- .../client/paho/Mqtt311ConnectionImpl.java | 80 ++++++++++++++----- 1 file changed, 61 insertions(+), 19 deletions(-) diff --git a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java index b0bd181e9..c6b704643 100644 --- a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java +++ b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java @@ -21,6 +21,7 @@ import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; @@ -70,8 +71,11 @@ public ConnectResult start(MqttLib.ConnectionParams connectionParams, int connec this.connectionId = connectionId; try { MqttConnectOptions connectOptions = convertParams(connectionParams); + IMqttToken token = mqttClient.connect(connectOptions); + mqttClient.setCallback(new MqttCallbackImpl()); token.waitForCompletion(TimeUnit.SECONDS.toMillis(connectionParams.getConnectionTimeout())); + logger.atInfo().log("MQTT 3.1.1 connection {} is establisted", connectionId); return buildConnectResult(true, token.isComplete()); } catch (org.eclipse.paho.client.mqttv3.MqttException e) { @@ -222,26 +226,9 @@ private static ConnectResult buildConnectResult(boolean success, Boolean session private class MqttMessageListener implements IMqttMessageListener { - @SuppressWarnings("PMD.AvoidCatchingGenericException") @Override - public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { - if (isClosing.get()) { - logger.atWarn().log("PIBLISH event ignored due to shutdown initiated"); - } else { - GRPCClient.MqttReceivedMessage message = new GRPCClient.MqttReceivedMessage( - mqttMessage.getQos(), mqttMessage.isRetained(), topic, mqttMessage.getPayload(), - null, null, null,null, null, null); - executorService.submit(() -> { - try { - grpcClient.onReceiveMqttMessage(connectionId, message); - } catch (Exception ex) { - logger.atError().withThrowable(ex).log("onReceiveMqttMessage failed"); - } - }); - - logger.atInfo().log("Received MQTT message: connectionId {} topic '{}' QoS {} retain {}", - connectionId, topic, mqttMessage.getQos(), mqttMessage.isRetained()); - } + public void messageArrived(String topic, MqttMessage mqttMessage) { + Mqtt311ConnectionImpl.this.messageArrived(topic, mqttMessage); } } @@ -280,4 +267,59 @@ private void checkCorrelationData(byte[] correlationData) { logger.atWarn().log("MQTT v3.1.1 doesn't support correlation data"); } } + + class MqttCallbackImpl implements MqttCallback { + + @SuppressWarnings("PMD.AvoidCatchingGenericException") + @Override + public void connectionLost(Throwable throwable) { + // only unsolicited disconnect + if (isClosing.get()) { + logger.atWarn().log("DISCONNECT event ignored due to shutdown initiated"); + } else { + GRPCClient.DisconnectInfo disconnectInfo = new GRPCClient.DisconnectInfo(null, null, null, null, null); + executorService.submit(() -> { + try { + grpcClient.onMqttDisconnect(connectionId, disconnectInfo, throwable.getMessage()); + } catch (Exception ex) { + logger.atError().withThrowable(ex).log("onMqttDisconnect failed"); + } + }); + } + + logger.atInfo().log("MQTT connection {} interrupted, error code {}", connectionId, throwable.getMessage()); + + } + + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) { + Mqtt311ConnectionImpl.this.messageArrived(topic, mqttMessage); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + logger.atInfo().log("Delivery completion is {}", token.isComplete()); + } + } + + @SuppressWarnings("PMD.AvoidCatchingGenericException") + private void messageArrived(String topic, MqttMessage mqttMessage) { + if (isClosing.get()) { + logger.atWarn().log("PIBLISH event ignored due to shutdown initiated"); + } else { + GRPCClient.MqttReceivedMessage message = new GRPCClient.MqttReceivedMessage( + mqttMessage.getQos(), mqttMessage.isRetained(), topic, mqttMessage.getPayload(), + null, null, null,null, null, null); + executorService.submit(() -> { + try { + grpcClient.onReceiveMqttMessage(connectionId, message); + } catch (Exception ex) { + logger.atError().withThrowable(ex).log("onReceiveMqttMessage failed"); + } + }); + + logger.atInfo().log("Received MQTT message: connectionId {} topic '{}' QoS {} retain {}", + connectionId, topic, mqttMessage.getQos(), mqttMessage.isRetained()); + } + } } From dfd2cb402305dcd47a03070e302a2a1f0592d8f7 Mon Sep 17 00:00:00 2001 From: Uarbekov Date: Tue, 18 Jul 2023 11:56:41 +0600 Subject: [PATCH 3/6] fix(uat): rename methods message arrived --- .../client/paho/Mqtt311ConnectionImpl.java | 6 +-- .../mqtt5/client/paho/MqttConnectionImpl.java | 50 +++++++++---------- 2 files changed, 27 insertions(+), 29 deletions(-) diff --git a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java index c6b704643..1fad39acb 100644 --- a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java +++ b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java @@ -228,7 +228,7 @@ private class MqttMessageListener implements IMqttMessageListener { @Override public void messageArrived(String topic, MqttMessage mqttMessage) { - Mqtt311ConnectionImpl.this.messageArrived(topic, mqttMessage); + processMessage(topic, mqttMessage); } } @@ -293,7 +293,7 @@ public void connectionLost(Throwable throwable) { @Override public void messageArrived(String topic, MqttMessage mqttMessage) { - Mqtt311ConnectionImpl.this.messageArrived(topic, mqttMessage); + processMessage(topic, mqttMessage); } @Override @@ -303,7 +303,7 @@ public void deliveryComplete(IMqttDeliveryToken token) { } @SuppressWarnings("PMD.AvoidCatchingGenericException") - private void messageArrived(String topic, MqttMessage mqttMessage) { + private void processMessage(String topic, MqttMessage mqttMessage) { if (isClosing.get()) { logger.atWarn().log("PIBLISH event ignored due to shutdown initiated"); } else { diff --git a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java index d4ce0cd00..ab955f692 100644 --- a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java +++ b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java @@ -417,10 +417,9 @@ private static Integer convertLongToInteger(Long value) { } private class MqttMessageListener implements IMqttMessageListener { - @SuppressWarnings("PMD.AvoidCatchingGenericException") @Override public void messageArrived(String topic, MqttMessage mqttMessage) { - MqttConnectionImpl.this.messageArrived(topic, mqttMessage); + processMessage(topic, mqttMessage); } } @@ -485,9 +484,8 @@ public void mqttErrorOccurred(org.eclipse.paho.mqttv5.common.MqttException e) { } @Override - @SuppressWarnings("PMD.AvoidCatchingGenericException") - public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { - MqttConnectionImpl.this.messageArrived(topic, mqttMessage); + public void messageArrived(String topic, MqttMessage mqttMessage) { + processMessage(topic, mqttMessage); } @Override @@ -504,30 +502,10 @@ public void connectComplete(boolean reconnect, String s) { public void authPacketArrived(int i, MqttProperties mqttProperties) { logger.atInfo().log("Connection completed"); } - - private GRPCClient.DisconnectInfo convertDisconnectPacket(MqttDisconnectResponse response) { - if (response == null) { - return null; - } - - - List properties = response.getUserProperties(); - final List userProperties = properties == null - ? null : convertToMqtt5Properties(properties); - - final int reasonCode = response.getReturnCode(); - - return new GRPCClient.DisconnectInfo(reasonCode, - null, - response.getReasonString(), - response.getServerReference(), - userProperties - ); - } } @SuppressWarnings("PMD.AvoidCatchingGenericException") - private void messageArrived(String topic, MqttMessage mqttMessage) { + private void processMessage(String topic, MqttMessage mqttMessage) { MqttProperties receivedProperties = mqttMessage.getProperties(); String contentType = null; @@ -586,4 +564,24 @@ private void messageArrived(String topic, MqttMessage mqttMessage) { logger.atInfo().log("Received MQTT message has correlation data: {}", correlationData); } } + + private GRPCClient.DisconnectInfo convertDisconnectPacket(MqttDisconnectResponse response) { + if (response == null) { + return null; + } + + + List properties = response.getUserProperties(); + final List userProperties = properties == null + ? null : convertToMqtt5Properties(properties); + + final int reasonCode = response.getReturnCode(); + + return new GRPCClient.DisconnectInfo(reasonCode, + null, + response.getReasonString(), + response.getServerReference(), + userProperties + ); + } } From 19c4e7b3d965865703473d0337e89ecc106880e2 Mon Sep 17 00:00:00 2001 From: Uarbekov Date: Tue, 18 Jul 2023 12:01:40 +0600 Subject: [PATCH 4/6] fix(uat): add disconnect log --- .../testing/mqtt5/client/paho/MqttConnectionImpl.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java index ab955f692..2e2dd5ff8 100644 --- a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java +++ b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java @@ -476,6 +476,9 @@ public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) { } }); } + + logger.atInfo().log("MQTT connectionId {} disconnected error '{}' disconnectInfo '{}'", + connectionId, errorString, disconnectInfo); } @Override From db070808332f9ab4699622ef30bf400cca364df7 Mon Sep 17 00:00:00 2001 From: auarbekov <129752166+auarbekov@users.noreply.github.com> Date: Wed, 19 Jul 2023 09:55:41 +0600 Subject: [PATCH 5/6] fix(uat): logging connectionImpl Co-authored-by: Joseph Cosentino --- .../testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java index 1fad39acb..c7cf0cce2 100644 --- a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java +++ b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java @@ -305,7 +305,7 @@ public void deliveryComplete(IMqttDeliveryToken token) { @SuppressWarnings("PMD.AvoidCatchingGenericException") private void processMessage(String topic, MqttMessage mqttMessage) { if (isClosing.get()) { - logger.atWarn().log("PIBLISH event ignored due to shutdown initiated"); + logger.atWarn().log("PUBLISH event ignored due to shutdown initiated"); } else { GRPCClient.MqttReceivedMessage message = new GRPCClient.MqttReceivedMessage( mqttMessage.getQos(), mqttMessage.isRetained(), topic, mqttMessage.getPayload(), From d35d9ec7909eec6b29308ba46a3bbcc363c07765 Mon Sep 17 00:00:00 2001 From: auarbekov <129752166+auarbekov@users.noreply.github.com> Date: Wed, 19 Jul 2023 09:55:54 +0600 Subject: [PATCH 6/6] fix(uat): logging connection impl Co-authored-by: Joseph Cosentino --- .../testing/mqtt5/client/paho/MqttConnectionImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java index 2e2dd5ff8..6a3f28704 100644 --- a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java +++ b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java @@ -529,7 +529,7 @@ private void processMessage(String topic, MqttMessage mqttMessage) { List userProps = convertToMqtt5Properties(receivedProperties); if (isClosing.get()) { - logger.atWarn().log("PIBLISH event ignored due to shutdown initiated"); + logger.atWarn().log("PUBLISH event ignored due to shutdown initiated"); } else { GRPCClient.MqttReceivedMessage message = new GRPCClient.MqttReceivedMessage( mqttMessage.getQos(), mqttMessage.isRetained(), topic,