Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: synchronize cloud data client operations properly #204

Merged
merged 4 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions gdk-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"component": {
"aws.greengrass.ShadowManager": {
"author": "Me",
"version": "NEXT_PATCH",
"build": {
"build_system": "maven"
},
"publish": {
"bucket": "ggv2componentartifacts",
"region": "us-east-1"
}
}
},
"gdk_version": "1.0.0"
}
43 changes: 27 additions & 16 deletions src/main/java/com/aws/greengrass/shadowmanager/ShadowManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.inject.Inject;
Expand Down Expand Up @@ -101,22 +104,34 @@ public class ShadowManager extends PluginService {
private final CloudDataClient cloudDataClient;
private final MqttClient mqttClient;
private final PubSubIntegrator pubSubIntegrator;
private final ExecutorService executorService;
private final AtomicReference<Strategy> currentStrategy = new AtomicReference<>(DEFAULT_STRATEGY);
// This is used from within the mqtt thread only
private Future<?> mqttCallbackFuture = new CompletableFuture<>();
public final MqttClientConnectionEvents callbacks = new MqttClientConnectionEvents() {
@Override
public void onConnectionInterrupted(int errorCode) {
stopSyncingShadows(true);
handleAsync(() -> stopSyncingShadows(true));
}

@Override
public void onConnectionResumed(boolean sessionPresent) {
if (inState(State.RUNNING)) {
startSyncingShadows(StartSyncInfo.builder().startSyncStrategy(true)
.updateCloudSubscriptions(true).build());
handleAsync(() -> {
if (inState(State.RUNNING)) {
saranyailla marked this conversation as resolved.
Show resolved Hide resolved
startSyncingShadows(
StartSyncInfo.builder().startSyncStrategy(true).updateCloudSubscriptions(true).build());
}
});
}

private void handleAsync(Runnable runnable) {
if (!mqttCallbackFuture.isDone() && !mqttCallbackFuture.isCancelled()) {
mqttCallbackFuture.cancel(true);
saranyailla marked this conversation as resolved.
Show resolved Hide resolved
}
mqttCallbackFuture = executorService.submit(runnable);
}
};

private final CallbackEventManager.OnConnectCallback onConnect = callbacks::onConnectionResumed;

@Getter(AccessLevel.PUBLIC)
Expand Down Expand Up @@ -147,23 +162,18 @@ public void onConnectionResumed(boolean sessionPresent) {
* @param cloudDataClient the data client subscribing to cloud shadow topics
* @param mqttClient the mqtt client connected to IoT Core
* @param direction The sync direction
* @param executorService Executor service
*/
@SuppressWarnings("PMD.ExcessiveParameterList")
@Inject
public ShadowManager(
Topics topics,
ShadowManagerDatabase database,
ShadowManagerDAOImpl dao,
AuthorizationHandlerWrapper authorizationHandlerWrapper,
PubSubClientWrapper pubSubClientWrapper,
InboundRateLimiter inboundRateLimiter,
DeviceConfiguration deviceConfiguration,
ShadowWriteSynchronizeHelper synchronizeHelper,
IotDataPlaneClientWrapper iotDataPlaneClientWrapper,
SyncHandler syncHandler,
CloudDataClient cloudDataClient,
MqttClient mqttClient,
DirectionWrapper direction) {
ShadowManagerDatabase database, ShadowManagerDAOImpl dao,
AuthorizationHandlerWrapper authorizationHandlerWrapper, PubSubClientWrapper pubSubClientWrapper,
InboundRateLimiter inboundRateLimiter, DeviceConfiguration deviceConfiguration,
ShadowWriteSynchronizeHelper synchronizeHelper, IotDataPlaneClientWrapper iotDataPlaneClientWrapper,
SyncHandler syncHandler, CloudDataClient cloudDataClient, MqttClient mqttClient, DirectionWrapper direction,
ExecutorService executorService) {
super(topics);
this.database = database;
this.authorizationHandlerWrapper = authorizationHandlerWrapper;
Expand All @@ -184,6 +194,7 @@ public ShadowManager(
this.pubSubIntegrator = new PubSubIntegrator(pubSubClientWrapper, deleteThingShadowRequestHandler,
updateThingShadowRequestHandler, getThingShadowRequestHandler);
this.direction = direction;
this.executorService = executorService;
}

private void registerHandlers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

Expand Down Expand Up @@ -140,6 +141,8 @@ class ShadowManagerUnitTest extends GGServiceTestUtil {
@Mock
private MqttClient mockMqttClient;
@Mock
private ExecutorService executorService;
@Mock
private GreengrassCoreIPCService mockGreengrassCoreIPCService;

@Captor
Expand All @@ -157,7 +160,8 @@ public void setup() {
initializeMockedConfig();
shadowManager = new ShadowManager(config, mockDatabase, mockDao, mockAuthorizationHandlerWrapper,
mockPubSubClientWrapper, mockInboundRateLimiter, mockDeviceConfiguration, mockSynchronizeHelper,
mockIotDataPlaneClientWrapper, mockSyncHandler, mockCloudDataClient, mockMqttClient, direction);
mockIotDataPlaneClientWrapper, mockSyncHandler, mockCloudDataClient, mockMqttClient, direction,
executorService);
lenient().when(config.lookupTopics(CONFIGURATION_CONFIG_KEY))
.thenReturn(Topics.of(context, CONFIGURATION_CONFIG_KEY, null));
// These are added to not break the existing unit tests. Will be removed later.
Expand Down
Loading