Skip to content

Commit

Permalink
fix: synchronize updating subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
saranyailla committed May 30, 2024
1 parent b8670b9 commit e5f94b4
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 126 deletions.
38 changes: 15 additions & 23 deletions src/main/java/com/aws/greengrass/shadowmanager/ShadowManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.inject.Inject;
Expand Down Expand Up @@ -104,10 +101,7 @@ public class ShadowManager extends PluginService {
private final CloudDataClient cloudDataClient;
private final MqttClient mqttClient;
private final PubSubIntegrator pubSubIntegrator;
private final ExecutorService executorService;
private final AtomicReference<Strategy> currentStrategy = new AtomicReference<>(DEFAULT_STRATEGY);
// This is used from within the mqtt thread only
private Future<?> mqttCallbackFuture = new CompletableFuture<>();
public final MqttClientConnectionEvents callbacks = new MqttClientConnectionEvents() {
@Override
public void onConnectionInterrupted(int errorCode) {
Expand All @@ -116,20 +110,14 @@ public void onConnectionInterrupted(int errorCode) {

@Override
public void onConnectionResumed(boolean sessionPresent) {
// Make sure that it is non-blocking as it is run on mqtt event loop thread.
if (inState(State.RUNNING)) {
handleAsync(() -> startSyncingShadows(
StartSyncInfo.builder().startSyncStrategy(true).updateCloudSubscriptions(true).build()));
}
}
startSyncingShadows(StartSyncInfo.builder().startSyncStrategy(true)
.updateCloudSubscriptions(true).build());

private void handleAsync(Runnable runnable) {
if (!mqttCallbackFuture.isDone() && !mqttCallbackFuture.isCancelled()) {
mqttCallbackFuture.cancel(true);
}
mqttCallbackFuture = executorService.submit(runnable);
}
};

private final CallbackEventManager.OnConnectCallback onConnect = callbacks::onConnectionResumed;

@Getter(AccessLevel.PUBLIC)
Expand Down Expand Up @@ -160,18 +148,23 @@ private void handleAsync(Runnable runnable) {
* @param cloudDataClient the data client subscribing to cloud shadow topics
* @param mqttClient the mqtt client connected to IoT Core
* @param direction The sync direction
* @param executorService Executor service
*/
@SuppressWarnings("PMD.ExcessiveParameterList")
@Inject
public ShadowManager(
Topics topics,
ShadowManagerDatabase database, ShadowManagerDAOImpl dao,
AuthorizationHandlerWrapper authorizationHandlerWrapper, PubSubClientWrapper pubSubClientWrapper,
InboundRateLimiter inboundRateLimiter, DeviceConfiguration deviceConfiguration,
ShadowWriteSynchronizeHelper synchronizeHelper, IotDataPlaneClientWrapper iotDataPlaneClientWrapper,
SyncHandler syncHandler, CloudDataClient cloudDataClient, MqttClient mqttClient, DirectionWrapper direction,
ExecutorService executorService) {
ShadowManagerDatabase database,
ShadowManagerDAOImpl dao,
AuthorizationHandlerWrapper authorizationHandlerWrapper,
PubSubClientWrapper pubSubClientWrapper,
InboundRateLimiter inboundRateLimiter,
DeviceConfiguration deviceConfiguration,
ShadowWriteSynchronizeHelper synchronizeHelper,
IotDataPlaneClientWrapper iotDataPlaneClientWrapper,
SyncHandler syncHandler,
CloudDataClient cloudDataClient,
MqttClient mqttClient,
DirectionWrapper direction) {
super(topics);
this.database = database;
this.authorizationHandlerWrapper = authorizationHandlerWrapper;
Expand All @@ -192,7 +185,6 @@ public ShadowManager(
this.pubSubIntegrator = new PubSubIntegrator(pubSubClientWrapper, deleteThingShadowRequestHandler,
updateThingShadowRequestHandler, getThingShadowRequestHandler);
this.direction = direction;
this.executorService = executorService;
}

private void registerHandlers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand All @@ -48,9 +49,9 @@ public class CloudDataClient {
private final MqttClient mqttClient;
private final ExecutorService executorService;
@Getter(AccessLevel.PACKAGE)
private final Set<String> subscribedUpdateShadowTopics = new HashSet<>();
private final Set<String> subscribedUpdateShadowTopics = ConcurrentHashMap.newKeySet();
@Getter(AccessLevel.PACKAGE)
private final Set<String> subscribedDeleteShadowTopics = new HashSet<>();
private final Set<String> subscribedDeleteShadowTopics = ConcurrentHashMap.newKeySet();
private final Pattern shadowPattern = Pattern.compile("\\$aws\\/things\\/(.*)\\/shadow(\\/name\\/(.*))?"
+ "\\/(update|delete)\\/(accepted|rejected|delta|documents)");
private static final RetryUtils.RetryConfig RETRY_CONFIG = RetryUtils.RetryConfig.builder()
Expand All @@ -60,6 +61,7 @@ public class CloudDataClient {
.retryableExceptions(Collections.singletonList(SubscriptionRetryException.class))
.build();
private Future<?> syncLoopFuture;
private final Object subscriptionLock = new Object();

/**
* Constructor for CloudDataClient.
Expand Down Expand Up @@ -87,37 +89,10 @@ public void stopSubscribing() {
/**
* Unsubscribe from all shadow topics.
*/
public synchronized void unsubscribeForAllShadowsTopics() {
unsubscribeForAllShadowsTopics(subscribedUpdateShadowTopics, this::handleUpdate);
unsubscribeForAllShadowsTopics(subscribedDeleteShadowTopics, this::handleDelete);
}

/**
* Unsubscribe from all the shadow topics.
*
* @param topics topics to unsubscribe
* @param callback Callback function applied to shadow topic
*/
private synchronized void unsubscribeForAllShadowsTopics(Set<String> topics, Consumer<MqttMessage> callback) {
Set<String> topicsToUnsubscribe = new HashSet<>(topics);
for (String topic : topicsToUnsubscribe) {
try {
mqttClient.unsubscribe(UnsubscribeRequest.builder().callback(callback).topic(topic).build());
logger.atDebug().log("Unsubscribed from {}", topic);
topics.remove(topic);
} catch (TimeoutException | ExecutionException e) {
logger.atWarn()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.kv(LOG_TOPIC, topic)
.setCause(e)
.log("Failed to unsubscribe from shadow topic");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.atError()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.log("Failed from unsubscribe to all shadow topics");
}
}
public void unsubscribeForAllShadowsTopics() {
// Passing an empty set means there are no desired topics: nothing new is subscribed to,
// and every topic currently in the update/delete sets is treated as stale and unsubscribed.
updateSubscriptions(new HashSet<>());
}

/**
Expand Down Expand Up @@ -152,67 +127,64 @@ private void updateSubscriptions(Set<String> updateTopics, Set<String> deleteTop
.log("Attempting to update subscriptions when offline");
return;
}
// It is possible for a thread to hold the lock indefinitely as updating subscriptions is retried forever.
// The lock is released only when updating the subscriptions is successful or when the thread is interrupted
// (happens when we cancel the syncLoopFuture)
synchronized (subscriptionLock) {
// get update topics to remove and subscribe
Set<String> updateTopicsToRemove = new HashSet<>(subscribedUpdateShadowTopics);
updateTopicsToRemove.removeAll(updateTopics);

// get update topics to remove and subscribe
Set<String> updateTopicsToRemove = new HashSet<>(subscribedUpdateShadowTopics);
updateTopicsToRemove.removeAll(updateTopics);

Set<String> updateTopicsToSubscribe = new HashSet<>(updateTopics);
updateTopicsToSubscribe.removeAll(subscribedUpdateShadowTopics);

Set<String> deleteTopicsToRemove = new HashSet<>(subscribedDeleteShadowTopics);
deleteTopicsToRemove.removeAll(deleteTopics);

Set<String> deleteTopicsToSubscribe = new HashSet<>(deleteTopics);
deleteTopicsToSubscribe.removeAll(subscribedDeleteShadowTopics);

boolean success;
try {
success = RetryUtils.runWithRetry(RETRY_CONFIG, () -> {
unsubscribeToShadows(subscribedUpdateShadowTopics, updateTopicsToRemove, this::handleUpdate);
subscribeToShadows(subscribedUpdateShadowTopics, updateTopicsToSubscribe, this::handleUpdate);

unsubscribeToShadows(subscribedDeleteShadowTopics, deleteTopicsToRemove, this::handleDelete);
subscribeToShadows(subscribedDeleteShadowTopics, deleteTopicsToSubscribe, this::handleDelete);
Set<String> updateTopicsToSubscribe = new HashSet<>(updateTopics);
updateTopicsToSubscribe.removeAll(subscribedUpdateShadowTopics);

if (!updateTopicsToRemove.isEmpty() || !updateTopicsToSubscribe.isEmpty()
|| !deleteTopicsToRemove.isEmpty() || !deleteTopicsToSubscribe.isEmpty()
&& !Thread.currentThread().isInterrupted()) {
Set<String> deleteTopicsToRemove = new HashSet<>(subscribedDeleteShadowTopics);
deleteTopicsToRemove.removeAll(deleteTopics);

throw new SubscriptionRetryException("Missed shadow topics to (un)subscribe to");
}
Set<String> deleteTopicsToSubscribe = new HashSet<>(deleteTopics);
deleteTopicsToSubscribe.removeAll(subscribedDeleteShadowTopics);

// if interrupted then handle
if (Thread.currentThread().isInterrupted()) {
Thread.currentThread().interrupt();
logger.atError()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.log("Failed to update subscriptions");
return false;
}

return true;
}, LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code(), logger);
boolean success;
try {
success = RetryUtils.runWithRetry(RETRY_CONFIG, () -> {
if (Thread.currentThread().isInterrupted()) {
Thread.currentThread().interrupt();
logger.atWarn()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.log("Could not update the shadow subscriptions as the thread is interrupted");
return false;
}
unsubscribeToShadows(subscribedUpdateShadowTopics, updateTopicsToRemove, this::handleUpdate);
subscribeToShadows(subscribedUpdateShadowTopics, updateTopicsToSubscribe, this::handleUpdate);

unsubscribeToShadows(subscribedDeleteShadowTopics, deleteTopicsToRemove, this::handleDelete);
subscribeToShadows(subscribedDeleteShadowTopics, deleteTopicsToSubscribe, this::handleDelete);

if (!updateTopicsToRemove.isEmpty() || !updateTopicsToSubscribe.isEmpty()
|| !deleteTopicsToRemove.isEmpty() || !deleteTopicsToSubscribe.isEmpty()) {
throw new SubscriptionRetryException("Missed shadow topics to (un)subscribe to");
}
return true;
}, LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code(), logger);

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.atError()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.setCause(e)
.log("Failed to update subscriptions");
return;
} catch (Exception e) { // NOPMD - thrown by RetryUtils.runWithRetry()
logger.atError()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.setCause(e)
.log("Failed to update subscriptions");
return;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.atError().setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code()).setCause(e)
.log("Got interrupted while updating the shadow subscriptions");
return;
} catch (Exception e) { // NOPMD - thrown by RetryUtils.runWithRetry()
logger.atError()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.setCause(e)
.log("Failed to update subscriptions");
return;
}

if (success) {
logger.atDebug()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.log("Finished updating subscriptions");
if (success) {
logger.atDebug()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.log("Finished updating subscriptions");
}
}
}

Expand All @@ -224,8 +196,9 @@ private void updateSubscriptions(Set<String> updateTopics, Set<String> deleteTop
* @param callback Callback function applied to shadow topic
* @throws InterruptedException Interrupt occurred while trying to unsubscribe from shadows
*/
private synchronized void unsubscribeToShadows(Set<String> currentTopics, Set<String> topicsToUnsubscribe,
Consumer<MqttMessage> callback) throws InterruptedException {
@SuppressWarnings("PMD.PreserveStackTrace")
private void unsubscribeToShadows(Set<String> currentTopics, Set<String> topicsToUnsubscribe,
Consumer<MqttMessage> callback) throws InterruptedException {
Set<String> tempHashSet = new HashSet<>(topicsToUnsubscribe);
for (String topic : tempHashSet) {
try {
Expand All @@ -251,22 +224,18 @@ private synchronized void unsubscribeToShadows(Set<String> currentTopics, Set<St
* @param callback Callback function applied to shadow topic
* @throws InterruptedException Interrupt occurred while trying to subscribe to shadows
*/
private synchronized void subscribeToShadows(Set<String> currentTopics, Set<String> topicsToSubscribe,
Consumer<MqttMessage> callback) throws InterruptedException {
private void subscribeToShadows(Set<String> currentTopics, Set<String> topicsToSubscribe,
Consumer<MqttMessage> callback) throws InterruptedException {
Set<String> tempHashSet = new HashSet<>(topicsToSubscribe);

for (String topic : tempHashSet) {
try {
mqttClient.subscribe(SubscribeRequest.builder().topic(topic).callback(callback).build());
topicsToSubscribe.remove(topic);
currentTopics.add(topic);
logger.atDebug().log("Subscribed to {}", topic);
} catch (TimeoutException | ExecutionException e) {
logger.atWarn()
.setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code())
.kv(LOG_TOPIC, topic)
.setCause(e)
.log("Failed to subscribe to shadow topic");
logger.atWarn().setEventType(LogEvents.CLOUD_DATA_CLIENT_SUBSCRIPTION_ERROR.code()).kv(LOG_TOPIC, topic)
.setCause(e).log("Failed to subscribe to shadow topic");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

Expand Down Expand Up @@ -141,8 +140,6 @@ class ShadowManagerUnitTest extends GGServiceTestUtil {
@Mock
private MqttClient mockMqttClient;
@Mock
private ExecutorService executorService;
@Mock
private GreengrassCoreIPCService mockGreengrassCoreIPCService;

@Captor
Expand All @@ -160,8 +157,7 @@ public void setup() {
initializeMockedConfig();
shadowManager = new ShadowManager(config, mockDatabase, mockDao, mockAuthorizationHandlerWrapper,
mockPubSubClientWrapper, mockInboundRateLimiter, mockDeviceConfiguration, mockSynchronizeHelper,
mockIotDataPlaneClientWrapper, mockSyncHandler, mockCloudDataClient, mockMqttClient, direction,
executorService);
mockIotDataPlaneClientWrapper, mockSyncHandler, mockCloudDataClient, mockMqttClient, direction);
lenient().when(config.lookupTopics(CONFIGURATION_CONFIG_KEY))
.thenReturn(Topics.of(context, CONFIGURATION_CONFIG_KEY, null));
// These are added to not break the existing unit tests. Will be removed later.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ void GIVEN_100_synced_shadows_WHEN_unsubscribeForAllShadowsTopics_THEN_unsubscri
}

cloudDataClient.unsubscribeForAllShadowsTopics();
TimeUnit.MILLISECONDS.sleep(5000);

verify(mockMqttClient, times(200)).unsubscribe(any());
assertThat(cloudDataClient.getSubscribedUpdateShadowTopics().size(), is(0));
Expand Down

0 comments on commit e5f94b4

Please sign in to comment.