Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(uat): add callback for disconnect #373

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;

Expand Down Expand Up @@ -70,8 +71,11 @@ public ConnectResult start(MqttLib.ConnectionParams connectionParams, int connec
this.connectionId = connectionId;
try {
MqttConnectOptions connectOptions = convertParams(connectionParams);

IMqttToken token = mqttClient.connect(connectOptions);
mqttClient.setCallback(new MqttCallbackImpl());
token.waitForCompletion(TimeUnit.SECONDS.toMillis(connectionParams.getConnectionTimeout()));

logger.atInfo().log("MQTT 3.1.1 connection {} is establisted", connectionId);
return buildConnectResult(true, token.isComplete());
} catch (org.eclipse.paho.client.mqttv3.MqttException e) {
Expand Down Expand Up @@ -222,26 +226,9 @@ private static ConnectResult buildConnectResult(boolean success, Boolean session

private class MqttMessageListener implements IMqttMessageListener {

@SuppressWarnings("PMD.AvoidCatchingGenericException")
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
if (isClosing.get()) {
logger.atWarn().log("PIBLISH event ignored due to shutdown initiated");
} else {
GRPCClient.MqttReceivedMessage message = new GRPCClient.MqttReceivedMessage(
mqttMessage.getQos(), mqttMessage.isRetained(), topic, mqttMessage.getPayload(),
null, null, null,null, null, null);
executorService.submit(() -> {
try {
grpcClient.onReceiveMqttMessage(connectionId, message);
} catch (Exception ex) {
logger.atError().withThrowable(ex).log("onReceiveMqttMessage failed");
}
});

logger.atInfo().log("Received MQTT message: connectionId {} topic '{}' QoS {} retain {}",
connectionId, topic, mqttMessage.getQos(), mqttMessage.isRetained());
}
public void messageArrived(String topic, MqttMessage mqttMessage) {
processMessage(topic, mqttMessage);
}
}

Expand Down Expand Up @@ -280,4 +267,59 @@ private void checkCorrelationData(byte[] correlationData) {
logger.atWarn().log("MQTT v3.1.1 doesn't support correlation data");
}
}

class MqttCallbackImpl implements MqttCallback {

@SuppressWarnings("PMD.AvoidCatchingGenericException")
@Override
public void connectionLost(Throwable throwable) {
// only unsolicited disconnect
if (isClosing.get()) {
logger.atWarn().log("DISCONNECT event ignored due to shutdown initiated");
} else {
GRPCClient.DisconnectInfo disconnectInfo = new GRPCClient.DisconnectInfo(null, null, null, null, null);
executorService.submit(() -> {
try {
grpcClient.onMqttDisconnect(connectionId, disconnectInfo, throwable.getMessage());
} catch (Exception ex) {
logger.atError().withThrowable(ex).log("onMqttDisconnect failed");
}
});
}

logger.atInfo().log("MQTT connection {} interrupted, error code {}", connectionId, throwable.getMessage());

}

@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
processMessage(topic, mqttMessage);
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.atInfo().log("Delivery completion is {}", token.isComplete());
}
}

@SuppressWarnings("PMD.AvoidCatchingGenericException")
private void processMessage(String topic, MqttMessage mqttMessage) {
if (isClosing.get()) {
logger.atWarn().log("PUBLISH event ignored due to shutdown initiated");
} else {
GRPCClient.MqttReceivedMessage message = new GRPCClient.MqttReceivedMessage(
mqttMessage.getQos(), mqttMessage.isRetained(), topic, mqttMessage.getPayload(),
null, null, null,null, null, null);
executorService.submit(() -> {
try {
grpcClient.onReceiveMqttMessage(connectionId, message);
} catch (Exception ex) {
logger.atError().withThrowable(ex).log("onReceiveMqttMessage failed");
}
});

logger.atInfo().log("Received MQTT message: connectionId {} topic '{}' QoS {} retain {}",
connectionId, topic, mqttMessage.getQos(), mqttMessage.isRetained());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.aws.greengrass.testing.mqtt.client.DiscoveryRequest;
import com.aws.greengrass.testing.mqtt.client.Mqtt5Disconnect;
import com.aws.greengrass.testing.mqtt.client.Mqtt5Message;
import com.aws.greengrass.testing.mqtt.client.Mqtt5Properties;
import com.aws.greengrass.testing.mqtt.client.MqttAgentDiscoveryGrpc;
import com.aws.greengrass.testing.mqtt.client.MqttConnectionId;
import com.aws.greengrass.testing.mqtt.client.MqttQoS;
Expand All @@ -27,6 +28,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

/**
* Implementation of gRPC client used to discover agent.
*/
Expand Down Expand Up @@ -165,14 +168,39 @@ public void onReceiveMqttMessage(int connectionId, MqttReceivedMessage message)
@Override
public void onMqttDisconnect(int connectionId, DisconnectInfo disconnectInfo, String error) {
OnMqttDisconnectRequest.Builder builder = OnMqttDisconnectRequest.newBuilder()
.setAgentId(agentId)
.setConnectionId(MqttConnectionId.newBuilder().setConnectionId(connectionId)
.build());
.setAgentId(agentId)
.setConnectionId(MqttConnectionId.newBuilder().setConnectionId(connectionId).build());
if (disconnectInfo != null) {
// TODO: fill
Mqtt5Disconnect disconnect = Mqtt5Disconnect.newBuilder().build();
builder.setDisconnect(disconnect);
Mqtt5Disconnect.Builder disconnectBuilder = Mqtt5Disconnect.newBuilder();

final Integer reasonCode = disconnectInfo.getReasonCode();
if (reasonCode != null) {
disconnectBuilder.setReasonCode(reasonCode);
}

final Integer sessionExpiryInterval = disconnectInfo.getSessionExpiryInterval();
if (sessionExpiryInterval != null) {
disconnectBuilder.setSessionExpiryInterval(sessionExpiryInterval);
}

final String reasonString = disconnectInfo.getReasonString();
if (reasonString != null) {
disconnectBuilder.setReasonString(reasonString);
}

final String serverReference = disconnectInfo.getServerReference();
if (serverReference != null) {
disconnectBuilder.setServerReference(serverReference);
}

final List<Mqtt5Properties> userProperties = disconnectInfo.getUserProperties();
if (userProperties != null && !userProperties.isEmpty()) {
disconnectBuilder.addAllProperties(userProperties);
}

builder.setDisconnect(disconnectBuilder.build());
}

if (error != null) {
builder.setError(error);
}
Expand All @@ -182,6 +210,7 @@ public void onMqttDisconnect(int connectionId, DisconnectInfo disconnectInfo, St
} catch (StatusRuntimeException ex) {
logger.atError().withThrowable(ex).log(GRPC_REQUEST_FAILED);
}

}

/**
Expand Down
Loading
Loading