Skip to content

Commit

Permalink
fix: handle resource snapshot missing but work already synced and add…
Browse files Browse the repository at this point in the history
… cro/ro annotation to all the works (#936)


Co-authored-by: Ryan Zhang <[email protected]>
  • Loading branch information
ryanzhang-oss and Ryan Zhang authored Oct 30, 2024
1 parent 6b81bdb commit cb9a7a0
Show file tree
Hide file tree
Showing 11 changed files with 398 additions and 43 deletions.
2 changes: 1 addition & 1 deletion apis/cluster/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions apis/placement/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions apis/placement/v1beta1/commons.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ const (
// The format is {workPrefix}-configMap-uuid
WorkNameWithConfigEnvelopeFmt = "%s-configmap-%s"

// ParentClusterResourceOverrideSnapshotHashAnnotation is the annotation applied to the work that contains the hash of the parent cluster resource override snapshot list.
ParentClusterResourceOverrideSnapshotHashAnnotation = fleetPrefix + "parent-cluster-resource-override-snapshot-hash"

// ParentResourceOverrideSnapshotHashAnnotation is the annotation applied to the work that contains the hash of the parent resource override snapshot list.
ParentResourceOverrideSnapshotHashAnnotation = fleetPrefix + "parent-resource-override-snapshot-hash"

// ParentResourceSnapshotNameAnnotation is the annotation applied to work that contains the name of the master resource snapshot that generates the work.
ParentResourceSnapshotNameAnnotation = fleetPrefix + "parent-resource-snapshot-name"

// ParentResourceSnapshotIndexLabel is the label applied to work that contains the index of the resource snapshot that generates the work.
ParentResourceSnapshotIndexLabel = fleetPrefix + "parent-resource-snapshot-index"

Expand Down
2 changes: 1 addition & 1 deletion apis/placement/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/controllers/rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ func createUpdateInfo(binding *fleetv1beta1.ClusterResourceBinding, crp *fleetv1
desiredBinding.Spec.ResourceSnapshotName = latestResourceSnapshot.Name
// update the resource apply strategy when controller rolls out the new changes
desiredBinding.Spec.ApplyStrategy = crp.Spec.Strategy.ApplyStrategy
// TODO: check the size of the cro and ro to not exceed the limit
desiredBinding.Spec.ClusterResourceOverrideSnapshots = cro
desiredBinding.Spec.ResourceOverrideSnapshots = ro
return toBeUpdatedBinding{
Expand Down Expand Up @@ -520,6 +521,7 @@ func calculateMaxToAdd(crp *fleetv1beta1.ClusterResourcePlacement, targetNumber
upperBoundReadyNumber, "maxNumberOfBindingsToAdd", maxNumberToAdd)
return maxNumberToAdd
}

func (r *Reconciler) calculateRealTarget(crp *fleetv1beta1.ClusterResourcePlacement, schedulerTargetedBinds []*fleetv1beta1.ClusterResourceBinding) int {
crpKObj := klog.KObj(crp)
// calculate the target number of bindings
Expand Down
101 changes: 84 additions & 17 deletions pkg/controllers/workgenerator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,14 @@ import (
"go.goms.io/fleet/pkg/utils/controller"
"go.goms.io/fleet/pkg/utils/informer"
"go.goms.io/fleet/pkg/utils/labels"
"go.goms.io/fleet/pkg/utils/resource"
)

var (
// maxFailedResourcePlacementLimit indicates the max number of failed resource placements to include in the status.
maxFailedResourcePlacementLimit = 100

// errResourceSnapshotNotFound is the sentinel error that syncAllWork matches with errors.Is to detect a missing master resource snapshot.
errResourceSnapshotNotFound = errors.New("the master resource snapshot is not found")
errResourceSnapshotNotFound = fmt.Errorf("the master resource snapshot is not found")
)

// Reconciler watches binding objects and generate work objects in the designated cluster namespace
Expand Down Expand Up @@ -135,18 +136,17 @@ func (r *Reconciler) Reconcile(ctx context.Context, req controllerruntime.Reques

workUpdated := false
overrideSucceeded := false
// Reset the conditions and failed placements.
for i := condition.OverriddenCondition; i < condition.TotalCondition; i++ {
resourceBinding.RemoveCondition(string(i.ResourceBindingConditionType()))
}
resourceBinding.Status.FailedPlacements = nil
// list all the corresponding works
works, syncErr := r.listAllWorksAssociated(ctx, &resourceBinding)
if syncErr == nil {
// generate and apply the workUpdated works if we have all the works
overrideSucceeded, workUpdated, syncErr = r.syncAllWork(ctx, &resourceBinding, works, cluster)
}

// Reset the conditions and failed placements.
for i := condition.OverriddenCondition; i < condition.TotalCondition; i++ {
resourceBinding.RemoveCondition(string(i.ResourceBindingConditionType()))
}
resourceBinding.Status.FailedPlacements = nil
if overrideSucceeded {
overrideReason := condition.OverriddenSucceededReason
overrideMessage := "Successfully applied the override rules on the resources"
Expand Down Expand Up @@ -375,10 +375,28 @@ func (r *Reconciler) listAllWorksAssociated(ctx context.Context, resourceBinding
func (r *Reconciler) syncAllWork(ctx context.Context, resourceBinding *fleetv1beta1.ClusterResourceBinding, existingWorks map[string]*fleetv1beta1.Work, cluster clusterv1beta1.MemberCluster) (bool, bool, error) {
updateAny := atomic.NewBool(false)
resourceBindingRef := klog.KObj(resourceBinding)
// the hash256 function can handle an empty list https://go.dev/play/p/_4HW17fooXM
resourceOverrideSnapshotHash, err := resource.HashOf(resourceBinding.Spec.ResourceOverrideSnapshots)
if err != nil {
return false, false, controller.NewUnexpectedBehaviorError(err)
}
clusterResourceOverrideSnapshotHash, err := resource.HashOf(resourceBinding.Spec.ClusterResourceOverrideSnapshots)
if err != nil {
return false, false, controller.NewUnexpectedBehaviorError(err)
}
// TODO: check all work synced first before fetching the snapshots after we put ParentResourceOverrideSnapshotHashAnnotation and ParentClusterResourceOverrideSnapshotHashAnnotation in all the work objects

// Gather all the resource snapshots
resourceSnapshots, err := r.fetchAllResourceSnapshots(ctx, resourceBinding)
if err != nil {
if errors.Is(err, errResourceSnapshotNotFound) {
// the resourceIndex is deleted but the works might still be up to date with the binding.
if areAllWorkSynced(existingWorks, resourceBinding, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash) {
klog.V(2).InfoS("All the works are synced with the resourceBinding even if the resource snapshot index is removed", "resourceBinding", resourceBindingRef)
return true, false, nil
}
return false, false, controller.NewUserError(err)
}
// TODO(RZ): handle errResourceNotFullyCreated error so we don't need to wait for all the snapshots to be created
return false, false, err
}
Expand Down Expand Up @@ -422,7 +440,7 @@ func (r *Reconciler) syncAllWork(ctx context.Context, resourceBinding *fleetv1be
if uResource.GetObjectKind().GroupVersionKind() == utils.ConfigMapGVK &&
len(uResource.GetAnnotations()[fleetv1beta1.EnvelopeConfigMapAnnotation]) != 0 {
// get a work object for the enveloped configMap
work, err := r.getConfigMapEnvelopWorkObj(ctx, workNamePrefix, resourceBinding, snapshot, &uResource)
work, err := r.getConfigMapEnvelopWorkObj(ctx, workNamePrefix, resourceBinding, snapshot, &uResource, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash)
if err != nil {
return true, false, err
}
Expand All @@ -438,7 +456,7 @@ func (r *Reconciler) syncAllWork(ctx context.Context, resourceBinding *fleetv1be
// generate a work object for the manifests even if there is nothing to place
// to allow CRP to collect the status of the placement
// TODO (RZ): revisit to see if we need this hack
work := generateSnapshotWorkObj(workNamePrefix, resourceBinding, snapshot, simpleManifests)
work := generateSnapshotWorkObj(workNamePrefix, resourceBinding, snapshot, simpleManifests, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash)
activeWork[work.Name] = work
newWork = append(newWork, work)

Expand Down Expand Up @@ -485,6 +503,32 @@ func (r *Reconciler) syncAllWork(ctx context.Context, resourceBinding *fleetv1be
return true, updateAny.Load(), nil
}

// areAllWorkSynced checks if all the works are synced with the resource binding.
//
// It returns false when the binding's ResourceBindingWorkSynchronized condition is not
// true for the binding's generation, or when any existing work records a parent resource
// snapshot name that differs from resourceBinding.Spec.ResourceSnapshotName. It returns
// true otherwise, including when existingWorks is empty.
//
// The two trailing string parameters are the resource override snapshot hash and the
// cluster resource override snapshot hash; they are accepted but not inspected yet
// (see the TODO below).
func areAllWorkSynced(existingWorks map[string]*fleetv1beta1.Work, resourceBinding *fleetv1beta1.ClusterResourceBinding, _, _ string) bool {
	syncedCondition := resourceBinding.GetCondition(string(fleetv1beta1.ResourceBindingWorkSynchronized))
	if !condition.IsConditionStatusTrue(syncedCondition, resourceBinding.Generation) {
		// The binding has to be synced first before we can check the works
		return false
	}
	// TODO: check resourceOverrideSnapshotHash and clusterResourceOverrideSnapshotHash after all the work has the ParentResourceOverrideSnapshotHashAnnotation and ParentClusterResourceOverrideSnapshotHashAnnotation
	resourceSnapshotName := resourceBinding.Spec.ResourceSnapshotName
	for _, work := range existingWorks {
		recordedName, exist := work.Annotations[fleetv1beta1.ParentResourceSnapshotNameAnnotation]
		if !exist {
			// TODO: remove this block after all the work has the ParentResourceSnapshotNameAnnotation
			// the parent resource snapshot name is not recorded in the work, we need to construct it from the labels
			crpName := resourceBinding.Labels[fleetv1beta1.CRPTrackingLabel]
			index, err := labels.ExtractResourceSnapshotIndexFromWork(work)
			if err != nil {
				// Without a usable index we cannot tell which snapshot generated this work,
				// so we must not build a name from it and risk a false match.
				klog.ErrorS(err, "Failed to extract the resource snapshot index from the work, treating the work as not synced", "work", klog.KObj(work), "resourceBinding", klog.KObj(resourceBinding))
				return false
			}
			recordedName = fmt.Sprintf(fleetv1beta1.ResourceSnapshotNameFmt, crpName, index)
		}
		if recordedName != resourceSnapshotName {
			klog.V(2).InfoS("The work is not synced with the resourceBinding", "work", klog.KObj(work), "resourceBinding", klog.KObj(resourceBinding), "annotationExist", exist, "recordedName", recordedName, "resourceSnapshotName", resourceSnapshotName)
			return false
		}
	}
	return true
}

// fetchAllResourceSnapshots gathers all the resource snapshots for the resource binding.
func (r *Reconciler) fetchAllResourceSnapshots(ctx context.Context, resourceBinding *fleetv1beta1.ClusterResourceBinding) (map[string]*fleetv1beta1.ClusterResourceSnapshot, error) {
// fetch the master snapshot first
Expand All @@ -504,7 +548,7 @@ func (r *Reconciler) fetchAllResourceSnapshots(ctx context.Context, resourceBind
// getConfigMapEnvelopWorkObj first tries to locate a work object for the corresponding envelopObj of type configMap.
// We create a new one if the work object doesn't exist. We do this to avoid repeatedly deleting and creating the same work object.
func (r *Reconciler) getConfigMapEnvelopWorkObj(ctx context.Context, workNamePrefix string, resourceBinding *fleetv1beta1.ClusterResourceBinding,
resourceSnapshot *fleetv1beta1.ClusterResourceSnapshot, envelopeObj *unstructured.Unstructured) (*fleetv1beta1.Work, error) {
resourceSnapshot *fleetv1beta1.ClusterResourceSnapshot, envelopeObj *unstructured.Unstructured, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash string) (*fleetv1beta1.Work, error) {
// we group all the resources in one configMap to one work
manifest, err := extractResFromConfigMap(envelopeObj)
if err != nil {
Expand All @@ -514,6 +558,7 @@ func (r *Reconciler) getConfigMapEnvelopWorkObj(ctx context.Context, workNamePre
}
klog.V(2).InfoS("Successfully extract the enveloped resources from the configMap", "numOfResources", len(manifest),
"snapshot", klog.KObj(resourceSnapshot), "resourceBinding", klog.KObj(resourceBinding), "configMapWrapper", klog.KObj(envelopeObj))

// Try to see if we already have a work representing the same enveloped object for this CRP in the same cluster
// The ParentResourceSnapshotIndexLabel can change between snapshots so we have to exclude that label in the match
envelopWorkLabelMatcher := client.MatchingLabels{
Expand Down Expand Up @@ -544,6 +589,11 @@ func (r *Reconciler) getConfigMapEnvelopWorkObj(ctx context.Context, workNamePre
fleetv1beta1.EnvelopeNameLabel: envelopeObj.GetName(),
fleetv1beta1.EnvelopeNamespaceLabel: envelopeObj.GetNamespace(),
},
Annotations: map[string]string{
fleetv1beta1.ParentResourceSnapshotNameAnnotation: resourceBinding.Spec.ResourceSnapshotName,
fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation: resourceOverrideSnapshotHash,
fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation: clusterResourceOverrideSnapshotHash,
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: fleetv1beta1.GroupVersion.String(),
Expand All @@ -567,16 +617,19 @@ func (r *Reconciler) getConfigMapEnvelopWorkObj(ctx context.Context, workNamePre
klog.ErrorS(controller.NewUnexpectedBehaviorError(fmt.Errorf("find %d work representing configMap", len(workList.Items))),
"snapshot", klog.KObj(resourceSnapshot), "resourceBinding", klog.KObj(resourceBinding), "configMapWrapper", klog.KObj(envelopeObj))
}
// we just pick the first one if there are more than one.
work := workList.Items[0]
work.Labels[fleetv1beta1.ParentResourceSnapshotIndexLabel] = resourceSnapshot.Labels[fleetv1beta1.ResourceIndexLabel]
work.Annotations[fleetv1beta1.ParentResourceSnapshotNameAnnotation] = resourceBinding.Spec.ResourceSnapshotName
work.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation] = resourceOverrideSnapshotHash
work.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation] = clusterResourceOverrideSnapshotHash
work.Spec.Workload.Manifests = manifest
work.Spec.ApplyStrategy = resourceBinding.Spec.ApplyStrategy
return &work, nil
}

// generateSnapshotWorkObj generates the work object for the corresponding snapshot
func generateSnapshotWorkObj(workName string, resourceBinding *fleetv1beta1.ClusterResourceBinding, resourceSnapshot *fleetv1beta1.ClusterResourceSnapshot, manifest []fleetv1beta1.Manifest) *fleetv1beta1.Work {
func generateSnapshotWorkObj(workName string, resourceBinding *fleetv1beta1.ClusterResourceBinding, resourceSnapshot *fleetv1beta1.ClusterResourceSnapshot,
manifest []fleetv1beta1.Manifest, resourceOverrideSnapshotHash, clusterResourceOverrideSnapshotHash string) *fleetv1beta1.Work {
return &fleetv1beta1.Work{
ObjectMeta: metav1.ObjectMeta{
Name: workName,
Expand All @@ -586,6 +639,11 @@ func generateSnapshotWorkObj(workName string, resourceBinding *fleetv1beta1.Clus
fleetv1beta1.CRPTrackingLabel: resourceBinding.Labels[fleetv1beta1.CRPTrackingLabel],
fleetv1beta1.ParentResourceSnapshotIndexLabel: resourceSnapshot.Labels[fleetv1beta1.ResourceIndexLabel],
},
Annotations: map[string]string{
fleetv1beta1.ParentResourceSnapshotNameAnnotation: resourceBinding.Spec.ResourceSnapshotName,
fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation: resourceOverrideSnapshotHash,
fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation: clusterResourceOverrideSnapshotHash,
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: fleetv1beta1.GroupVersion.String(),
Expand Down Expand Up @@ -619,6 +677,7 @@ func (r *Reconciler) upsertWork(ctx context.Context, newWork, existingWork *flee
"resourceSnapshot", resourceSnapshotObj, "work", workObj)
return true, nil
}
// TODO: remove the compare after we did the check on all work in the sync all
// check if we need to update the existing work object
workResourceIndex, err := labels.ExtractResourceSnapshotIndexFromWork(existingWork)
if err != nil {
Expand All @@ -628,12 +687,20 @@ func (r *Reconciler) upsertWork(ctx context.Context, newWork, existingWork *flee
// we already checked the label in fetchAllResourceSnapShots function so no need to check again
resourceIndex, _ := labels.ExtractResourceIndexFromClusterResourceSnapshot(resourceSnapshot)
if workResourceIndex == resourceIndex {
// no need to do anything if the work is generated from the same resource snapshot group since the resource snapshot is immutable.
klog.V(2).InfoS("Work is already associated with the desired resourceSnapshot", "resourceIndex", resourceIndex, "work", workObj, "resourceSnapshot", resourceSnapshotObj)
return false, nil
// no need to do anything if the work is generated from the same snapshots
if existingWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation] == newWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation] &&
existingWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation] == newWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation] {
klog.V(2).InfoS("Work is not associated with the desired override snapshots", "existingROHash", existingWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation],
"existingCROHash", existingWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation], "work", workObj)
return false, nil
}
klog.V(2).InfoS("Work is already associated with the desired resourceSnapshot but still not having the right override snapshots", "resourceIndex", resourceIndex, "work", workObj, "resourceSnapshot", resourceSnapshotObj)
}
// need to update the existing work, only two possible changes:
existingWork.Labels[fleetv1beta1.ParentResourceSnapshotIndexLabel] = resourceSnapshot.Labels[fleetv1beta1.ResourceIndexLabel]
// need to copy the new work to the existing work, only 5 possible changes:
existingWork.Labels[fleetv1beta1.ParentResourceSnapshotIndexLabel] = newWork.Labels[fleetv1beta1.ParentResourceSnapshotIndexLabel]
existingWork.Annotations[fleetv1beta1.ParentResourceSnapshotNameAnnotation] = newWork.Annotations[fleetv1beta1.ParentResourceSnapshotNameAnnotation]
existingWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation] = newWork.Annotations[fleetv1beta1.ParentResourceOverrideSnapshotHashAnnotation]
existingWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation] = newWork.Annotations[fleetv1beta1.ParentClusterResourceOverrideSnapshotHashAnnotation]
existingWork.Spec.Workload.Manifests = newWork.Spec.Workload.Manifests
if err := r.Client.Update(ctx, existingWork); err != nil {
klog.ErrorS(err, "Failed to update the work associated with the resourceSnapshot", "resourceSnapshot", resourceSnapshotObj, "work", workObj)
Expand Down
Loading

0 comments on commit cb9a7a0

Please sign in to comment.