Skip to content

Commit

Permalink
fix: reject before putting deployment in in_progress (#1354)
Browse files Browse the repository at this point in the history
An IoT job execution cannot transition from the IN_PROGRESS state to REJECTED; the valid
transition to REJECTED is from QUEUED. This change therefore moves a QUEUED job either
to IN_PROGRESS or to REJECTED, without passing through IN_PROGRESS before rejecting.
  • Loading branch information
devnaruka authored Nov 2, 2022
1 parent 2f98760 commit da667cc
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import com.aws.greengrass.componentmanager.models.ComponentRequirementIdentifier;
import com.aws.greengrass.config.Node;
import com.aws.greengrass.config.Topics;
import com.aws.greengrass.deployment.errorcode.DeploymentErrorCode;
import com.aws.greengrass.deployment.exceptions.DeploymentRejectedException;
import com.aws.greengrass.deployment.exceptions.DeploymentTaskFailureException;
import com.aws.greengrass.deployment.model.Deployment;
import com.aws.greengrass.deployment.model.DeploymentDocument;
Expand Down Expand Up @@ -43,8 +41,6 @@
import java.util.stream.Collectors;

import static com.aws.greengrass.deployment.DeploymentConfigMerger.DEPLOYMENT_ID_LOG_KEY;
import static com.aws.greengrass.deployment.DeploymentService.GROUP_TO_LAST_DEPLOYMENT_CONFIG_ARN_KEY;
import static com.aws.greengrass.deployment.DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TIMESTAMP_KEY;
import static com.aws.greengrass.deployment.DeploymentService.GROUP_TO_ROOT_COMPONENTS_VERSION_KEY;
import static com.aws.greengrass.deployment.converter.DeploymentDocumentConverter.LOCAL_DEPLOYMENT_GROUP_NAME;

Expand Down Expand Up @@ -115,14 +111,6 @@ public DeploymentResult call() throws InterruptedException {
.kv("Deployment service config", deploymentServiceConfig.toPOJO().toString())
.log("Starting deployment task");

/*
* Enforce that deployments for a given deployment target (thing or thingGroup) are processed in sequence,
* so that an old deployment for that target does not override a newer deployment.
*/
if (checkIfDeploymentReceivedIsStale(deploymentDocument)) {
return prepareRejectionResult(deploymentDocument);
}

Map<String, Set<ComponentRequirementIdentifier>> nonTargetGroupsToRootPackagesMap =
getNonTargetGroupToRootPackagesMap(deploymentDocument);

Expand Down Expand Up @@ -193,80 +181,6 @@ public DeploymentResult call() throws InterruptedException {
}
}

/*
 * Decides whether the received deployment is stale, i.e. older than the last deployment recorded for the same
 * target group, so that an old deployment for a target does not override a newer one.
 *
 * How the different kinds of deployment targets are treated:
 *
 * - Thing deployments are not considered here, as they are always in sequence and always for only one target.
 *
 * - ThingGroup deployments sent to different targets (thingGroup A & B): nucleus allows components from both
 *   groups to be deployed as long as they don't have conflicting component versions. This behavior is not
 *   changed.
 *
 * - ThingGroup deployments sent to the same target (thingGroup A) are always in sequence; however, if a
 *   bad/stale deployment is received due to a cloud error, that stale deployment must not override a newer
 *   deployment already performed on the device.
 *
 * - Subgroup deployments targeted for a parent fleet group (subgroups A1, A2 & A3 targeted for thingGroup A):
 *   each subgroup deployment is sent to the device as a different job, so they could be received in any
 *   order, yielding unpredictable behavior. To resolve this, nucleus enforces processing these subgroup
 *   deployments in order of their creation, irrespective of when the signals are received. For example:
 *
 *       Order of deployment creation: A1, A2, A3 (so they have to be processed in this order).
 *       Order of deployments received: A2, A1, A3.
 *       The A2 and A3 deployments will succeed, but A1 is rejected because nucleus has already processed
 *       the newer deployment A2.
 *
 * @param deploymentDocument document of the deployment being evaluated
 * @return true if deployment is considered stale, false otherwise
 */
private boolean checkIfDeploymentReceivedIsStale(DeploymentDocument deploymentDocument) {
    // Anything that is not an IoT Jobs deployment with a group name is never treated as stale
    if (!Deployment.DeploymentType.IOT_JOBS.equals(deployment.getDeploymentType())
            || deploymentDocument.getGroupName() == null) {
        return false;
    }

    // Timestamp stored under the last-deployment topics of the target group
    long lastDeploymentTimestamp = Coerce.toLong(deploymentServiceConfig
            .lookupTopics(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS, deploymentDocument.getGroupName())
            .find(GROUP_TO_LAST_DEPLOYMENT_TIMESTAMP_KEY));

    // A zero stored timestamp or a missing document timestamp means there is nothing to compare against, so the
    // deployment is treated as new; otherwise it is stale only when it is strictly older than the stored one
    return lastDeploymentTimestamp != 0
            && deploymentDocument.getTimestamp() != null
            && deploymentDocument.getTimestamp() < lastDeploymentTimestamp;
}

/*
 * Builds the REJECTED result for a deployment that was found to be stale. The rejection cause names the target
 * group, the configuration ARN stored for that group's last deployment, and the configuration ARN of the
 * deployment being rejected.
 *
 * @param deploymentDocument document of the deployment being rejected
 * @return a DeploymentResult with REJECTED status carrying a DeploymentRejectedException with the
 *         REJECTED_STALE_DEPLOYMENT error code
 */
private DeploymentResult prepareRejectionResult(DeploymentDocument deploymentDocument) {
    logger.atInfo().setEventType(DEPLOYMENT_TASK_EVENT_TYPE)
            .log("Nucleus has a newer deployment for '{}' target. Rejecting the deployment",
                    deploymentDocument.getGroupName());

    // Configuration ARN stored under the last-deployment topics of the same target group
    String lastDeploymentConfigArn = Coerce.toString(deploymentServiceConfig
            .lookupTopics(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS, deploymentDocument.getGroupName())
            .find(GROUP_TO_LAST_DEPLOYMENT_CONFIG_ARN_KEY));

    String rejectionMessage = String.format(
            "Nucleus has a newer deployment for '%s' target deployed by '%s'. Rejecting the "
                    + "deployment from '%s'", deploymentDocument.getGroupName(), lastDeploymentConfigArn,
            deploymentDocument.getConfigurationArn());
    DeploymentRejectedException rejectionCause =
            new DeploymentRejectedException(rejectionMessage, DeploymentErrorCode.REJECTED_STALE_DEPLOYMENT);

    return new DeploymentResult(DeploymentResult.DeploymentStatus.REJECTED, rejectionCause);
}

@SuppressWarnings("PMD.AvoidCatchingGenericException")
private Map<String, Set<ComponentRequirementIdentifier>> getNonTargetGroupToRootPackagesMap(
DeploymentDocument deploymentDocument)
Expand Down
99 changes: 94 additions & 5 deletions src/main/java/com/aws/greengrass/deployment/DeploymentService.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.aws.greengrass.deployment.errorcode.DeploymentErrorCode;
import com.aws.greengrass.deployment.errorcode.DeploymentErrorCodeUtils;
import com.aws.greengrass.deployment.exceptions.DeploymentException;
import com.aws.greengrass.deployment.exceptions.DeploymentRejectedException;
import com.aws.greengrass.deployment.exceptions.InvalidRequestException;
import com.aws.greengrass.deployment.exceptions.MissingRequiredCapabilitiesException;
import com.aws.greengrass.deployment.model.Deployment;
Expand Down Expand Up @@ -555,13 +556,34 @@ private void createNewDeployment(Deployment deployment) {
if (deploymentTask == null) {
return;
}
deploymentStatusKeeper.persistAndPublishDeploymentStatus(deployment.getId(),
deployment.getGreengrassDeploymentId(), deployment.getConfigurationArn(),
deployment.getDeploymentType(), JobStatus.IN_PROGRESS.toString(), new HashMap<>(),
deployment.getDeploymentDocumentObj().getRootPackages());

if (DEFAULT.equals(deployment.getDeploymentStage())) {
/*
* Enforce that deployments for a given deployment target (thing or thingGroup) are processed in sequence,
* so that an old deployment for that target does not override a newer deployment.
*/
if (checkIfDeploymentReceivedIsStale(deployment.getDeploymentDocumentObj(), deployment.getDeploymentType())) {
logger.atInfo().log("Nucleus has a newer deployment for '{}' target. Rejecting the deployment",
deployment.getDeploymentDocumentObj().getGroupName());
Topics lastDeployment = config.lookupTopics(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS,
deployment.getDeploymentDocumentObj().getGroupName());

String lastDeploymentConfigArn =
Coerce.toString(lastDeployment.find(GROUP_TO_LAST_DEPLOYMENT_CONFIG_ARN_KEY));

updateDeploymentResultAsRejected(deployment, deploymentTask, new DeploymentRejectedException(String.format(
"Nucleus has a newer deployment for '%s' target deployed by '%s'. Rejecting the "
+ "deployment from '%s'", deployment.getDeploymentDocumentObj().getGroupName(),
lastDeploymentConfigArn, deployment.getDeploymentDocumentObj().getConfigurationArn()),
DeploymentErrorCode.REJECTED_STALE_DEPLOYMENT));
return;
} else {
deploymentStatusKeeper.persistAndPublishDeploymentStatus(deployment.getId(),
deployment.getGreengrassDeploymentId(), deployment.getConfigurationArn(),
deployment.getDeploymentType(), JobStatus.IN_PROGRESS.toString(), new HashMap<>(),
deployment.getDeploymentDocumentObj().getRootPackages());
}

if (DEFAULT.equals(deployment.getDeploymentStage())) {
try {
context.get(KernelAlternatives.class).cleanupLaunchDirectoryLinks();
deploymentDirectoryManager.createNewDeploymentDirectory(deployment.getGreengrassDeploymentId());
Expand Down Expand Up @@ -615,6 +637,73 @@ private void createNewDeployment(Deployment deployment) {
cancellable);
}

/*
 * Decides whether the received deployment is stale, i.e. older than the last deployment recorded for the same
 * target group, so that an old deployment for a target does not override a newer one.
 *
 * How the different kinds of deployment targets are treated:
 *
 * - Thing deployments are not considered here, as they are always in sequence and always for only one target.
 *
 * - ThingGroup deployments sent to different targets (thingGroup A & B): nucleus allows components from both
 *   groups to be deployed as long as they don't have conflicting component versions. This behavior is not
 *   changed.
 *
 * - ThingGroup deployments sent to the same target (thingGroup A) are always in sequence; however, if a
 *   bad/stale deployment is received due to a cloud error, that stale deployment must not override a newer
 *   deployment already performed on the device.
 *
 * - Subgroup deployments targeted for a parent fleet group (subgroups A1, A2 & A3 targeted for thingGroup A):
 *   each subgroup deployment is sent to the device as a different job, so they could be received in any
 *   order, yielding unpredictable behavior. To resolve this, nucleus enforces processing these subgroup
 *   deployments in order of their creation, irrespective of when the signals are received. For example:
 *
 *       Order of deployment creation: A1, A2, A3 (so they have to be processed in this order).
 *       Order of deployments received: A2, A1, A3.
 *       The A2 and A3 deployments will succeed, but A1 is rejected because nucleus has already processed
 *       the newer deployment A2.
 *
 * @param deploymentDocument document of the deployment being evaluated
 * @param deploymentType type of the deployment being evaluated
 * @return true if deployment is considered stale, false otherwise
 */
private boolean checkIfDeploymentReceivedIsStale(DeploymentDocument deploymentDocument,
                                                 DeploymentType deploymentType) {
    // Only IoT Jobs deployments are candidates for the staleness check
    if (!Deployment.DeploymentType.IOT_JOBS.equals(deploymentType)) {
        return false;
    }
    // Without a group name this is not a group deployment, hence not stale
    if (deploymentDocument.getGroupName() == null) {
        return false;
    }

    // Timestamp stored under the last-deployment topics of the target group
    long lastDeploymentTimestamp = Coerce.toLong(
            config.lookupTopics(DeploymentService.GROUP_TO_LAST_DEPLOYMENT_TOPICS,
                    deploymentDocument.getGroupName()).find(GROUP_TO_LAST_DEPLOYMENT_TIMESTAMP_KEY));

    // A zero stored timestamp or a missing document timestamp means there is nothing to compare against, so the
    // deployment is treated as new; otherwise it is stale only when it is strictly older than the stored one
    return lastDeploymentTimestamp != 0
            && deploymentDocument.getTimestamp() != null
            && deploymentDocument.getTimestamp() < lastDeploymentTimestamp;
}

/*
 * Marks the given deployment as rejected by replacing the current deployment task metadata with one whose
 * result future is already completed with a REJECTED DeploymentResult. The task itself is not submitted for
 * execution by this method.
 *
 * @param deployment the deployment being rejected
 * @param deploymentTask the task that was created for the deployment
 * @param rejectionCause the reason for the rejection, stored in the DeploymentResult
 */
private void updateDeploymentResultAsRejected(Deployment deployment, DeploymentTask deploymentTask,
                                              Throwable rejectionCause) {
    // Already-completed future, so the REJECTED result is available without waiting on any task
    CompletableFuture<DeploymentResult> rejectedFuture = CompletableFuture.completedFuture(
            new DeploymentResult(DeploymentResult.DeploymentStatus.REJECTED, rejectionCause));

    // NOTE(review): the AtomicInteger(1) and the trailing false are presumably the attempt count and the
    // cancellable flag - confirm against the DeploymentTaskMetadata constructor
    currentDeploymentTaskMetadata =
            new DeploymentTaskMetadata(deployment, deploymentTask, rejectedFuture, new AtomicInteger(1), false);
}

private void updateDeploymentResultAsFailed(Deployment deployment, DeploymentTask deploymentTask,
boolean completeExceptionally, Throwable e) {
DeploymentResult result = new DeploymentResult(DeploymentStatus.FAILED_NO_STATE_CHANGE, e);
Expand Down
Loading

0 comments on commit da667cc

Please sign in to comment.