diff --git a/src/integrationtests/java/com/aws/greengrass/integrationtests/deployment/SubGroupDeploymentIntegrationTest.java b/src/integrationtests/java/com/aws/greengrass/integrationtests/deployment/SubGroupDeploymentIntegrationTest.java index 6767569a3f..c5f3d4bde3 100644 --- a/src/integrationtests/java/com/aws/greengrass/integrationtests/deployment/SubGroupDeploymentIntegrationTest.java +++ b/src/integrationtests/java/com/aws/greengrass/integrationtests/deployment/SubGroupDeploymentIntegrationTest.java @@ -13,9 +13,13 @@ import com.aws.greengrass.config.Topics; import com.aws.greengrass.dependency.State; import com.aws.greengrass.deployment.DeploymentQueue; +import com.aws.greengrass.deployment.DeploymentService; import com.aws.greengrass.deployment.DeploymentStatusKeeper; import com.aws.greengrass.deployment.DeviceConfiguration; +import com.aws.greengrass.deployment.IotJobsClientWrapper; +import com.aws.greengrass.deployment.IotJobsHelper; import com.aws.greengrass.deployment.ThingGroupHelper; +import com.aws.greengrass.deployment.exceptions.DeploymentRejectedException; import com.aws.greengrass.deployment.model.Deployment; import com.aws.greengrass.helper.PreloadComponentStoreHelper; import com.aws.greengrass.integrationtests.BaseITCase; @@ -23,24 +27,36 @@ import com.aws.greengrass.lifecyclemanager.GreengrassService; import com.aws.greengrass.lifecyclemanager.Kernel; import com.aws.greengrass.mqttclient.MqttClient; +import com.aws.greengrass.mqttclient.PublishRequest; import com.aws.greengrass.status.FleetStatusService; +import com.aws.greengrass.status.model.DeploymentInformation; +import com.aws.greengrass.status.model.FleetStatusDetails; +import com.aws.greengrass.status.model.Trigger; import com.aws.greengrass.testcommons.testutilities.NoOpPathOwnershipHandler; import com.aws.greengrass.util.Coerce; import com.aws.greengrass.util.Utils; import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtensionContext; import org.mockito.Mock; +import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.crt.mqtt.QualityOfService; +import software.amazon.awssdk.iot.iotjobs.model.JobStatus; +import software.amazon.awssdk.iot.iotjobs.model.UpdateJobExecutionRequest; +import software.amazon.awssdk.iot.iotjobs.model.UpdateJobExecutionResponse; +import software.amazon.awssdk.iot.iotjobs.model.UpdateJobExecutionSubscriptionRequest; import java.io.File; import java.io.IOException; import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -49,9 +65,12 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -63,11 +82,21 @@ import static com.aws.greengrass.status.FleetStatusService.FLEET_STATUS_SERVICE_TOPICS; import static com.aws.greengrass.testcommons.testutilities.ExceptionLogProtector.ignoreExceptionOfType; import static com.aws.greengrass.util.Utils.copyFolderRecursively; +import static com.github.grantwest.eventually.EventuallyLambdaMatcher.eventuallyEval; import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -80,23 +109,30 @@ public class SubGroupDeploymentIntegrationTest extends BaseITCase { private static final String ROOT_GROUP_DEPLOYMENT_CONFIG = "FleetConfigWithSimpleAppv1.json"; private static final Map ROOT_GROUP_SERVICE_MAP = Utils.immutableMap("SimpleApp", "1.0.0"); private static final List REQUIRED_CAPABILITY = - Arrays.asList(DeploymentCapability.SUBGROUP_DEPLOYMENTS.toString()); + Arrays.asList(DeploymentCapability.SUB_DEPLOYMENTS.toString()); private Kernel kernel; + private DeviceConfiguration deviceConfiguration; private DeploymentQueue deploymentQueue; private Path localStoreContentPath; @Mock private ThingGroupHelper thingGroupHelper; @Mock private MqttClient mqttClient; - + private IotJobsClientWrapper iotJobsClientWrapper; private Map groupLatchMap; + private Map> groupDeploymentStatus; + private Map> fleetStatusUpdates; + private Map jobStatusUpdates; @BeforeEach void before(ExtensionContext context) throws Exception { ignoreExceptionOfType(context, PackageDownloadException.class); ignoreExceptionOfType(context, SdkClientException.class); + groupDeploymentStatus = new ConcurrentHashMap<>(); + fleetStatusUpdates = new ConcurrentHashMap<>(); + jobStatusUpdates = new ConcurrentHashMap<>(); groupLatchMap = new ConcurrentHashMap<>(); kernel = new Kernel(); @@ -117,6 +153,42 @@ void before(ExtensionContext context) throws Exception { // set up device config setDeviceConfig(kernel, DeviceConfiguration.DEPLOYMENT_POLLING_FREQUENCY_SECONDS, 1L); + // listen to mqtt deployment status updates + when(mqttClient.getMqttOperationTimeoutMillis()).thenReturn(10000); + when(mqttClient.publish(any(PublishRequest.class))).thenAnswer(i -> { + PublishRequest publishRequest = i.getArgument(0); + if (publishRequest.getTopic().endsWith("namespace-aws-gg-deployment/update")) { + try { + UpdateJobExecutionRequest updateJobExecReq = + OBJECT_MAPPER.readValue(publishRequest.getPayload(), UpdateJobExecutionRequest.class); + String resource = Arn.fromString(updateJobExecReq.jobId).resource().resource(); + String groupName = resource; + if (resource.startsWith(THING_GROUP_PREFIX)) { + groupName = groupName.substring(THING_GROUP_PREFIX.length()); + } + jobStatusUpdates.put(groupName, updateJobExecReq); + } catch (JsonMappingException ignored) { + } + } + + if (publishRequest.getTopic().endsWith("greengrassv2/health/json")) { + try { + FleetStatusDetails statusDetails = + OBJECT_MAPPER.readValue(publishRequest.getPayload(), FleetStatusDetails.class); + if (Trigger.THING_GROUP_DEPLOYMENT.equals(statusDetails.getTrigger())) { + // record only deployment status + String groupName = statusDetails.getDeploymentInformation().getDeploymentId(); + AtomicReference ref = + fleetStatusUpdates.getOrDefault(groupName, new AtomicReference<>()); + ref.set(statusDetails); + fleetStatusUpdates.put(groupName, ref); + } + } catch (JsonMappingException ignored) { + } + } + return CompletableFuture.completedFuture(0); + }); + // launch kernel kernel.launch(); assertTrue(deploymentServiceLatch.await(10, TimeUnit.SECONDS)); @@ -126,9 +198,32 @@ void before(ExtensionContext context) throws Exception { Paths.get(DeploymentTaskIntegrationTest.class.getResource("local_store_content").toURI()); deploymentQueue = kernel.getContext().get(DeploymentQueue.class); + // setup spy device configuration for FSS + deviceConfiguration = spy(kernel.getContext().get(DeviceConfiguration.class)); + doNothing().when(deviceConfiguration).validate(); + // setup fss such that it could send mqtt messages to the mock listener FleetStatusService fleetStatusService = (FleetStatusService) kernel.locate(FLEET_STATUS_SERVICE_TOPICS); - fleetStatusService.getIsConnected().set(false); + fleetStatusService.setDeviceConfiguration(deviceConfiguration); + fleetStatusService.postInject(); + + // setup jobs helper such that it could send mqtt messages to the mock listener + IotJobsHelper jobsHelper = kernel.getContext().get(IotJobsHelper.class); + jobsHelper.setDeviceConfiguration(deviceConfiguration); + jobsHelper.postInject(); + iotJobsClientWrapper = spy(jobsHelper.getIotJobsClientWrapper()); + + // mimic response back from cloud to update job accepted topic + doAnswer((i) -> { + Consumer handler = i.getArgument(2); + handler.accept(new UpdateJobExecutionResponse()); + return CompletableFuture.completedFuture(0); + }).when(iotJobsClientWrapper) + .SubscribeToUpdateJobExecutionAccepted(any(UpdateJobExecutionSubscriptionRequest.class), + any(QualityOfService.class), any(Consumer.class)); + + // replace with spied version + jobsHelper.setIotJobsClientWrapper(iotJobsClientWrapper); } @AfterEach @@ -196,6 +291,36 @@ void GIVEN_root_deployment_success_WHEN_nested_subgroups_deploy_with_different_c verifyServices(Utils.immutableMap("SimpleApp", "3.0.0")); } + @Test + void GIVEN_root_deployment_success_WHEN_nested_subgroups_deploy_with_different_component_versions_and_deploy_received_not_in_order_THEN_stale_deployment_rejected( + ExtensionContext context) throws Exception { + // expect a rejection exception + ignoreExceptionOfType(context, DeploymentRejectedException.class); + + // Given + // deploys SimpleApp 1.0.0 + setupRootParentGroupDeploymentFor("subGroup1", "subGroup2", "subGroup3"); + + // When + // sub-group deployment maintains SimpleApp 1.0.0 and adds GreenSignal 1.0.0 + createSubGroupDeploymentAndWait("FleetConfigWithSimpleAppv1AndGreenSignal.json", 1, "subGroup1", + ROOT_GROUP_NAME); + verifyServices(Utils.immutableMap("SimpleApp", "1.0.0", "GreenSignal", "1.0.0")); + // create a stale deployment which should override SimpleApp to 2.0.0 + Deployment oldDeployment = + createDeployment("FleetConfigWithSimpleAppv2.json", 1, "subGroup2", ROOT_GROUP_NAME, ROOT_GROUP_NAME, + false); + // sub-group deployment overrides SimpleApp to 3.0.0 + createSubGroupDeploymentAndWait("FleetConfigWithSimpleAppv3.json", 1, "subGroup3", ROOT_GROUP_NAME); + // queue stale deployment received in bad order, which should get rejected + queueDeploymentAndWait("subGroup2", oldDeployment); + + // Then + verifyServices(Utils.immutableMap("SimpleApp", "3.0.0")); + verifyServicesRemoved("GreenSignal"); + verifyRejectionStatus("subGroup2"); + } + @Test void GIVEN_root_deployment_success_with_multiple_sibling_subgroups_WHEN_new_root_deployment_revision_THEN_root_deployment_overrides_subgroups() throws Exception { @@ -251,6 +376,40 @@ private void verifyServicesRemoved(String... services) { } } + private void verifyRejectionStatus(String... groups) { + for (String groupName : groups) { + Map statusDetails = groupDeploymentStatus.get(groupName); + String deploymentStatus = (String) statusDetails.get(DEPLOYMENT_STATUS_KEY_NAME); + assertEquals("REJECTED", deploymentStatus); + + // verify fleet status update + assertThat(() -> fleetStatusUpdates.get(groupName), + eventuallyEval(notNullValue(AtomicReference.class), Duration.ofSeconds(20))); + FleetStatusDetails statusUpdate = fleetStatusUpdates.get(groupName).get(); + DeploymentInformation deploymentInfo = statusUpdate.getDeploymentInformation(); + assertEquals("REJECTED", deploymentInfo.getStatus()); + assertNotNull(deploymentInfo.getStatusDetails()); + assertEquals("REJECTED", deploymentInfo.getStatusDetails().getDetailedStatus()); + assertThat("Rejected error stack in status update doesn't match", + deploymentInfo.getStatusDetails().getErrorStack(), + contains("DEPLOYMENT_REJECTED", "REJECTED_STALE_DEPLOYMENT")); + + // verify job status update + assertThat(() -> jobStatusUpdates.get(groupName), + eventuallyEval(notNullValue(UpdateJobExecutionRequest.class), Duration.ofSeconds(20))); + UpdateJobExecutionRequest updateJobExecReq = jobStatusUpdates.get(groupName); + assertEquals(JobStatus.REJECTED, updateJobExecReq.status); + assertNotNull(updateJobExecReq.statusDetails); + assertEquals("REJECTED", updateJobExecReq.statusDetails + .get(DeploymentService.DEPLOYMENT_DETAILED_STATUS_KEY)); + String jobsErrorStack = updateJobExecReq.statusDetails.get(DeploymentService.DEPLOYMENT_ERROR_STACK_KEY); + assertThat("Rejected error stack in update job doesn't match", jobsErrorStack, + containsString("DEPLOYMENT_REJECTED")); + assertThat("Rejected error stack in update job doesn't match", jobsErrorStack, + containsString("REJECTED_STALE_DEPLOYMENT")); + } + } + private void setupRootParentGroupDeploymentFor(@Nonnull String... subGroupNames) throws Exception { for (String subGroup : subGroupNames) { groupLatchMap.put(subGroup, new CountDownLatch(1)); @@ -261,6 +420,7 @@ private void setupRootParentGroupDeploymentFor(@Nonnull String... subGroupNames) deploymentStatusKeeper.registerDeploymentStatusConsumer(Deployment.DeploymentType.IOT_JOBS, (status) -> { String groupName = (String) status.get(GG_DEPLOYMENT_ID_KEY_NAME); String deploymentStatus = (String) status.get(DEPLOYMENT_STATUS_KEY_NAME); + groupDeploymentStatus.put(groupName, status); if ("SUCCEEDED".equals(deploymentStatus) || "REJECTED".equals(deploymentStatus)) { CountDownLatch latch = groupLatchMap.get(groupName); if (latch != null) { diff --git a/src/main/java/com/aws/greengrass/deployment/DefaultDeploymentTask.java b/src/main/java/com/aws/greengrass/deployment/DefaultDeploymentTask.java index 71143e15e0..a3f91709ec 100644 --- a/src/main/java/com/aws/greengrass/deployment/DefaultDeploymentTask.java +++ b/src/main/java/com/aws/greengrass/deployment/DefaultDeploymentTask.java @@ -14,6 +14,8 @@ import com.aws.greengrass.componentmanager.models.ComponentRequirementIdentifier; import com.aws.greengrass.config.Node; import com.aws.greengrass.config.Topics; +import com.aws.greengrass.deployment.errorcode.DeploymentErrorCode; +import com.aws.greengrass.deployment.exceptions.DeploymentRejectedException; import com.aws.greengrass.deployment.exceptions.DeploymentTaskFailureException; import com.aws.greengrass.deployment.model.Deployment; import com.aws.greengrass.deployment.model.DeploymentDocument; @@ -41,6 +43,8 @@ import java.util.stream.Collectors; import static com.aws.greengrass.deployment.DeploymentConfigMerger.DEPLOYMENT_ID_LOG_KEY; +import static com.aws.greengrass.deployment.DeploymentService.GROUP_TO_LAST_DEPLOYMENT_CONFIG_ARN_KEY; +import static com.aws.greengrass.deployment.DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TIMESTAMP_KEY; import static com.aws.greengrass.deployment.DeploymentService.GROUP_TO_ROOT_COMPONENTS_VERSION_KEY; import static com.aws.greengrass.deployment.converter.DeploymentDocumentConverter.LOCAL_DEPLOYMENT_GROUP_NAME; @@ -111,6 +115,13 @@ public DeploymentResult call() throws InterruptedException { .kv("Deployment service config", deploymentServiceConfig.toPOJO().toString()) .log("Starting deployment task"); + /* + * Enforce deployments are received for a given deployment target (thing or thingGroup) in sequence such + * that old deployments for that target does not override a new deployment. + */ + if (checkIfDeploymentReceivedIsStale(deploymentDocument)) { + return prepareRejectionResult(deploymentDocument); + } Map> nonTargetGroupsToRootPackagesMap = getNonTargetGroupToRootPackagesMap(deploymentDocument); @@ -182,6 +193,80 @@ public DeploymentResult call() throws InterruptedException { } } + /* + * Enforce deployments are received for a given deployment target (thing or thingGroup) in sequence such + * that old deployments for that target does not override a new deployment. + * + * For thing deployments, we don't consider them here as they are always in sequence and always for only + * one target. + * + * For thingGroup deployments sent to different targets (thingGroup A & B), nucleus allows components from + * both groups to be deployment as long as they don't have a conflicting component versions. This + * behavior is not changed. + * + * For thingGroup deployments sent to the same target (thingGroup A) are always in sequence, however if + * receive a bad/stale deployment due to cloud error we don't want that stale deployment to override a + * new deployment already performed on device. + * + * For a subgroup deployments targeted for a parent fleet group (subgroup A1, A2 & A3 targeted for + * thingGroup A), as there could be multiple subgroup deployments each of these sent as different jobs to + * the device could be received in any order yielding an unpredictable behavior. To resolve this, nucleus + * enforces processing these subgroup deployment in-order of their creation irrespective of when these + * signals are received. For example: + * + * Order of deployment creation is: A1, A2, A3 + * So these, have to be processed in this order. + * + * Order of deployments received: A2, A1, A3 + * then A2 and A3 deployment will succeed, but A1 would be rejected as nucleus has already processed + * newer deployment A2. + * + * @return true if deployment is considered stale, false otherwise + */ + private boolean checkIfDeploymentReceivedIsStale(DeploymentDocument deploymentDocument) { + // Check if group deployment + boolean isGroupDeployment = Deployment.DeploymentType.IOT_JOBS.equals(deployment.getDeploymentType()) + && deploymentDocument.getGroupName() != null; + + // if not a group deployment, then not stale + if (!isGroupDeployment) { + return false; + } + + // Get timestamp for the root target group + Topics lastDeployment = deploymentServiceConfig + .lookupTopics(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS, deploymentDocument.getGroupName()); + + long timestamp = Coerce.toLong(lastDeployment.find(GROUP_TO_LAST_DEPLOYMENT_TIMESTAMP_KEY)); + + // if don't have last deployment detail, then its a new deployment + if (timestamp == 0 || deploymentDocument.getTimestamp() == null) { + return false; + } + + // if the new deployment creation timestamp is smaller than last deployment creation timestamp then its stale + return deploymentDocument.getTimestamp() < timestamp; + } + + private DeploymentResult prepareRejectionResult(DeploymentDocument deploymentDocument) { + logger.atInfo().setEventType(DEPLOYMENT_TASK_EVENT_TYPE) + .log("Nucleus has a newer deployment for '{}' target. Rejecting the deployment", + deploymentDocument.getGroupName()); + + Topics lastDeployment = + deploymentServiceConfig.lookupTopics(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS, + deploymentDocument.getGroupName()); + + String lastDeploymentConfigArn = + Coerce.toString(lastDeployment.find(GROUP_TO_LAST_DEPLOYMENT_CONFIG_ARN_KEY)); + return new DeploymentResult(DeploymentResult.DeploymentStatus.REJECTED, new DeploymentRejectedException( + String.format("Nucleus has a newer deployment for '%s' target deployed by '%s'. Rejecting the " + + "deployment from '%s'", deploymentDocument.getGroupName(), lastDeploymentConfigArn, + deploymentDocument.getConfigurationArn()), + DeploymentErrorCode.REJECTED_STALE_DEPLOYMENT)); + + } + @SuppressWarnings("PMD.AvoidCatchingGenericException") private Map> getNonTargetGroupToRootPackagesMap( DeploymentDocument deploymentDocument) diff --git a/src/main/java/com/aws/greengrass/deployment/DeploymentService.java b/src/main/java/com/aws/greengrass/deployment/DeploymentService.java index 68e85d5b95..b938d77f39 100644 --- a/src/main/java/com/aws/greengrass/deployment/DeploymentService.java +++ b/src/main/java/com/aws/greengrass/deployment/DeploymentService.java @@ -90,8 +90,11 @@ public class DeploymentService extends GreengrassService { public static final String DEPLOYMENT_SERVICE_TOPICS = "DeploymentService"; public static final String DEPLOYMENT_QUEUE_TOPIC = "DeploymentQueue"; public static final String GROUP_TO_ROOT_COMPONENTS_TOPICS = "GroupToRootComponents"; + public static final String GROUP_TO_LAST_DEPLOYMENT_TOPICS = "GroupToLastDeployment"; public static final String GROUP_MEMBERSHIP_TOPICS = "GroupMembership"; public static final String COMPONENTS_TO_GROUPS_TOPICS = "ComponentToGroups"; + public static final String GROUP_TO_LAST_DEPLOYMENT_TIMESTAMP_KEY = "timestamp"; + public static final String GROUP_TO_LAST_DEPLOYMENT_CONFIG_ARN_KEY = "configArn"; public static final String GROUP_TO_ROOT_COMPONENTS_VERSION_KEY = "version"; public static final String GROUP_TO_ROOT_COMPONENTS_GROUP_CONFIG_ARN = "groupConfigArn"; public static final String GROUP_TO_ROOT_COMPONENTS_GROUP_NAME = "groupConfigName"; @@ -343,6 +346,19 @@ private void finishCurrentDeployment() throws InterruptedException { } } deploymentDirectoryManager.persistLastSuccessfulDeployment(); + } else if (DeploymentStatus.REJECTED.equals(deploymentStatus)) { + if (result.getFailureCause() != null) { + updateStatusDetailsFromException(statusDetails, result.getFailureCause(), + currentDeploymentTaskMetadata.getDeploymentType()); + logger.atWarn().setCause(result.getFailureCause()).kv(DEPLOYMENT_ID_LOG_KEY_NAME, deploymentId) + .kv(GG_DEPLOYMENT_ID_LOG_KEY_NAME, ggDeploymentId) + .kv(DEPLOYMENT_DETAILED_STATUS_KEY, result.getDeploymentStatus()) + .kv(DEPLOYMENT_ERROR_STACK_KEY, statusDetails.get(DEPLOYMENT_ERROR_STACK_KEY)) + .kv(DEPLOYMENT_ERROR_TYPES_KEY, statusDetails.get(DEPLOYMENT_ERROR_TYPES_KEY)) + .log("Deployment task rejected with following errors"); + } + deploymentStatusKeeper.persistAndPublishDeploymentStatus(deploymentId, ggDeploymentId, + configurationArn, type, JobStatus.REJECTED.toString(), statusDetails, rootPackages); } else { if (result.getFailureCause() != null) { updateStatusDetailsFromException(statusDetails, result.getFailureCause(), @@ -402,19 +418,14 @@ private void finishCurrentDeployment() throws InterruptedException { } private void persistGroupToRootComponents(DeploymentDocument deploymentDocument) { - Map deploymentGroupToRootPackages = new HashMap<>(); Topics deploymentGroupTopics = config.lookupTopics(GROUP_TO_ROOT_COMPONENTS_TOPICS); - Topics groupMembershipTopics = config.lookupTopics(GROUP_MEMBERSHIP_TOPICS); - deploymentGroupTopics.forEach(node -> { - Topics groupTopics = (Topics) node; - if (groupMembershipTopics.find(groupTopics.getName()) == null - && !groupTopics.getName().startsWith(DEVICE_DEPLOYMENT_GROUP_NAME_PREFIX) - && !groupTopics.getName().equals(LOCAL_DEPLOYMENT_GROUP_NAME)) { - logger.info("Removing mapping for thing group " + groupTopics.getName()); - groupTopics.remove(); - } - }); - groupMembershipTopics.remove(); + Topics groupLastDeploymentTopics = config.lookupTopics(GROUP_TO_LAST_DEPLOYMENT_TOPICS); + + // clean up group + cleanupGroupData(deploymentGroupTopics, groupLastDeploymentTopics); + + // persist group to root components + Map deploymentGroupToRootPackages = new HashMap<>(); deploymentDocument.getDeploymentPackageConfigurationList().stream().forEach(pkgConfig -> { if (pkgConfig.isRootComponent()) { Map pkgDetails = new HashMap<>(); @@ -427,11 +438,51 @@ private void persistGroupToRootComponents(DeploymentDocument deploymentDocument) deploymentGroupToRootPackages.put(pkgConfig.getPackageName(), pkgDetails); } }); + + // persist last deployment details + Map lastDeploymentDetails = new HashMap<>(); + lastDeploymentDetails.put(GROUP_TO_LAST_DEPLOYMENT_TIMESTAMP_KEY, deploymentDocument.getTimestamp()); + lastDeploymentDetails.put(GROUP_TO_LAST_DEPLOYMENT_CONFIG_ARN_KEY, deploymentDocument.getConfigurationArn()); + groupLastDeploymentTopics.lookupTopics(deploymentDocument.getGroupName()) + .replaceAndWait(lastDeploymentDetails); + + // persist group to root packages mapping deploymentGroupTopics.lookupTopics(deploymentDocument.getGroupName()) .replaceAndWait(deploymentGroupToRootPackages); setComponentsToGroupsMapping(deploymentGroupTopics); } + /** + * Group memberships for a device can change. If the device is no longer part of a group, then perform cleanup. + */ + private void cleanupGroupData(Topics deploymentGroupTopics, Topics groupLastDeploymentTopics) { + Topics groupMembershipTopics = config.lookupTopics(GROUP_MEMBERSHIP_TOPICS); + deploymentGroupTopics.forEach(node -> { + if (node instanceof Topics) { + Topics groupTopics = (Topics) node; + if (groupMembershipTopics.find(groupTopics.getName()) == null && !groupTopics.getName() + .startsWith(DEVICE_DEPLOYMENT_GROUP_NAME_PREFIX) && !groupTopics.getName() + .equals(LOCAL_DEPLOYMENT_GROUP_NAME)) { + logger.debug("Removing mapping for thing group " + groupTopics.getName()); + groupTopics.remove(); + } + } + }); + + groupLastDeploymentTopics.forEach(node -> { + if (node instanceof Topics) { + Topics groupTopics = (Topics) node; + if (groupMembershipTopics.find(groupTopics.getName()) == null && !groupTopics.getName() + .startsWith(DEVICE_DEPLOYMENT_GROUP_NAME_PREFIX) && !groupTopics.getName() + .equals(LOCAL_DEPLOYMENT_GROUP_NAME)) { + logger.debug("Removing last deployment information for thing group " + groupTopics.getName()); + groupTopics.remove(); + } + } + }); + groupMembershipTopics.remove(); + } + /* * When a cancellation is received, there are following possibilities - * - No task has yet been created for current deployment so result future is null, we do nothing for this diff --git a/src/main/java/com/aws/greengrass/deployment/IotJobsHelper.java b/src/main/java/com/aws/greengrass/deployment/IotJobsHelper.java index 2e2e0f191f..1dcc8dcdd1 100644 --- a/src/main/java/com/aws/greengrass/deployment/IotJobsHelper.java +++ b/src/main/java/com/aws/greengrass/deployment/IotJobsHelper.java @@ -104,6 +104,8 @@ public class IotJobsHelper implements InjectionActions { private static final long WAIT_TIME_MS_TO_SUBSCRIBE_AGAIN = Duration.ofMinutes(2).toMillis(); private static final Random RANDOM = new Random(); + // setter is only used for testing + @Setter @Inject private DeviceConfiguration deviceConfiguration; @@ -137,6 +139,7 @@ public class IotJobsHelper implements InjectionActions { @Setter(AccessLevel.PACKAGE) // For tests private long waitTimeToSubscribeAgain = WAIT_TIME_MS_TO_SUBSCRIBE_AGAIN; + @Getter // For tests @Setter // For tests private IotJobsClientWrapper iotJobsClientWrapper; diff --git a/src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCode.java b/src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCode.java index 3da71c51ba..f631002156 100644 --- a/src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCode.java +++ b/src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCode.java @@ -10,12 +10,14 @@ public enum DeploymentErrorCode { // Generic types DEPLOYMENT_FAILURE(DeploymentErrorType.NONE), + DEPLOYMENT_REJECTED(DeploymentErrorType.NONE), DEPLOYMENT_INTERRUPTED(DeploymentErrorType.NONE), ARTIFACT_DOWNLOAD_ERROR(DeploymentErrorType.NONE), NO_AVAILABLE_COMPONENT_VERSION(DeploymentErrorType.NONE), COMPONENT_PACKAGE_LOADING_ERROR(DeploymentErrorType.NONE), // Deployment request error + REJECTED_STALE_DEPLOYMENT(DeploymentErrorType.NONE), NUCLEUS_MISSING_REQUIRED_CAPABILITIES(DeploymentErrorType.REQUEST_ERROR), COMPONENT_CIRCULAR_DEPENDENCY_ERROR(DeploymentErrorType.REQUEST_ERROR), UNAUTHORIZED_NUCLEUS_MINOR_VERSION_UPDATE(DeploymentErrorType.REQUEST_ERROR), diff --git a/src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCodeUtils.java b/src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCodeUtils.java index 3133b1b53d..49515ab44e 100644 --- a/src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCodeUtils.java +++ b/src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCodeUtils.java @@ -10,6 +10,7 @@ import com.aws.greengrass.componentmanager.models.ComponentIdentifier; import com.aws.greengrass.config.Topics; import com.aws.greengrass.deployment.exceptions.DeploymentException; +import com.aws.greengrass.deployment.exceptions.DeploymentRejectedException; import com.aws.greengrass.deployment.exceptions.DeviceConfigurationException; import com.aws.greengrass.deployment.model.Deployment; import com.aws.greengrass.lifecyclemanager.GreengrassService; @@ -77,6 +78,13 @@ public final class DeploymentErrorCodeUtils { private DeploymentErrorCodeUtils() { } + private static DeploymentErrorCode generateDefaultErrorCode(Throwable e) { + if (e instanceof DeploymentRejectedException) { + return DeploymentErrorCode.DEPLOYMENT_REJECTED; + } + return DeploymentErrorCode.DEPLOYMENT_FAILURE; + } + /** * Walk through exception chain and generate deployment error report. * @@ -86,7 +94,7 @@ private DeploymentErrorCodeUtils() { public static Pair, List> generateErrorReportFromExceptionStack(Throwable e) { // Use a linked hash set to remove duplicates while preserving order Set errorCodeSet = - new LinkedHashSet<>(Collections.singletonList(DeploymentErrorCode.DEPLOYMENT_FAILURE)); + new LinkedHashSet<>(Collections.singletonList(generateDefaultErrorCode(e))); Map errorContext = new HashMap<>(); List errorTypesFromException = new ArrayList<>(); diff --git a/src/main/java/com/aws/greengrass/deployment/exceptions/DeploymentRejectedException.java b/src/main/java/com/aws/greengrass/deployment/exceptions/DeploymentRejectedException.java new file mode 100644 index 0000000000..d3b33890da --- /dev/null +++ b/src/main/java/com/aws/greengrass/deployment/exceptions/DeploymentRejectedException.java @@ -0,0 +1,17 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.deployment.exceptions; + +import com.aws.greengrass.deployment.errorcode.DeploymentErrorCode; + +public class DeploymentRejectedException extends DeploymentException { + + private static final long serialVersionUID = -8212002201272098501L; + + public DeploymentRejectedException(String message, DeploymentErrorCode errorCode) { + super(message, errorCode); + } +} diff --git a/src/main/java/com/aws/greengrass/deployment/model/DeploymentResult.java b/src/main/java/com/aws/greengrass/deployment/model/DeploymentResult.java index b9d97ed0da..84e117588a 100644 --- a/src/main/java/com/aws/greengrass/deployment/model/DeploymentResult.java +++ b/src/main/java/com/aws/greengrass/deployment/model/DeploymentResult.java @@ -24,6 +24,7 @@ public enum DeploymentStatus { FAILED_NO_STATE_CHANGE, FAILED_ROLLBACK_NOT_REQUESTED, FAILED_ROLLBACK_COMPLETE, - FAILED_UNABLE_TO_ROLLBACK + FAILED_UNABLE_TO_ROLLBACK, + REJECTED } } diff --git a/src/main/java/com/aws/greengrass/lifecyclemanager/Kernel.java b/src/main/java/com/aws/greengrass/lifecyclemanager/Kernel.java index b864045c65..2427ebd51f 100644 --- a/src/main/java/com/aws/greengrass/lifecyclemanager/Kernel.java +++ b/src/main/java/com/aws/greengrass/lifecyclemanager/Kernel.java @@ -105,7 +105,7 @@ public class Kernel { private static final List SUPPORTED_CAPABILITIES = Arrays.asList(DeploymentCapability.LARGE_CONFIGURATION.toString(), DeploymentCapability.LINUX_RESOURCE_LIMITS.toString(), - DeploymentCapability.SUBGROUP_DEPLOYMENTS.toString()); + DeploymentCapability.SUB_DEPLOYMENTS.toString()); @Getter private final Context context; diff --git a/src/main/java/com/aws/greengrass/status/FleetStatusService.java b/src/main/java/com/aws/greengrass/status/FleetStatusService.java index e1b1b08ecc..04af91d475 100644 --- a/src/main/java/com/aws/greengrass/status/FleetStatusService.java +++ b/src/main/java/com/aws/greengrass/status/FleetStatusService.java @@ -86,7 +86,10 @@ public class FleetStatusService extends GreengrassService { private static final int MAX_PAYLOAD_LENGTH_BYTES = 128_000; public static final String DEVICE_OFFLINE_MESSAGE = "Device not configured to talk to AWS IoT cloud. " + "FleetStatusService is offline"; - private final DeviceConfiguration deviceConfiguration; + + // setter is only used for testing + @Setter + private DeviceConfiguration deviceConfiguration; private final GlobalStateChangeListener handleServiceStateChange = this::handleServiceStateChange; private final Function, Boolean> deploymentStatusChanged = this::deploymentStatusChanged; diff --git a/src/test/java/com/aws/greengrass/deployment/DeploymentServiceTest.java b/src/test/java/com/aws/greengrass/deployment/DeploymentServiceTest.java index e8cfca42dc..45a8ce2339 100644 --- a/src/test/java/com/aws/greengrass/deployment/DeploymentServiceTest.java +++ b/src/test/java/com/aws/greengrass/deployment/DeploymentServiceTest.java @@ -64,6 +64,7 @@ import static com.aws.greengrass.deployment.DeploymentService.DEPLOYMENT_SERVICE_TOPICS; import static com.aws.greengrass.deployment.DeploymentService.GROUP_MEMBERSHIP_TOPICS; import static com.aws.greengrass.deployment.DeploymentService.GROUP_TO_ROOT_COMPONENTS_TOPICS; +import static com.aws.greengrass.deployment.DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS; import static com.aws.greengrass.deployment.converter.DeploymentDocumentConverter.LOCAL_DEPLOYMENT_GROUP_NAME; import static com.aws.greengrass.lifecyclemanager.GreengrassService.SERVICE_NAME_KEY; import static com.aws.greengrass.testcommons.testutilities.ExceptionLogProtector.ignoreExceptionUltimateCauseOfType; @@ -385,6 +386,7 @@ void GIVEN_deployment_job_WHEN_deployment_process_succeeds_THEN_correctly_map_co deploymentQueue.offer(new Deployment(deploymentDocument, type, TEST_JOB_ID_1)); Topics allGroupTopics = Topics.of(context, GROUP_TO_ROOT_COMPONENTS_TOPICS, null); Topics groupMembershipTopics = Topics.of(context, GROUP_MEMBERSHIP_TOPICS, null); + Topics groupToLastDeploymentTopics = Topics.of(context, GROUP_TO_LAST_DEPLOYMENT_TOPICS, null); groupMembershipTopics.lookup(expectedGroupName); Topics deploymentGroupTopics = Topics.of(context, expectedGroupName, allGroupTopics); Topic pkgTopic1 = Topic.of(context, DeploymentService.GROUP_TO_ROOT_COMPONENTS_VERSION_KEY, "1.0.0"); @@ -404,6 +406,7 @@ void GIVEN_deployment_job_WHEN_deployment_process_succeeds_THEN_correctly_map_co deploymentGroupTopics.children.put(new CaseInsensitiveString(EXPECTED_ROOT_PACKAGE_NAME), pkgTopics); allGroupTopics.children.put(new CaseInsensitiveString(expectedGroupName), deploymentGroupTopics); + when(config.lookupTopics(GROUP_TO_LAST_DEPLOYMENT_TOPICS)).thenReturn(groupToLastDeploymentTopics); when(config.lookupTopics(GROUP_MEMBERSHIP_TOPICS)).thenReturn(groupMembershipTopics); when(config.lookupTopics(GROUP_TO_ROOT_COMPONENTS_TOPICS)).thenReturn(allGroupTopics); lenient().when(config.lookupTopics(GROUP_TO_ROOT_COMPONENTS_TOPICS, expectedGroupName)) @@ -504,8 +507,10 @@ void GIVEN_deployment_job_with_auto_rollback_not_requested_WHEN_deployment_proce Topics allGroupTopics = mock(Topics.class); Topics groupMembershipTopics = mock(Topics.class); Topics deploymentGroupTopics = mock(Topics.class); + Topics groupToLastDeploymentTopics = Topics.of(context, GROUP_TO_LAST_DEPLOYMENT_TOPICS, null); when(allGroupTopics.lookupTopics(EXPECTED_GROUP_NAME)).thenReturn(deploymentGroupTopics); + when(config.lookupTopics(GROUP_TO_LAST_DEPLOYMENT_TOPICS)).thenReturn(groupToLastDeploymentTopics); when(config.lookupTopics(GROUP_MEMBERSHIP_TOPICS)).thenReturn(groupMembershipTopics); when(config.lookupTopics(GROUP_TO_ROOT_COMPONENTS_TOPICS)).thenReturn(allGroupTopics); when(config.lookupTopics(COMPONENTS_TO_GROUPS_TOPICS)).thenReturn(mockComponentsToGroupPackages); diff --git a/src/test/java/com/aws/greengrass/deployment/DeploymentTaskTest.java b/src/test/java/com/aws/greengrass/deployment/DeploymentTaskTest.java index 51e3271584..2f023b149e 100644 --- a/src/test/java/com/aws/greengrass/deployment/DeploymentTaskTest.java +++ b/src/test/java/com/aws/greengrass/deployment/DeploymentTaskTest.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -92,6 +93,7 @@ class DeploymentTaskTest { private Topics mockGroupToRootConfig; private Topics mockGroupMembership; + private Topics mockLastDeploymentTopics; private DefaultDeploymentTask deploymentTask; @Mock @@ -113,11 +115,20 @@ void setup() { mockGroupToRootConfig.lookupTopics("group1", COMPONENT_2_ROOT_PACKAGE_NAME) .replaceAndWait(ImmutableMap.of(DeploymentService.GROUP_TO_ROOT_COMPONENTS_VERSION_KEY, "1.0.0")); + Map lastDeploymentData = + ImmutableMap.of(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TIMESTAMP_KEY, 0, + DeploymentService.GROUP_TO_LAST_DEPLOYMENT_CONFIG_ARN_KEY, "group1"); + mockLastDeploymentTopics = Topics.of(context, DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS, null); + mockLastDeploymentTopics.lookupTopics(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS, "group1") + .replaceAndWait(lastDeploymentData); + mockGroupMembership = Topics.of(context, DeploymentService.GROUP_MEMBERSHIP_TOPICS, null); lenient().when(mockDeploymentServiceConfig.lookupTopics(eq(DeploymentService.GROUP_MEMBERSHIP_TOPICS))) .thenReturn(mockGroupMembership); lenient().when(mockDeploymentServiceConfig.lookupTopics(eq(DeploymentService.GROUP_TO_ROOT_COMPONENTS_TOPICS))) .thenReturn(mockGroupToRootConfig); + lenient().when(mockDeploymentServiceConfig.lookupTopics(eq(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS))) + .thenReturn(mockLastDeploymentTopics); deploymentTask = new DefaultDeploymentTask(mockDependencyResolver, mockComponentManager, mockKernelConfigResolver, mockDeploymentConfigMerger, logger, @@ -127,6 +138,8 @@ void setup() { @Test void GIVEN_deploymentDocument_WHEN_start_deploymentTask_THEN_succeeds() throws Exception { + lenient().when(mockDeploymentServiceConfig.lookupTopics(eq(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS), + eq(deploymentDocument.getGroupName()))).thenReturn(mockLastDeploymentTopics.lookupTopics("group1")); when(mockComponentManager.preparePackages(anyList())).thenReturn(CompletableFuture.completedFuture(null)); when(mockExecutorService.submit(any(Callable.class))) @@ -142,6 +155,9 @@ void GIVEN_deploymentDocument_WHEN_start_deploymentTask_THEN_succeeds() throws E @Test void GIVEN_deploymentDocument_WHEN_thingGroupHelper_return_forbidden_THEN_succeeds(ExtensionContext context) throws Exception { ignoreExceptionUltimateCauseOfType(context, GreengrassV2DataException.class); + lenient().when(mockDeploymentServiceConfig.lookupTopics(eq(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS), + eq(deploymentDocument.getGroupName()))).thenReturn(mockLastDeploymentTopics.lookupTopics("group1")); + when(mockComponentManager.preparePackages(anyList())).thenReturn(CompletableFuture.completedFuture(null)); when(mockExecutorService.submit(any(Callable.class))) .thenReturn(CompletableFuture.completedFuture(Collections.emptyList())); @@ -161,6 +177,9 @@ void GIVEN_deploymentDocument_WHEN_thingGroupHelper_return_forbidden_THEN_succee void GIVEN_deploymentDocument_WHEN_thingGroupHelper_throws_error_THEN_deployment_result_has_chain_of_error_messages(ExtensionContext context) throws Exception { ignoreExceptionUltimateCauseOfType(context, GreengrassV2DataException.class); + lenient().when(mockDeploymentServiceConfig.lookupTopics(eq(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS), + eq(deploymentDocument.getGroupName()))).thenReturn(mockLastDeploymentTopics.lookupTopics("group1")); + when(mockThingGroupHelper.listThingGroupsForDevice(anyInt())) .thenThrow(GreengrassV2DataException.builder().message("Original error message").build()); @@ -174,6 +193,9 @@ void GIVEN_deploymentDocument_WHEN_thingGroupHelper_throws_error_THEN_deployment void GIVEN_deploymentDocument_WHEN_resolveDependencies_errored_THEN_deploymentTask_aborted(ExtensionContext context) throws Exception { ignoreExceptionUltimateCauseOfType(context, PackagingException.class); + lenient().when(mockDeploymentServiceConfig.lookupTopics(eq(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS), + eq(deploymentDocument.getGroupName()))).thenReturn(mockLastDeploymentTopics.lookupTopics("group1")); + when(mockExecutorService.submit(any(Callable.class))).thenReturn(mockResolveDependencyFuture); when(mockResolveDependencyFuture.get()) .thenThrow(new ExecutionException(new PackagingException("unknown package"))); @@ -190,6 +212,9 @@ void GIVEN_deploymentDocument_WHEN_resolveDependencies_errored_THEN_deploymentTa void GIVEN_deploymentDocument_WHEN_resolve_kernel_config_throws_PackageLoadingException_THEN_deploymentTask_aborted( ExtensionContext context) throws Exception { ignoreExceptionUltimateCauseOfType(context, PackageLoadingException.class); + lenient().when(mockDeploymentServiceConfig.lookupTopics(eq(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS), + eq(deploymentDocument.getGroupName()))).thenReturn(mockLastDeploymentTopics.lookupTopics("group1")); + when(mockExecutorService.submit(any(Callable.class))) .thenReturn(CompletableFuture.completedFuture(Collections.emptyList())); when(mockComponentManager.preparePackages(anyList())).thenReturn(CompletableFuture.completedFuture(null)); @@ -208,6 +233,9 @@ void GIVEN_deploymentDocument_WHEN_resolve_kernel_config_throws_PackageLoadingEx void GIVEN_deployment_task_interrupted_WHEN_preparePackages_not_started_THEN_do_nothing(ExtensionContext context) throws Exception { ignoreExceptionUltimateCauseOfType(context, InterruptedException.class); + lenient().when(mockDeploymentServiceConfig.lookupTopics(eq(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS), + eq(deploymentDocument.getGroupName()))).thenReturn(mockLastDeploymentTopics.lookupTopics("group1")); + when(mockExecutorService.submit(any(Callable.class))).thenReturn(mockResolveDependencyFuture); when(mockResolveDependencyFuture.get()).thenThrow(new ExecutionException(new InterruptedException())); @@ -219,6 +247,9 @@ void GIVEN_deployment_task_interrupted_WHEN_preparePackages_not_started_THEN_do_ @Test void GIVEN_deployment_task_interrupted_WHEN_preparePackages_in_progress_THEN_cancel_prepare_packages() throws Exception { + lenient().when(mockDeploymentServiceConfig.lookupTopics(eq(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS), + eq(deploymentDocument.getGroupName()))).thenReturn(mockLastDeploymentTopics.lookupTopics("group1")); + when(mockExecutorService.submit(any(Callable.class))) .thenReturn(CompletableFuture.completedFuture(Collections.emptyList())); CountDownLatch preparePackagesInvoked = new CountDownLatch(1); @@ -244,6 +275,9 @@ void GIVEN_deployment_task_interrupted_WHEN_preparePackages_in_progress_THEN_can @Test void GIVEN_deployment_task_interrupted_WHEN_preparePackages_done_merge_not_started_THEN_do_nothing() throws Exception { + lenient().when(mockDeploymentServiceConfig.lookupTopics(eq(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS), + eq(deploymentDocument.getGroupName()))).thenReturn(mockLastDeploymentTopics.lookupTopics("group1")); + when(mockExecutorService.submit(any(Callable.class))) .thenReturn(CompletableFuture.completedFuture(Collections.emptyList())); CountDownLatch resolveConfigInvoked = new CountDownLatch(1); @@ -268,6 +302,9 @@ void GIVEN_deployment_task_interrupted_WHEN_preparePackages_done_merge_not_start @Test void GIVEN_deployment_task_interrupted_WHEN_merge_in_progress_THEN_cancel_merge() throws Exception { + lenient().when(mockDeploymentServiceConfig.lookupTopics(eq(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS), + eq(deploymentDocument.getGroupName()))).thenReturn(mockLastDeploymentTopics.lookupTopics("group1")); + when(mockExecutorService.submit(any(Callable.class))) .thenReturn(CompletableFuture.completedFuture(Collections.emptyList())); CountDownLatch mergeConfigInvoked = new CountDownLatch(1);