Skip to content

Commit

Permalink
Merge branch 'uat-dev' into fix-otf
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeDombo authored Jul 19, 2023
2 parents 5862cf3 + 3e78d7f commit ed13430
Show file tree
Hide file tree
Showing 12 changed files with 475 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;

Expand Down Expand Up @@ -70,8 +71,11 @@ public ConnectResult start(MqttLib.ConnectionParams connectionParams, int connec
this.connectionId = connectionId;
try {
MqttConnectOptions connectOptions = convertParams(connectionParams);

IMqttToken token = mqttClient.connect(connectOptions);
mqttClient.setCallback(new MqttCallbackImpl());
token.waitForCompletion(TimeUnit.SECONDS.toMillis(connectionParams.getConnectionTimeout()));

logger.atInfo().log("MQTT 3.1.1 connection {} is establisted", connectionId);
return buildConnectResult(true, token.isComplete());
} catch (org.eclipse.paho.client.mqttv3.MqttException e) {
Expand Down Expand Up @@ -222,26 +226,9 @@ private static ConnectResult buildConnectResult(boolean success, Boolean session

private class MqttMessageListener implements IMqttMessageListener {

@SuppressWarnings("PMD.AvoidCatchingGenericException")
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
if (isClosing.get()) {
logger.atWarn().log("PIBLISH event ignored due to shutdown initiated");
} else {
GRPCClient.MqttReceivedMessage message = new GRPCClient.MqttReceivedMessage(
mqttMessage.getQos(), mqttMessage.isRetained(), topic, mqttMessage.getPayload(),
null, null, null,null, null, null);
executorService.submit(() -> {
try {
grpcClient.onReceiveMqttMessage(connectionId, message);
} catch (Exception ex) {
logger.atError().withThrowable(ex).log("onReceiveMqttMessage failed");
}
});

logger.atInfo().log("Received MQTT message: connectionId {} topic '{}' QoS {} retain {}",
connectionId, topic, mqttMessage.getQos(), mqttMessage.isRetained());
}
public void messageArrived(String topic, MqttMessage mqttMessage) {
processMessage(topic, mqttMessage);
}
}

Expand Down Expand Up @@ -280,4 +267,59 @@ private void checkCorrelationData(byte[] correlationData) {
logger.atWarn().log("MQTT v3.1.1 doesn't support correlation data");
}
}

class MqttCallbackImpl implements MqttCallback {

@SuppressWarnings("PMD.AvoidCatchingGenericException")
@Override
public void connectionLost(Throwable throwable) {
// only unsolicited disconnect
if (isClosing.get()) {
logger.atWarn().log("DISCONNECT event ignored due to shutdown initiated");
} else {
GRPCClient.DisconnectInfo disconnectInfo = new GRPCClient.DisconnectInfo(null, null, null, null, null);
executorService.submit(() -> {
try {
grpcClient.onMqttDisconnect(connectionId, disconnectInfo, throwable.getMessage());
} catch (Exception ex) {
logger.atError().withThrowable(ex).log("onMqttDisconnect failed");
}
});
}

logger.atInfo().log("MQTT connection {} interrupted, error code {}", connectionId, throwable.getMessage());

}

@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
processMessage(topic, mqttMessage);
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.atInfo().log("Delivery completion is {}", token.isComplete());
}
}

@SuppressWarnings("PMD.AvoidCatchingGenericException")
private void processMessage(String topic, MqttMessage mqttMessage) {
if (isClosing.get()) {
logger.atWarn().log("PUBLISH event ignored due to shutdown initiated");
} else {
GRPCClient.MqttReceivedMessage message = new GRPCClient.MqttReceivedMessage(
mqttMessage.getQos(), mqttMessage.isRetained(), topic, mqttMessage.getPayload(),
null, null, null,null, null, null);
executorService.submit(() -> {
try {
grpcClient.onReceiveMqttMessage(connectionId, message);
} catch (Exception ex) {
logger.atError().withThrowable(ex).log("onReceiveMqttMessage failed");
}
});

logger.atInfo().log("Received MQTT message: connectionId {} topic '{}' QoS {} retain {}",
connectionId, topic, mqttMessage.getQos(), mqttMessage.isRetained());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.aws.greengrass.testing.mqtt.client.DiscoveryRequest;
import com.aws.greengrass.testing.mqtt.client.Mqtt5Disconnect;
import com.aws.greengrass.testing.mqtt.client.Mqtt5Message;
import com.aws.greengrass.testing.mqtt.client.Mqtt5Properties;
import com.aws.greengrass.testing.mqtt.client.MqttAgentDiscoveryGrpc;
import com.aws.greengrass.testing.mqtt.client.MqttConnectionId;
import com.aws.greengrass.testing.mqtt.client.MqttQoS;
Expand All @@ -27,6 +28,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

/**
* Implementation of gRPC client used to discover agent.
*/
Expand Down Expand Up @@ -165,14 +168,39 @@ public void onReceiveMqttMessage(int connectionId, MqttReceivedMessage message)
@Override
public void onMqttDisconnect(int connectionId, DisconnectInfo disconnectInfo, String error) {
OnMqttDisconnectRequest.Builder builder = OnMqttDisconnectRequest.newBuilder()
.setAgentId(agentId)
.setConnectionId(MqttConnectionId.newBuilder().setConnectionId(connectionId)
.build());
.setAgentId(agentId)
.setConnectionId(MqttConnectionId.newBuilder().setConnectionId(connectionId).build());
if (disconnectInfo != null) {
// TODO: fill
Mqtt5Disconnect disconnect = Mqtt5Disconnect.newBuilder().build();
builder.setDisconnect(disconnect);
Mqtt5Disconnect.Builder disconnectBuilder = Mqtt5Disconnect.newBuilder();

final Integer reasonCode = disconnectInfo.getReasonCode();
if (reasonCode != null) {
disconnectBuilder.setReasonCode(reasonCode);
}

final Integer sessionExpiryInterval = disconnectInfo.getSessionExpiryInterval();
if (sessionExpiryInterval != null) {
disconnectBuilder.setSessionExpiryInterval(sessionExpiryInterval);
}

final String reasonString = disconnectInfo.getReasonString();
if (reasonString != null) {
disconnectBuilder.setReasonString(reasonString);
}

final String serverReference = disconnectInfo.getServerReference();
if (serverReference != null) {
disconnectBuilder.setServerReference(serverReference);
}

final List<Mqtt5Properties> userProperties = disconnectInfo.getUserProperties();
if (userProperties != null && !userProperties.isEmpty()) {
disconnectBuilder.addAllProperties(userProperties);
}

builder.setDisconnect(disconnectBuilder.build());
}

if (error != null) {
builder.setError(error);
}
Expand All @@ -182,6 +210,7 @@ public void onMqttDisconnect(int connectionId, DisconnectInfo disconnectInfo, St
} catch (StatusRuntimeException ex) {
logger.atError().withThrowable(ex).log(GRPC_REQUEST_FAILED);
}

}

/**
Expand Down
Loading

0 comments on commit ed13430

Please sign in to comment.