Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(uat): bypass send DISCONNECT on dead connection in SDK client #376

Merged
merged 2 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,17 @@ public ConnectResult start(long timeout, int connectionId) throws MqttException
public void disconnect(long timeout, int reasonCode, List<Mqtt5Properties> userProperties) throws MqttException {

checkUserProperties(userProperties);
if (!isClosing.getAndSet(true)) {
CompletableFuture<Void> disconnnectFuture = connection.disconnect();

if (isClosing.compareAndSet(false, true)) {
try {
final long deadline = System.nanoTime() + timeout * 1_000_000_000;

disconnnectFuture.get(timeout, TimeUnit.SECONDS);
if (isConnected.compareAndSet(true, false)) {
CompletableFuture<Void> disconnnectFuture = connection.disconnect();
disconnnectFuture.get(timeout, TimeUnit.SECONDS);
} else {
logger.atWarn().log("DISCONNECT was not sent on the dead connection");
}

long remaining = deadline - System.nanoTime();
if (remaining < MIN_SHUTDOWN_NS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,23 +277,28 @@ public ConnectResult start(long timeout, int connectionId) throws MqttException
}
}

@SuppressWarnings({"PMD.UseTryWithResources", "PMD.AvoidCatchingGenericException"})
@SuppressWarnings({"PMD.UseTryWithResources", "PMD.AvoidCatchingGenericException", "PMD.CognitiveComplexity"})
@Override
public void disconnect(long timeout, int reasonCode, List<Mqtt5Properties> userProperties) throws MqttException {

if (isClosing.compareAndSet(false, true)) {
final DisconnectPacket.DisconnectReasonCode disconnectReason
= DisconnectPacket.DisconnectReasonCode.getEnumValueFromInteger(reasonCode);
DisconnectPacket.DisconnectPacketBuilder builder = new DisconnectPacket.DisconnectPacketBuilder()
.withReasonCode(disconnectReason);
if (isConnected.compareAndSet(true, false)) {
final DisconnectPacket.DisconnectReasonCode disconnectReason
= DisconnectPacket.DisconnectReasonCode.getEnumValueFromInteger(reasonCode);
DisconnectPacket.DisconnectPacketBuilder builder = new DisconnectPacket.DisconnectPacketBuilder()
.withReasonCode(disconnectReason);

if (userProperties != null && !userProperties.isEmpty()) {
builder.withUserProperties(convertToUserProperties(userProperties, "DISCONNECT"));
}

if (userProperties != null && !userProperties.isEmpty()) {
builder.withUserProperties(convertToUserProperties(userProperties, "DISCONNECT"));
client.stop(builder.build());
} else {
logger.atWarn().log("DISCONNECT was not sent on the dead connection");
}

client.stop(builder.build());

try {
client.close();
final long deadline = System.nanoTime() + timeout * 1_000_000_000;
lifecycleEvents.stoppedFuture.get(timeout, TimeUnit.SECONDS);

Expand All @@ -312,8 +317,6 @@ public void disconnect(long timeout, int reasonCode, List<Mqtt5Properties> userP
} catch (Exception ex) {
logger.atError().withThrowable(ex).log("Failed during disconnecting from MQTT broker");
throw new MqttException("Could not disconnect", ex);
} finally {
client.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ void GIVEN_disconnect_successful_WHEN_disconnect_THEN_client_methods_are_called(
when(connection.disconnect()).thenReturn(connectFuture);
connectFuture.complete(null);

// move to connected state
mqttConnectionImpl.connectionEvents.onConnectionResumed(true);

// WHEN
mqttConnectionImpl.disconnect(timeoutSeconds, reasonCode, null);

Expand All @@ -152,6 +155,9 @@ void GIVEN_timedout_WHEN_disconnect_THEN_exception() throws MqttException {
final CompletableFuture<Void> connectFuture = new CompletableFuture<>();
when(connection.disconnect()).thenReturn(connectFuture);

// move to connected state
mqttConnectionImpl.connectionEvents.onConnectionResumed(true);

// WHEN, THEN
assertThrows(MqttException.class, () -> {
mqttConnectionImpl.disconnect(timeoutSeconds, reasonCode, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ void GIVEN_disconnect_successful_WHEN_disconnect_THEN_client_methods_are_called(

mqttConnectionImpl.lifecycleEvents.onStopped(client, onStoppedReturn);

// move to connected state
mqttConnectionImpl.lifecycleEvents.onConnectionSuccess(client, null);

// WHEN
mqttConnectionImpl.disconnect(timeoutSeconds, reasonCode, null);

Expand All @@ -273,6 +276,9 @@ void GIVEN_timedout_WHEN_disconnect_THEN_exception() throws MqttException {
final long timeoutSeconds = SHORT_TIMEOUT_SEC;
final int reasonCode = 4;

// move to connected state
mqttConnectionImpl.lifecycleEvents.onConnectionSuccess(client, null);

// WHEN
assertThrows(MqttException.class, () -> {
mqttConnectionImpl.disconnect(timeoutSeconds, reasonCode, null);
Expand Down
Loading