diff --git a/src/integrationtests/java/com/aws/greengrass/integrationtests/SyncTest.java b/src/integrationtests/java/com/aws/greengrass/integrationtests/SyncTest.java index 49eefeae..f36dcf21 100644 --- a/src/integrationtests/java/com/aws/greengrass/integrationtests/SyncTest.java +++ b/src/integrationtests/java/com/aws/greengrass/integrationtests/SyncTest.java @@ -68,6 +68,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; @@ -104,7 +105,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -@SuppressWarnings("PMD.ExcessiveClassLength") +@SuppressWarnings({"PMD.ExcessiveClassLength", "PMD.CouplingBetweenObjects"}) @ExtendWith({MockitoExtension.class, GGExtension.class}) class SyncTest extends NucleusLaunchUtils { public static final String MOCK_THING_NAME_1 = "Thing1"; @@ -1489,6 +1490,69 @@ void GIVEN_local_shadow_state_empty_WHEN_shadow_manager_syncs_THEN_cloud_shadow_ assertCloudUpdateEquals(finalLocalState); } + @ParameterizedTest + @ValueSource(classes = {RealTimeSyncStrategy.class, PeriodicSyncStrategy.class}) + void GIVEN_synced_shadow_WHEN_cloud_cleared_THEN_local_cleared(Class clazz, ExtensionContext context) throws IOException, InterruptedException, IoTDataPlaneClientCreationException { + ignoreExceptionOfType(context, InterruptedException.class); + ignoreExceptionOfType(context, ResourceNotFoundException.class); + + mockCloudUpdateResponsesWithIncreasingVersions(1); + setCloudThingShadow("{\"version\":1,\"state\":{\"desired\":{\"SomeKey\":\"foo\"}}}"); + + startNucleusWithConfig(NucleusLaunchUtilsConfig.builder() + .configFile(getSyncConfigFile(clazz)) + .syncClazz(clazz) + .mockCloud(true) + .build()); + waitForInitialSync(clazz, 1L, 1L); + + // empty state update does not affect shadow state + sendCloudUpdate(MOCK_THING_NAME_1, CLASSIC_SHADOW, "{\"version\":2,\"state\":{}}"); + assertSyncInfo(2L, 1L); + assertLocalShadowEquals("{\"state\":{\"desired\":{\"SomeKey\":\"foo\"}}}"); + + // null state update clears the shadow + sendCloudUpdate(MOCK_THING_NAME_1, CLASSIC_SHADOW, "{\"version\":3,\"state\":null}"); + assertSyncInfo(3L, 2L); + assertLocalShadowEquals("{\"state\":{}}"); + } + + private void mockCloudUpdateResponsesWithIncreasingVersions(int initialVersion) throws IoTDataPlaneClientCreationException { + AtomicInteger version = new AtomicInteger(initialVersion); + when(iotDataPlaneClientFactory.getIotDataPlaneClient() + .updateThingShadow(cloudUpdateThingShadowRequestCaptor.capture())) + .thenAnswer(invocation -> { + UpdateThingShadowResponse response = mock(UpdateThingShadowResponse.class); + String responseDocument = String.format("{\"version\": %d}", version.incrementAndGet()); + when(response.payload()).thenReturn(SdkBytes.fromString(responseDocument, UTF_8)); + return response; + }); + } + + private void sendCloudUpdate(String thingName, String shadowName, String document) { + SyncHandler syncHandler = kernel.getContext().get(SyncHandler.class); + syncHandler.pushLocalUpdateSyncRequest(thingName, shadowName, document.getBytes(UTF_8)); + } + + private void waitForInitialSync(Class strategy, + long expectedCloudVersion, long expectedLocalVersion) throws InterruptedException { + verify(syncQueue, after(7000).atMost(4)).put(any(FullShadowSyncRequest.class)); + assertEmptySyncQueue(strategy); + assertSyncInfo(expectedCloudVersion, expectedLocalVersion); + } + + private void assertSyncInfo(long expectedCloudVersion, long expectedLocalVersion) { + assertThat("sync info exists", () -> syncInfo.get().isPresent(), eventuallyEval(is(true))); + assertThat("cloud version", () -> syncInfo.get().get().getCloudVersion(), eventuallyEval(is(expectedCloudVersion))); + assertThat("local version", syncInfo.get().get().getLocalVersion(), is(expectedLocalVersion)); + } + + private void setCloudThingShadow(String document) throws IoTDataPlaneClientCreationException { + GetThingShadowResponse initialCloudStateShadowResponse = mock(GetThingShadowResponse.class, Answers.RETURNS_DEEP_STUBS); + lenient().when(initialCloudStateShadowResponse.payload().asByteArray()).thenReturn(document.getBytes(UTF_8)); + when(iotDataPlaneClientFactory.getIotDataPlaneClient().getThingShadow(any(GetThingShadowRequest.class))).thenReturn(initialCloudStateShadowResponse); + } + private void assertUpdateThingShadowHandlerResponseStateEquals(String expectedDocument, UpdateThingShadowHandlerResponse resp) throws IOException { JsonNode expectedStateJson = JsonUtil.getPayloadJson(expectedDocument.getBytes(UTF_8)).get(); if (expectedStateJson.get("state") != null) { diff --git a/src/main/java/com/aws/greengrass/shadowmanager/model/ShadowDocument.java b/src/main/java/com/aws/greengrass/shadowmanager/model/ShadowDocument.java index 6e382a89..0eec7ae2 100644 --- a/src/main/java/com/aws/greengrass/shadowmanager/model/ShadowDocument.java +++ b/src/main/java/com/aws/greengrass/shadowmanager/model/ShadowDocument.java @@ -11,6 +11,7 @@ import com.aws.greengrass.util.SerializerFactory; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.node.LongNode; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.Getter; @@ -28,6 +29,7 @@ * Class for managing operations on the Shadow Document. */ @Getter +@JsonDeserialize(using = ShadowDocumentDeserializer.class) public class ShadowDocument { @JsonProperty(value = SHADOW_DOCUMENT_STATE, required = true) private ShadowState state; diff --git a/src/main/java/com/aws/greengrass/shadowmanager/model/ShadowDocumentDeserializer.java b/src/main/java/com/aws/greengrass/shadowmanager/model/ShadowDocumentDeserializer.java new file mode 100644 index 00000000..e581728e --- /dev/null +++ b/src/main/java/com/aws/greengrass/shadowmanager/model/ShadowDocumentDeserializer.java @@ -0,0 +1,40 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.shadowmanager.model; + +import com.aws.greengrass.util.SerializerFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; + +import java.io.IOException; + +public class ShadowDocumentDeserializer extends JsonDeserializer { + // hack to prevent StackOverflowException for custom deserialize, + // allows us to use jackson's default deserialization for type within + // a custom deserializer + static { + SerializerFactory.getFailSafeJsonObjectMapper().addMixIn(ShadowDocument.class, DefaultJsonDeserializer.class); + } + + @JsonDeserialize + private interface DefaultJsonDeserializer { + // Reset default json deserializer + } + + @Override + public ShadowDocument deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + + ShadowDocument document = ctxt.readValue(p, ShadowDocument.class); + if (document.getState() == null) { // handle {"state": null} document + ShadowDocument clearDocument = new ShadowDocument(document); + clearDocument.getState().setClear(true); + return clearDocument; + } + return document; + } +} diff --git a/src/main/java/com/aws/greengrass/shadowmanager/model/ShadowState.java b/src/main/java/com/aws/greengrass/shadowmanager/model/ShadowState.java index 36c9f75f..8e42cbf4 100644 --- a/src/main/java/com/aws/greengrass/shadowmanager/model/ShadowState.java +++ b/src/main/java/com/aws/greengrass/shadowmanager/model/ShadowState.java @@ -7,10 +7,12 @@ import com.aws.greengrass.shadowmanager.util.JsonMerger; import com.aws.greengrass.shadowmanager.util.JsonUtil; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.Getter; +import lombok.Setter; import java.util.Iterator; @@ -33,6 +35,13 @@ public class ShadowState { @JsonProperty(SHADOW_DOCUMENT_STATE_REPORTED) private JsonNode reported; + /** + * If true, this {@link ShadowState} represents the {"state": null} document, which resets shadow state. + */ + @Setter + @JsonIgnore + private boolean clear; + public ShadowState() { this(null, null); } @@ -42,6 +51,11 @@ public ShadowState(final JsonNode desired, final JsonNode reported) { this.reported = nullIfEmpty(reported); } + private ShadowState(final JsonNode desired, final JsonNode reported, boolean clear) { + this(desired, reported); + this.clear = clear; + } + /** * Creates a new instance of the shadow state by deep copying the desired and reported nodes. * @@ -50,7 +64,8 @@ public ShadowState(final JsonNode desired, final JsonNode reported) { public ShadowState deepCopy() { return new ShadowState( isNullOrMissing(this.desired) ? this.desired : this.desired.deepCopy(), - isNullOrMissing(this.reported) ? this.reported : this.reported.deepCopy()); + isNullOrMissing(this.reported) ? this.reported : this.reported.deepCopy(), + this.clear); } /** @@ -103,6 +118,9 @@ public void update(JsonNode updatedStateNode) { * @return a JSON node containing the shadow state. */ public JsonNode toJson() { + if (clear) { + return JsonUtil.OBJECT_MAPPER.nullNode(); + } final ObjectNode result = JsonUtil.OBJECT_MAPPER.createObjectNode(); if (this.desired != null) { result.set(SHADOW_DOCUMENT_STATE_DESIRED, this.desired); diff --git a/src/main/java/com/aws/greengrass/shadowmanager/sync/model/BaseSyncRequest.java b/src/main/java/com/aws/greengrass/shadowmanager/sync/model/BaseSyncRequest.java index e698393d..7bad65e3 100644 --- a/src/main/java/com/aws/greengrass/shadowmanager/sync/model/BaseSyncRequest.java +++ b/src/main/java/com/aws/greengrass/shadowmanager/sync/model/BaseSyncRequest.java @@ -98,6 +98,11 @@ abstract boolean isUpdateNecessary(SyncContext context) throws RetryableExceptio boolean isUpdateNecessary(JsonNode baseDocument, JsonNode update) { JsonNode merged = baseDocument.deepCopy(); JsonMerger.merge(merged.get(SHADOW_DOCUMENT_STATE), update.get(SHADOW_DOCUMENT_STATE)); + // explicitly handle case where state is cleared with null, + // since resulting merge will have equal documents + if (!JsonUtil.isNullStateDocument(baseDocument) && JsonUtil.isNullStateDocument(update)) { + return true; + } return !baseDocument.equals(merged); }