From 7f67acf65b1ba150aacee022f523b3759754183d Mon Sep 17 00:00:00 2001 From: saranyailla Date: Thu, 23 May 2024 20:16:56 -0700 Subject: [PATCH] fix: synchronize updating subscriptions --- .../shadowmanager/ShadowManager.java | 38 ++--- .../shadowmanager/sync/CloudDataClient.java | 157 ++++++++---------- .../shadowmanager/ShadowManagerUnitTest.java | 6 +- 3 files changed, 82 insertions(+), 119 deletions(-) diff --git a/src/main/java/com/aws/greengrass/shadowmanager/ShadowManager.java b/src/main/java/com/aws/greengrass/shadowmanager/ShadowManager.java index e7d0d32d..68b9e68b 100644 --- a/src/main/java/com/aws/greengrass/shadowmanager/ShadowManager.java +++ b/src/main/java/com/aws/greengrass/shadowmanager/ShadowManager.java @@ -63,9 +63,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import javax.inject.Inject; @@ -104,10 +101,7 @@ public class ShadowManager extends PluginService { private final CloudDataClient cloudDataClient; private final MqttClient mqttClient; private final PubSubIntegrator pubSubIntegrator; - private final ExecutorService executorService; private final AtomicReference currentStrategy = new AtomicReference<>(DEFAULT_STRATEGY); - // This is used from within the mqtt thread only - private Future mqttCallbackFuture = new CompletableFuture<>(); public final MqttClientConnectionEvents callbacks = new MqttClientConnectionEvents() { @Override public void onConnectionInterrupted(int errorCode) { @@ -116,20 +110,14 @@ public void onConnectionInterrupted(int errorCode) { @Override public void onConnectionResumed(boolean sessionPresent) { + // Make sure that it is non-blocking as it is run on mqtt event loop thread. if (inState(State.RUNNING)) { - handleAsync(() -> startSyncingShadows( - StartSyncInfo.builder().startSyncStrategy(true).updateCloudSubscriptions(true).build())); - } - } + startSyncingShadows(StartSyncInfo.builder().startSyncStrategy(true) + .updateCloudSubscriptions(true).build()); - private void handleAsync(Runnable runnable) { - if (!mqttCallbackFuture.isDone() && !mqttCallbackFuture.isCancelled()) { - mqttCallbackFuture.cancel(true); } - mqttCallbackFuture = executorService.submit(runnable); } }; - private final CallbackEventManager.OnConnectCallback onConnect = callbacks::onConnectionResumed; @Getter(AccessLevel.PUBLIC) @@ -160,18 +148,23 @@ private void handleAsync(Runnable runnable) { * @param cloudDataClient the data client subscribing to cloud shadow topics * @param mqttClient the mqtt client connected to IoT Core * @param direction The sync direction - * @param executorService Executor service */ @SuppressWarnings("PMD.ExcessiveParameterList") @Inject public ShadowManager( Topics topics, - ShadowManagerDatabase database, ShadowManagerDAOImpl dao, - AuthorizationHandlerWrapper authorizationHandlerWrapper, PubSubClientWrapper pubSubClientWrapper, - InboundRateLimiter inboundRateLimiter, DeviceConfiguration deviceConfiguration, - ShadowWriteSynchronizeHelper synchronizeHelper, IotDataPlaneClientWrapper iotDataPlaneClientWrapper, - SyncHandler syncHandler, CloudDataClient cloudDataClient, MqttClient mqttClient, DirectionWrapper direction, - ExecutorService executorService) { + ShadowManagerDatabase database, + ShadowManagerDAOImpl dao, + AuthorizationHandlerWrapper authorizationHandlerWrapper, + PubSubClientWrapper pubSubClientWrapper, + InboundRateLimiter inboundRateLimiter, + DeviceConfiguration deviceConfiguration, + ShadowWriteSynchronizeHelper synchronizeHelper, + IotDataPlaneClientWrapper iotDataPlaneClientWrapper, + SyncHandler syncHandler, + CloudDataClient cloudDataClient, + MqttClient mqttClient, + DirectionWrapper direction) { super(topics); this.database = database; this.authorizationHandlerWrapper = authorizationHandlerWrapper; @@ -192,7 +185,6 @@ public ShadowManager( this.pubSubIntegrator = new PubSubIntegrator(pubSubClientWrapper, deleteThingShadowRequestHandler, updateThingShadowRequestHandler, getThingShadowRequestHandler); this.direction = direction; - this.executorService = executorService; } private void registerHandlers() { diff --git a/src/main/java/com/aws/greengrass/shadowmanager/sync/CloudDataClient.java b/src/main/java/com/aws/greengrass/shadowmanager/sync/CloudDataClient.java index 9301276d..447978df 100644 --- a/src/main/java/com/aws/greengrass/shadowmanager/sync/CloudDataClient.java +++ b/src/main/java/com/aws/greengrass/shadowmanager/sync/CloudDataClient.java @@ -60,6 +60,7 @@ public class CloudDataClient { .retryableExceptions(Collections.singletonList(SubscriptionRetryException.class)) .build(); private Future syncLoopFuture; + private final Object subscriptionLock = new Object(); /** * Ctr for CloudDataClient. @@ -87,37 +88,10 @@ public void stopSubscribing() { /** * Unsubscribe to all shadow topics. */ - public synchronized void unsubscribeForAllShadowsTopics() { - unsubscribeForAllShadowsTopics(subscribedUpdateShadowTopics, this::handleUpdate); - unsubscribeForAllShadowsTopics(subscribedDeleteShadowTopics, this::handleDelete); - } - - /** - * Unsubscribe from all the shadow topics. - * - * @param topics topics to unsubscribe - * @param callback Callback function applied to shadow topic - */ - private synchronized void unsubscribeForAllShadowsTopics(Set topics, Consumer callback) { - Set topicsToUnsubscribe = new HashSet<>(topics); - for (String topic : topicsToUnsubscribe) { - try { - mqttClient.unsubscribe(UnsubscribeRequest.builder().callback(callback).topic(topic).build()); - logger.atDebug().log("Unsubscribed from {}", topic); - topics.remove(topic); - } catch (TimeoutException | ExecutionException e) { - logger.atWarn() - .setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code()) - .kv(LOG_TOPIC, topic) - .setCause(e) - .log("Failed to unsubscribe from shadow topic"); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.atError() - .setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code()) - .log("Failed from unsubscribe to all shadow topics"); - } - } + public void unsubscribeForAllShadowsTopics() { + // There are no new topics to add to update/delete sets, so no new topics are subscribed. + // There are no new topics to remove from update/delete sets, so all the existing topics are unsubscribed. + updateSubscriptions(new HashSet<>()); } /** @@ -152,67 +126,71 @@ private void updateSubscriptions(Set updateTopics, Set deleteTop .log("Attempting to update subscriptions when offline"); return; } + // It is possible for a thread to hold the lock indefinitely as updating subscriptions is retried forever. + // The lock is released only when updating the subscriptions is successful or when the thread is interrupted + // (happens when we cancel the syncLoopFuture) + synchronized (subscriptionLock) { + // get update topics to remove and subscribe + Set updateTopicsToRemove = new HashSet<>(subscribedUpdateShadowTopics); + updateTopicsToRemove.removeAll(updateTopics); - // get update topics to remove and subscribe - Set updateTopicsToRemove = new HashSet<>(subscribedUpdateShadowTopics); - updateTopicsToRemove.removeAll(updateTopics); - - Set updateTopicsToSubscribe = new HashSet<>(updateTopics); - updateTopicsToSubscribe.removeAll(subscribedUpdateShadowTopics); + Set updateTopicsToSubscribe = new HashSet<>(updateTopics); + updateTopicsToSubscribe.removeAll(subscribedUpdateShadowTopics); - Set deleteTopicsToRemove = new HashSet<>(subscribedDeleteShadowTopics); - deleteTopicsToRemove.removeAll(deleteTopics); + Set deleteTopicsToRemove = new HashSet<>(subscribedDeleteShadowTopics); + deleteTopicsToRemove.removeAll(deleteTopics); - Set deleteTopicsToSubscribe = new HashSet<>(deleteTopics); - deleteTopicsToSubscribe.removeAll(subscribedDeleteShadowTopics); + Set deleteTopicsToSubscribe = new HashSet<>(deleteTopics); + deleteTopicsToSubscribe.removeAll(subscribedDeleteShadowTopics); - boolean success; - try { - success = RetryUtils.runWithRetry(RETRY_CONFIG, () -> { - unsubscribeToShadows(subscribedUpdateShadowTopics, updateTopicsToRemove, this::handleUpdate); - subscribeToShadows(subscribedUpdateShadowTopics, updateTopicsToSubscribe, this::handleUpdate); + boolean success; + try { + success = RetryUtils.runWithRetry(RETRY_CONFIG, () -> { + unsubscribeToShadows(subscribedUpdateShadowTopics, updateTopicsToRemove, this::handleUpdate); + subscribeToShadows(subscribedUpdateShadowTopics, updateTopicsToSubscribe, this::handleUpdate); - unsubscribeToShadows(subscribedDeleteShadowTopics, deleteTopicsToRemove, this::handleDelete); - subscribeToShadows(subscribedDeleteShadowTopics, deleteTopicsToSubscribe, this::handleDelete); + unsubscribeToShadows(subscribedDeleteShadowTopics, deleteTopicsToRemove, this::handleDelete); + subscribeToShadows(subscribedDeleteShadowTopics, deleteTopicsToSubscribe, this::handleDelete); - if (!updateTopicsToRemove.isEmpty() || !updateTopicsToSubscribe.isEmpty() - || !deleteTopicsToRemove.isEmpty() || !deleteTopicsToSubscribe.isEmpty() - && !Thread.currentThread().isInterrupted()) { + if (!updateTopicsToRemove.isEmpty() || !updateTopicsToSubscribe.isEmpty() + || !deleteTopicsToRemove.isEmpty() || !deleteTopicsToSubscribe.isEmpty() + && !Thread.currentThread().isInterrupted()) { - throw new SubscriptionRetryException("Missed shadow topics to (un)subscribe to"); - } + throw new SubscriptionRetryException("Missed shadow topics to (un)subscribe to"); + } - // if interrupted then handle - if (Thread.currentThread().isInterrupted()) { - Thread.currentThread().interrupt(); - logger.atError() - .setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code()) - .log("Failed to update subscriptions"); - return false; - } + // if interrupted then handle + if (Thread.currentThread().isInterrupted()) { + Thread.currentThread().interrupt(); + logger.atError() + .setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code()) + .log("Failed to update subscriptions"); + return false; + } - return true; - }, LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code(), logger); + return true; + }, LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code(), logger); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.atError() - .setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code()) - .setCause(e) - .log("Failed to update subscriptions"); - return; - } catch (Exception e) { // NOPMD - thrown by RetryUtils.runWithRetry() - logger.atError() - .setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code()) - .setCause(e) - .log("Failed to update subscriptions"); - return; - } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.atError() + .setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code()) + .setCause(e) + .log("Failed to update subscriptions"); + return; + } catch (Exception e) { // NOPMD - thrown by RetryUtils.runWithRetry() + logger.atError() + .setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code()) + .setCause(e) + .log("Failed to update subscriptions"); + return; + } - if (success) { - logger.atDebug() - .setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code()) - .log("Finished updating subscriptions"); + if (success) { + logger.atDebug() + .setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code()) + .log("Finished updating subscriptions"); + } } } @@ -224,8 +202,9 @@ private void updateSubscriptions(Set updateTopics, Set deleteTop * @param callback Callback function applied to shadow topic * @throws InterruptedException Interrupt occurred while trying to unsubscribe to shadows */ - private synchronized void unsubscribeToShadows(Set currentTopics, Set topicsToUnsubscribe, - Consumer callback) throws InterruptedException { + @SuppressWarnings("PMD.PreserveStackTrace") + private void unsubscribeToShadows(Set currentTopics, Set topicsToUnsubscribe, + Consumer callback) throws InterruptedException { Set tempHashSet = new HashSet<>(topicsToUnsubscribe); for (String topic : tempHashSet) { try { @@ -251,10 +230,9 @@ private synchronized void unsubscribeToShadows(Set currentTopics, Set currentTopics, Set topicsToSubscribe, - Consumer callback) throws InterruptedException { + private void subscribeToShadows(Set currentTopics, Set topicsToSubscribe, + Consumer callback) throws InterruptedException { Set tempHashSet = new HashSet<>(topicsToSubscribe); - for (String topic : tempHashSet) { try { mqttClient.subscribe(SubscribeRequest.builder().topic(topic).callback(callback).build()); @@ -262,11 +240,8 @@ private synchronized void subscribeToShadows(Set currentTopics, Set