diff --git a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java index b0bd181e9..c7cf0cce2 100644 --- a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java +++ b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt311/client/paho/Mqtt311ConnectionImpl.java @@ -21,6 +21,7 @@ import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; @@ -70,8 +71,11 @@ public ConnectResult start(MqttLib.ConnectionParams connectionParams, int connec this.connectionId = connectionId; try { MqttConnectOptions connectOptions = convertParams(connectionParams); + IMqttToken token = mqttClient.connect(connectOptions); + mqttClient.setCallback(new MqttCallbackImpl()); token.waitForCompletion(TimeUnit.SECONDS.toMillis(connectionParams.getConnectionTimeout())); + logger.atInfo().log("MQTT 3.1.1 connection {} is establisted", connectionId); return buildConnectResult(true, token.isComplete()); } catch (org.eclipse.paho.client.mqttv3.MqttException e) { @@ -222,26 +226,9 @@ private static ConnectResult buildConnectResult(boolean success, Boolean session private class MqttMessageListener implements IMqttMessageListener { - @SuppressWarnings("PMD.AvoidCatchingGenericException") @Override - public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { - if (isClosing.get()) { - logger.atWarn().log("PIBLISH event ignored due to shutdown initiated"); - } else { - GRPCClient.MqttReceivedMessage message = new GRPCClient.MqttReceivedMessage( - mqttMessage.getQos(), mqttMessage.isRetained(), topic, mqttMessage.getPayload(), - null, null, null,null, null, null); - executorService.submit(() -> { - try { - grpcClient.onReceiveMqttMessage(connectionId, message); - } catch (Exception ex) { - logger.atError().withThrowable(ex).log("onReceiveMqttMessage failed"); - } - }); - - logger.atInfo().log("Received MQTT message: connectionId {} topic '{}' QoS {} retain {}", - connectionId, topic, mqttMessage.getQos(), mqttMessage.isRetained()); - } + public void messageArrived(String topic, MqttMessage mqttMessage) { + processMessage(topic, mqttMessage); } } @@ -280,4 +267,59 @@ private void checkCorrelationData(byte[] correlationData) { logger.atWarn().log("MQTT v3.1.1 doesn't support correlation data"); } } + + class MqttCallbackImpl implements MqttCallback { + + @SuppressWarnings("PMD.AvoidCatchingGenericException") + @Override + public void connectionLost(Throwable throwable) { + // only unsolicited disconnect + if (isClosing.get()) { + logger.atWarn().log("DISCONNECT event ignored due to shutdown initiated"); + } else { + GRPCClient.DisconnectInfo disconnectInfo = new GRPCClient.DisconnectInfo(null, null, null, null, null); + executorService.submit(() -> { + try { + grpcClient.onMqttDisconnect(connectionId, disconnectInfo, throwable.getMessage()); + } catch (Exception ex) { + logger.atError().withThrowable(ex).log("onMqttDisconnect failed"); + } + }); + } + + logger.atInfo().log("MQTT connection {} interrupted, error code {}", connectionId, throwable.getMessage()); + + } + + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) { + processMessage(topic, mqttMessage); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + logger.atInfo().log("Delivery completion is {}", token.isComplete()); + } + } + + @SuppressWarnings("PMD.AvoidCatchingGenericException") + private void processMessage(String topic, MqttMessage mqttMessage) { + if (isClosing.get()) { + logger.atWarn().log("PUBLISH event ignored due to shutdown initiated"); + } else { + GRPCClient.MqttReceivedMessage message = new GRPCClient.MqttReceivedMessage( + mqttMessage.getQos(), mqttMessage.isRetained(), topic, mqttMessage.getPayload(), + null, null, null,null, null, null); + executorService.submit(() -> { + try { + grpcClient.onReceiveMqttMessage(connectionId, message); + } catch (Exception ex) { + logger.atError().withThrowable(ex).log("onReceiveMqttMessage failed"); + } + }); + + logger.atInfo().log("Received MQTT message: connectionId {} topic '{}' QoS {} retain {}", + connectionId, topic, mqttMessage.getQos(), mqttMessage.isRetained()); + } + } } diff --git a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCDiscoveryClient.java b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCDiscoveryClient.java index d29e643fe..bf99b5b2e 100644 --- a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCDiscoveryClient.java +++ b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/grpc/GRPCDiscoveryClient.java @@ -8,6 +8,7 @@ import com.aws.greengrass.testing.mqtt.client.DiscoveryRequest; import com.aws.greengrass.testing.mqtt.client.Mqtt5Disconnect; import com.aws.greengrass.testing.mqtt.client.Mqtt5Message; +import com.aws.greengrass.testing.mqtt.client.Mqtt5Properties; import com.aws.greengrass.testing.mqtt.client.MqttAgentDiscoveryGrpc; import com.aws.greengrass.testing.mqtt.client.MqttConnectionId; import com.aws.greengrass.testing.mqtt.client.MqttQoS; @@ -27,6 +28,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; + /** * Implementation of gRPC client used to discover agent. */ @@ -165,14 +168,39 @@ public void onReceiveMqttMessage(int connectionId, MqttReceivedMessage message) @Override public void onMqttDisconnect(int connectionId, DisconnectInfo disconnectInfo, String error) { OnMqttDisconnectRequest.Builder builder = OnMqttDisconnectRequest.newBuilder() - .setAgentId(agentId) - .setConnectionId(MqttConnectionId.newBuilder().setConnectionId(connectionId) - .build()); + .setAgentId(agentId) + .setConnectionId(MqttConnectionId.newBuilder().setConnectionId(connectionId).build()); if (disconnectInfo != null) { - // TODO: fill - Mqtt5Disconnect disconnect = Mqtt5Disconnect.newBuilder().build(); - builder.setDisconnect(disconnect); + Mqtt5Disconnect.Builder disconnectBuilder = Mqtt5Disconnect.newBuilder(); + + final Integer reasonCode = disconnectInfo.getReasonCode(); + if (reasonCode != null) { + disconnectBuilder.setReasonCode(reasonCode); + } + + final Integer sessionExpiryInterval = disconnectInfo.getSessionExpiryInterval(); + if (sessionExpiryInterval != null) { + disconnectBuilder.setSessionExpiryInterval(sessionExpiryInterval); + } + + final String reasonString = disconnectInfo.getReasonString(); + if (reasonString != null) { + disconnectBuilder.setReasonString(reasonString); + } + + final String serverReference = disconnectInfo.getServerReference(); + if (serverReference != null) { + disconnectBuilder.setServerReference(serverReference); + } + + final List userProperties = disconnectInfo.getUserProperties(); + if (userProperties != null && !userProperties.isEmpty()) { + disconnectBuilder.addAllProperties(userProperties); + } + + builder.setDisconnect(disconnectBuilder.build()); } + if (error != null) { builder.setError(error); } @@ -182,6 +210,7 @@ public void onMqttDisconnect(int connectionId, DisconnectInfo disconnectInfo, St } catch (StatusRuntimeException ex) { logger.atError().withThrowable(ex).log(GRPC_REQUEST_FAILED); } + } /** diff --git a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java index c327f2193..6a3f28704 100644 --- a/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java +++ b/uat/custom-components/client-java-paho/src/main/java/com/aws/greengrass/testing/mqtt5/client/paho/MqttConnectionImpl.java @@ -20,7 +20,9 @@ import org.eclipse.paho.mqttv5.client.IMqttMessageListener; import org.eclipse.paho.mqttv5.client.IMqttToken; import org.eclipse.paho.mqttv5.client.MqttAsyncClient; +import org.eclipse.paho.mqttv5.client.MqttCallback; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.MqttSubscription; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; @@ -75,6 +77,7 @@ public ConnectResult start(MqttLib.ConnectionParams connectionParams, int connec try { MqttConnectionOptions connectionOptions = convertConnectParams(connectionParams); IMqttToken token = client.connect(connectionOptions); + client.setCallback(new MqttCallbackImpl()); token.waitForCompletion(TimeUnit.SECONDS.toMillis(connectionParams.getConnectionTimeout())); success = true; return buildConnectResult(true, token, null); @@ -414,66 +417,9 @@ private static Integer convertLongToInteger(Long value) { } private class MqttMessageListener implements IMqttMessageListener { - @SuppressWarnings("PMD.AvoidCatchingGenericException") @Override public void messageArrived(String topic, MqttMessage mqttMessage) { - MqttProperties receivedProperties = mqttMessage.getProperties(); - - String contentType = null; - Boolean payloadFormatIndicator = null; - Integer messageExpiryInterval = null; - String responseTopic = null; - byte[] correlationData = null; - if (receivedProperties != null) { - contentType = receivedProperties.getContentType(); - payloadFormatIndicator = receivedProperties.getPayloadFormat(); - if (receivedProperties.getMessageExpiryInterval() != null) { - messageExpiryInterval = receivedProperties.getMessageExpiryInterval().intValue(); - } - responseTopic = receivedProperties.getResponseTopic(); - correlationData = receivedProperties.getCorrelationData(); - } - - List userProps = convertToMqtt5Properties(receivedProperties); - - if (isClosing.get()) { - logger.atWarn().log("PIBLISH event ignored due to shutdown initiated"); - } else { - GRPCClient.MqttReceivedMessage message = new GRPCClient.MqttReceivedMessage( - mqttMessage.getQos(), mqttMessage.isRetained(), topic, - mqttMessage.getPayload(), userProps, contentType, payloadFormatIndicator, - messageExpiryInterval, responseTopic, correlationData); - executorService.submit(() -> { - try { - grpcClient.onReceiveMqttMessage(connectionId, message); - } catch (Exception ex) { - logger.atError().withThrowable(ex).log("onReceiveMqttMessage failed"); - } - }); - } - - logger.atInfo().log("Received MQTT message: connectionId {} topic '{}' QoS {} retain {}", - connectionId, topic, mqttMessage.getQos(), mqttMessage.isRetained()); - - if (userProps != null) { - userProps.forEach(p -> logger.atInfo() - .log("Received MQTT userProperties: {}, {}", p.getKey(), p.getValue())); - } - if (contentType != null) { - logger.atInfo().log("Received MQTT message has content type '{}'", contentType); - } - if (payloadFormatIndicator != null) { - logger.atInfo().log("Received MQTT message has payload format indicator '{}'", payloadFormatIndicator); - } - if (messageExpiryInterval != null) { - logger.atInfo().log("Received MQTT message has message expiry interval {}", messageExpiryInterval); - } - if (responseTopic != null) { - logger.atInfo().log("Received MQTT message has response topic: {}", responseTopic); - } - if (correlationData != null) { - logger.atInfo().log("Received MQTT message has correlation data: {}", correlationData); - } + processMessage(topic, mqttMessage); } } @@ -507,4 +453,138 @@ private static List getAckUserProperties(List use } return userProperties; } + + class MqttCallbackImpl implements MqttCallback { + + @Override + @SuppressWarnings("PMD.AvoidCatchingGenericException") + public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) { + GRPCClient.DisconnectInfo disconnectInfo = convertDisconnectPacket(mqttDisconnectResponse); + + final String errorString = mqttDisconnectResponse.getException() == null + ? null : mqttDisconnectResponse.getException().getMessage(); + + // only unsolicited disconnect + if (isClosing.get()) { + logger.atWarn().log("DISCONNECT event ignored due to shutdown initiated"); + } else { + executorService.submit(() -> { + try { + grpcClient.onMqttDisconnect(connectionId, disconnectInfo, errorString); + } catch (Exception ex) { + logger.atError().withThrowable(ex).log("onMqttDisconnect failed"); + } + }); + } + + logger.atInfo().log("MQTT connectionId {} disconnected error '{}' disconnectInfo '{}'", + connectionId, errorString, disconnectInfo); + } + + @Override + public void mqttErrorOccurred(org.eclipse.paho.mqttv5.common.MqttException e) { + logger.error("Client error with reason code {} and message '{}'", e.getReasonCode(), e.getMessage()); + } + + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) { + processMessage(topic, mqttMessage); + } + + @Override + public void deliveryComplete(IMqttToken token) { + logger.atInfo().log("Delivery completion is {}", token.isComplete()); + } + + @Override + public void connectComplete(boolean reconnect, String s) { + logger.atInfo().log("Connection completed"); + } + + @Override + public void authPacketArrived(int i, MqttProperties mqttProperties) { + logger.atInfo().log("Connection completed"); + } + } + + @SuppressWarnings("PMD.AvoidCatchingGenericException") + private void processMessage(String topic, MqttMessage mqttMessage) { + MqttProperties receivedProperties = mqttMessage.getProperties(); + + String contentType = null; + Boolean payloadFormatIndicator = null; + Integer messageExpiryInterval = null; + String responseTopic = null; + byte[] correlationData = null; + if (receivedProperties != null) { + contentType = receivedProperties.getContentType(); + payloadFormatIndicator = receivedProperties.getPayloadFormat(); + if (receivedProperties.getMessageExpiryInterval() != null) { + messageExpiryInterval = receivedProperties.getMessageExpiryInterval().intValue(); + } + responseTopic = receivedProperties.getResponseTopic(); + correlationData = receivedProperties.getCorrelationData(); + } + + List userProps = convertToMqtt5Properties(receivedProperties); + + if (isClosing.get()) { + logger.atWarn().log("PUBLISH event ignored due to shutdown initiated"); + } else { + GRPCClient.MqttReceivedMessage message = new GRPCClient.MqttReceivedMessage( + mqttMessage.getQos(), mqttMessage.isRetained(), topic, + mqttMessage.getPayload(), userProps, contentType, payloadFormatIndicator, + messageExpiryInterval, responseTopic, correlationData); + executorService.submit(() -> { + try { + grpcClient.onReceiveMqttMessage(connectionId, message); + } catch (Exception ex) { + logger.atError().withThrowable(ex).log("onReceiveMqttMessage failed"); + } + }); + } + + logger.atInfo().log("Received MQTT message: connectionId {} topic '{}' QoS {} retain {}", + connectionId, topic, mqttMessage.getQos(), mqttMessage.isRetained()); + + if (userProps != null) { + userProps.forEach(p -> logger.atInfo() + .log("Received MQTT userProperties: {}, {}", p.getKey(), p.getValue())); + } + if (contentType != null) { + logger.atInfo().log("Received MQTT message has content type '{}'", contentType); + } + if (payloadFormatIndicator != null) { + logger.atInfo().log("Received MQTT message has payload format indicator '{}'", payloadFormatIndicator); + } + if (messageExpiryInterval != null) { + logger.atInfo().log("Received MQTT message has message expiry interval {}", messageExpiryInterval); + } + if (responseTopic != null) { + logger.atInfo().log("Received MQTT message has response topic: {}", responseTopic); + } + if (correlationData != null) { + logger.atInfo().log("Received MQTT message has correlation data: {}", correlationData); + } + } + + private GRPCClient.DisconnectInfo convertDisconnectPacket(MqttDisconnectResponse response) { + if (response == null) { + return null; + } + + + List properties = response.getUserProperties(); + final List userProperties = properties == null + ? null : convertToMqtt5Properties(properties); + + final int reasonCode = response.getReturnCode(); + + return new GRPCClient.DisconnectInfo(reasonCode, + null, + response.getReasonString(), + response.getServerReference(), + userProperties + ); + } }