Skip to content

Commit

Permalink
fix(uat): add callback for disconnect (#373)
Browse files Browse the repository at this point in the history
  • Loading branch information
auarbekov authored Jul 19, 2023
1 parent 64f9a68 commit 3e78d7f
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;

Expand Down Expand Up @@ -70,8 +71,11 @@ public ConnectResult start(MqttLib.ConnectionParams connectionParams, int connec
this.connectionId = connectionId;
try {
MqttConnectOptions connectOptions = convertParams(connectionParams);

IMqttToken token = mqttClient.connect(connectOptions);
mqttClient.setCallback(new MqttCallbackImpl());
token.waitForCompletion(TimeUnit.SECONDS.toMillis(connectionParams.getConnectionTimeout()));

logger.atInfo().log("MQTT 3.1.1 connection {} is establisted", connectionId);
return buildConnectResult(true, token.isComplete());
} catch (org.eclipse.paho.client.mqttv3.MqttException e) {
Expand Down Expand Up @@ -222,26 +226,9 @@ private static ConnectResult buildConnectResult(boolean success, Boolean session

private class MqttMessageListener implements IMqttMessageListener {

@SuppressWarnings("PMD.AvoidCatchingGenericException")
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
if (isClosing.get()) {
logger.atWarn().log("PIBLISH event ignored due to shutdown initiated");
} else {
GRPCClient.MqttReceivedMessage message = new GRPCClient.MqttReceivedMessage(
mqttMessage.getQos(), mqttMessage.isRetained(), topic, mqttMessage.getPayload(),
null, null, null,null, null, null);
executorService.submit(() -> {
try {
grpcClient.onReceiveMqttMessage(connectionId, message);
} catch (Exception ex) {
logger.atError().withThrowable(ex).log("onReceiveMqttMessage failed");
}
});

logger.atInfo().log("Received MQTT message: connectionId {} topic '{}' QoS {} retain {}",
connectionId, topic, mqttMessage.getQos(), mqttMessage.isRetained());
}
public void messageArrived(String topic, MqttMessage mqttMessage) {
processMessage(topic, mqttMessage);
}
}

Expand Down Expand Up @@ -280,4 +267,59 @@ private void checkCorrelationData(byte[] correlationData) {
logger.atWarn().log("MQTT v3.1.1 doesn't support correlation data");
}
}

class MqttCallbackImpl implements MqttCallback {

@SuppressWarnings("PMD.AvoidCatchingGenericException")
@Override
public void connectionLost(Throwable throwable) {
// only unsolicited disconnect
if (isClosing.get()) {
logger.atWarn().log("DISCONNECT event ignored due to shutdown initiated");
} else {
GRPCClient.DisconnectInfo disconnectInfo = new GRPCClient.DisconnectInfo(null, null, null, null, null);
executorService.submit(() -> {
try {
grpcClient.onMqttDisconnect(connectionId, disconnectInfo, throwable.getMessage());
} catch (Exception ex) {
logger.atError().withThrowable(ex).log("onMqttDisconnect failed");
}
});
}

logger.atInfo().log("MQTT connection {} interrupted, error code {}", connectionId, throwable.getMessage());

}

@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
processMessage(topic, mqttMessage);
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.atInfo().log("Delivery completion is {}", token.isComplete());
}
}

@SuppressWarnings("PMD.AvoidCatchingGenericException")
private void processMessage(String topic, MqttMessage mqttMessage) {
if (isClosing.get()) {
logger.atWarn().log("PUBLISH event ignored due to shutdown initiated");
} else {
GRPCClient.MqttReceivedMessage message = new GRPCClient.MqttReceivedMessage(
mqttMessage.getQos(), mqttMessage.isRetained(), topic, mqttMessage.getPayload(),
null, null, null,null, null, null);
executorService.submit(() -> {
try {
grpcClient.onReceiveMqttMessage(connectionId, message);
} catch (Exception ex) {
logger.atError().withThrowable(ex).log("onReceiveMqttMessage failed");
}
});

logger.atInfo().log("Received MQTT message: connectionId {} topic '{}' QoS {} retain {}",
connectionId, topic, mqttMessage.getQos(), mqttMessage.isRetained());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.aws.greengrass.testing.mqtt.client.DiscoveryRequest;
import com.aws.greengrass.testing.mqtt.client.Mqtt5Disconnect;
import com.aws.greengrass.testing.mqtt.client.Mqtt5Message;
import com.aws.greengrass.testing.mqtt.client.Mqtt5Properties;
import com.aws.greengrass.testing.mqtt.client.MqttAgentDiscoveryGrpc;
import com.aws.greengrass.testing.mqtt.client.MqttConnectionId;
import com.aws.greengrass.testing.mqtt.client.MqttQoS;
Expand All @@ -27,6 +28,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

/**
* Implementation of gRPC client used to discover agent.
*/
Expand Down Expand Up @@ -165,14 +168,39 @@ public void onReceiveMqttMessage(int connectionId, MqttReceivedMessage message)
@Override
public void onMqttDisconnect(int connectionId, DisconnectInfo disconnectInfo, String error) {
OnMqttDisconnectRequest.Builder builder = OnMqttDisconnectRequest.newBuilder()
.setAgentId(agentId)
.setConnectionId(MqttConnectionId.newBuilder().setConnectionId(connectionId)
.build());
.setAgentId(agentId)
.setConnectionId(MqttConnectionId.newBuilder().setConnectionId(connectionId).build());
if (disconnectInfo != null) {
// TODO: fill
Mqtt5Disconnect disconnect = Mqtt5Disconnect.newBuilder().build();
builder.setDisconnect(disconnect);
Mqtt5Disconnect.Builder disconnectBuilder = Mqtt5Disconnect.newBuilder();

final Integer reasonCode = disconnectInfo.getReasonCode();
if (reasonCode != null) {
disconnectBuilder.setReasonCode(reasonCode);
}

final Integer sessionExpiryInterval = disconnectInfo.getSessionExpiryInterval();
if (sessionExpiryInterval != null) {
disconnectBuilder.setSessionExpiryInterval(sessionExpiryInterval);
}

final String reasonString = disconnectInfo.getReasonString();
if (reasonString != null) {
disconnectBuilder.setReasonString(reasonString);
}

final String serverReference = disconnectInfo.getServerReference();
if (serverReference != null) {
disconnectBuilder.setServerReference(serverReference);
}

final List<Mqtt5Properties> userProperties = disconnectInfo.getUserProperties();
if (userProperties != null && !userProperties.isEmpty()) {
disconnectBuilder.addAllProperties(userProperties);
}

builder.setDisconnect(disconnectBuilder.build());
}

if (error != null) {
builder.setError(error);
}
Expand All @@ -182,6 +210,7 @@ public void onMqttDisconnect(int connectionId, DisconnectInfo disconnectInfo, St
} catch (StatusRuntimeException ex) {
logger.atError().withThrowable(ex).log(GRPC_REQUEST_FAILED);
}

}

/**
Expand Down
Loading

0 comments on commit 3e78d7f

Please sign in to comment.