Skip to content

Commit

Permalink
Add cluster name to aggregated SI status on local SI creation
Browse files Browse the repository at this point in the history
...instead of when local EPS is synced to the broker. This can cause
inconsistency if another service instance is unexported on another
cluster simultaneously. The scenario is:

- a service is exported on C2
- the service is then exported on C1. The local SI is created and it's
  observed that the aggregated SI on the broker already exists.
- the service on C2 is unexported and the aggregated SI is deleted b/c
  its cluster status is now empty.
- the local EPS on C1 is synced to the broker. At this point, it tries
  to update the aggregated SI with the cluster info but it no longer exists.

There’s a couple ways to address it.

1) Do create-or-update when merging the local cluster info on EPS creation.
   The downside is that this wouldn’t do the service type conflict checking
   although the possibility that the SI was re-created by another cluster
   with a different service type in that window would be remote.

2) Add the cluster name to the aggregated SI cluster status when created on
   local SI creation. This would’ve prevented C1 from deleting the aggregated
   SI b/c C2's name would’ve been present in the cluster status. I didn’t do
   it this way for consistency so the cluster name and port info are added
   atomically and after the EPS has been successfully exported to ensure it’s
   ready for use if a consumer observes the cluster info present. But this
   isn’t a requirement.

The consensus is #2.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Mar 27, 2024
1 parent 89782b6 commit add4147
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/agent/controller/headless_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ var _ = Describe("Headless Service export", func() {
Context("and no backend service EndpointSlice initially exists", func() {
It("should eventually export the EndpointSlice", func() {
t.cluster1.createServiceExport()
t.awaitAggregatedServiceImport(mcsv1a1.Headless, t.cluster1.service.Name, t.cluster1.service.Namespace)
t.awaitAggregatedServiceImport(mcsv1a1.Headless, t.cluster1.service.Name, t.cluster1.service.Namespace, &t.cluster1)

t.cluster1.createServiceEndpointSlices()
t.awaitEndpointSlice(&t.cluster1)
Expand Down
21 changes: 19 additions & 2 deletions pkg/agent/controller/service_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob
Type: localServiceImport.Spec.Type,
Ports: []mcsv1a1.ServicePort{},
},
Status: mcsv1a1.ServiceImportStatus{
Clusters: []mcsv1a1.ClusterStatus{
{
Cluster: c.clusterID,
},
},
},
}

conflict := false
Expand Down Expand Up @@ -330,9 +337,19 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob
} else {
c.serviceExportClient.removeStatusCondition(ctx, serviceName, serviceNamespace, mcsv1a1.ServiceExportConflict,
typeConflictReason)

var added bool

existing.Status.Clusters, added = slices.AppendIfNotPresent(existing.Status.Clusters,
mcsv1a1.ClusterStatus{Cluster: c.clusterID}, clusterStatusKey)

if added {
logger.V(log.DEBUG).Infof("Added cluster name %q to aggregated ServiceImport %q. New status: %#v",
c.clusterID, existing.Name, existing.Status.Clusters)
}
}

return obj, nil
return c.converter.toUnstructured(existing), nil
})
if err == nil && !conflict {
err = c.startEndpointsController(localServiceImport)
Expand All @@ -345,7 +362,7 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob
}

if result == util.OperationResultCreated {
logger.V(log.DEBUG).Infof("Created aggregated ServiceImport %q", aggregate.Name)
logger.V(log.DEBUG).Infof("Created aggregated ServiceImport %s", resource.ToJSON(aggregate))
}

return err
Expand Down
10 changes: 0 additions & 10 deletions pkg/agent/controller/service_import_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,6 @@ func newServiceImportAggregator(brokerClient dynamic.Interface, brokerNamespace,

func (a *ServiceImportAggregator) updateOnCreateOrUpdate(ctx context.Context, name, namespace string) error {
return a.update(ctx, name, namespace, func(existing *mcsv1a1.ServiceImport) error {
var added bool

existing.Status.Clusters, added = slices.AppendIfNotPresent(existing.Status.Clusters,
mcsv1a1.ClusterStatus{Cluster: a.clusterID}, clusterStatusKey)

if added {
logger.V(log.DEBUG).Infof("Added cluster name %q to aggregated ServiceImport %q. New status: %#v",
a.clusterID, existing.Name, existing.Status.Clusters)
}

return a.setServicePorts(ctx, existing)
})
}
Expand Down

0 comments on commit add4147

Please sign in to comment.