Skip to content

Commit

Permalink
fix(uat): bypass send DISCONNECT on dead connection in SDK client (#376)
Browse files Browse the repository at this point in the history
  • Loading branch information
bgklika authored Jul 24, 2023
1 parent bb149d0 commit 6d366ab
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,17 @@ public ConnectResult start(long timeout, int connectionId) throws MqttException
public void disconnect(long timeout, int reasonCode, List<Mqtt5Properties> userProperties) throws MqttException {

checkUserProperties(userProperties);
if (!isClosing.getAndSet(true)) {
CompletableFuture<Void> disconnnectFuture = connection.disconnect();

if (isClosing.compareAndSet(false, true)) {
try {
final long deadline = System.nanoTime() + timeout * 1_000_000_000;

disconnnectFuture.get(timeout, TimeUnit.SECONDS);
if (isConnected.compareAndSet(true, false)) {
CompletableFuture<Void> disconnnectFuture = connection.disconnect();
disconnnectFuture.get(timeout, TimeUnit.SECONDS);
} else {
logger.atWarn().log("DISCONNECT was not sent on the dead connection");
}

long remaining = deadline - System.nanoTime();
if (remaining < MIN_SHUTDOWN_NS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,23 +277,28 @@ public ConnectResult start(long timeout, int connectionId) throws MqttException
}
}

@SuppressWarnings({"PMD.UseTryWithResources", "PMD.AvoidCatchingGenericException"})
@SuppressWarnings({"PMD.UseTryWithResources", "PMD.AvoidCatchingGenericException", "PMD.CognitiveComplexity"})
@Override
public void disconnect(long timeout, int reasonCode, List<Mqtt5Properties> userProperties) throws MqttException {

if (isClosing.compareAndSet(false, true)) {
final DisconnectPacket.DisconnectReasonCode disconnectReason
= DisconnectPacket.DisconnectReasonCode.getEnumValueFromInteger(reasonCode);
DisconnectPacket.DisconnectPacketBuilder builder = new DisconnectPacket.DisconnectPacketBuilder()
.withReasonCode(disconnectReason);
if (isConnected.compareAndSet(true, false)) {
final DisconnectPacket.DisconnectReasonCode disconnectReason
= DisconnectPacket.DisconnectReasonCode.getEnumValueFromInteger(reasonCode);
DisconnectPacket.DisconnectPacketBuilder builder = new DisconnectPacket.DisconnectPacketBuilder()
.withReasonCode(disconnectReason);

if (userProperties != null && !userProperties.isEmpty()) {
builder.withUserProperties(convertToUserProperties(userProperties, "DISCONNECT"));
}

if (userProperties != null && !userProperties.isEmpty()) {
builder.withUserProperties(convertToUserProperties(userProperties, "DISCONNECT"));
client.stop(builder.build());
} else {
logger.atWarn().log("DISCONNECT was not sent on the dead connection");
}

client.stop(builder.build());

try {
client.close();
final long deadline = System.nanoTime() + timeout * 1_000_000_000;
lifecycleEvents.stoppedFuture.get(timeout, TimeUnit.SECONDS);

Expand All @@ -312,8 +317,6 @@ public void disconnect(long timeout, int reasonCode, List<Mqtt5Properties> userP
} catch (Exception ex) {
logger.atError().withThrowable(ex).log("Failed during disconnecting from MQTT broker");
throw new MqttException("Could not disconnect", ex);
} finally {
client.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ void GIVEN_disconnect_successful_WHEN_disconnect_THEN_client_methods_are_called(
when(connection.disconnect()).thenReturn(connectFuture);
connectFuture.complete(null);

// move to connected state
mqttConnectionImpl.connectionEvents.onConnectionResumed(true);

// WHEN
mqttConnectionImpl.disconnect(timeoutSeconds, reasonCode, null);

Expand All @@ -152,6 +155,9 @@ void GIVEN_timedout_WHEN_disconnect_THEN_exception() throws MqttException {
final CompletableFuture<Void> connectFuture = new CompletableFuture<>();
when(connection.disconnect()).thenReturn(connectFuture);

// move to connected state
mqttConnectionImpl.connectionEvents.onConnectionResumed(true);

// WHEN, THEN
assertThrows(MqttException.class, () -> {
mqttConnectionImpl.disconnect(timeoutSeconds, reasonCode, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ void GIVEN_disconnect_successful_WHEN_disconnect_THEN_client_methods_are_called(

mqttConnectionImpl.lifecycleEvents.onStopped(client, onStoppedReturn);

// move to connected state
mqttConnectionImpl.lifecycleEvents.onConnectionSuccess(client, null);

// WHEN
mqttConnectionImpl.disconnect(timeoutSeconds, reasonCode, null);

Expand All @@ -273,6 +276,9 @@ void GIVEN_timedout_WHEN_disconnect_THEN_exception() throws MqttException {
final long timeoutSeconds = SHORT_TIMEOUT_SEC;
final int reasonCode = 4;

// move to connected state
mqttConnectionImpl.lifecycleEvents.onConnectionSuccess(client, null);

// WHEN
assertThrows(MqttException.class, () -> {
mqttConnectionImpl.disconnect(timeoutSeconds, reasonCode, null);
Expand Down

0 comments on commit 6d366ab

Please sign in to comment.