Skip to content

Commit

Permalink
Requeue ServiceExport when the global IP becomes available
Browse files Browse the repository at this point in the history
...rather than continuously re-queueing/retrying when it isn't available yet.
This reduces the amount of re-queueing which can accumulate significant delays
over time at scale due to the queue's bucket rate limiter. The
globalIngressIPCache was adjusted to take a callback to be notified when the
associated GlobalIngressIP is created or updated.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis authored and skitt committed May 7, 2024
1 parent 988d51e commit a7ea394
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 91 deletions.
45 changes: 22 additions & 23 deletions pkg/agent/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,18 +252,19 @@ func (a *Controller) serviceExportToServiceImport(obj runtime.Object, _ int, op

if svcType == mcsv1a1.ClusterSetIP {
if a.globalnetEnabled {
ip, reason, msg := a.getGlobalIP(svc)
if ip == "" {
ingressIP := a.getIngressIP(svc.Name, svc.Namespace)
if ingressIP.allocatedIP == "" {
logger.V(log.DEBUG).Infof("Service to be exported (%s/%s) doesn't have a global IP yet",
svcExport.Namespace, svcExport.Name)
// Globalnet enabled but service doesn't have globalIp yet, Update the status and requeue
// Globalnet enabled but service doesn't have globalIp yet - update the status.
a.serviceExportClient.updateStatusConditions(ctx, svcExport.Name, svcExport.Namespace,
newServiceExportCondition(mcsv1a1.ServiceExportValid, corev1.ConditionFalse, reason, msg))
newServiceExportCondition(mcsv1a1.ServiceExportValid, corev1.ConditionFalse, ingressIP.unallocatedReason,
ingressIP.unallocatedMsg))

return nil, true
return nil, false
}

serviceImport.Spec.IPs = []string{ip}
serviceImport.Spec.IPs = []string{ingressIP.allocatedIP}
} else {
serviceImport.Spec.IPs = []string{svc.Spec.ClusterIP}
}
Expand Down Expand Up @@ -380,26 +381,24 @@ func (a *Controller) getObjectNameWithClusterID(name, namespace string) string {
return name + "-" + namespace + "-" + a.clusterID
}

func (a *Controller) getGlobalIP(service *corev1.Service) (ip, reason, msg string) {
if a.globalnetEnabled {
ingressIP, found := a.getIngressIP(service.Name, service.Namespace)
if !found {
return "", defaultReasonIPUnavailable, defaultMsgIPUnavailable
}

return ingressIP.allocatedIP, ingressIP.unallocatedReason, ingressIP.unallocatedMsg
}

return "", "GlobalnetDisabled", "Globalnet is not enabled"
}
func (a *Controller) getIngressIP(name, namespace string) *IngressIP {
ret, _ := a.serviceImportController.globalIngressIPCache.getForService(namespace, name,
func(obj *unstructured.Unstructured) (any, bool) {
ingressIP := parseIngressIP(obj)
return ingressIP, ingressIP.allocatedIP != ""
}, func() {
a.serviceExportSyncer.RequeueResource(name, namespace)
})

func (a *Controller) getIngressIP(name, namespace string) (*IngressIP, bool) {
obj, found := a.serviceImportController.globalIngressIPCache.getForService(namespace, name)
if !found {
return nil, false
if ret == nil {
ret = &IngressIP{
namespace: namespace,
unallocatedReason: defaultReasonIPUnavailable,
unallocatedMsg: defaultMsgIPUnavailable,
}
}

return parseIngressIP(obj), true
return ret.(*IngressIP)
}

func (a *Controller) toServiceExport(obj runtime.Object) *mcsv1a1.ServiceExport {
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/controller_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ func assertEquivalentConditions(actual, expected *mcsv1a1.ServiceExportCondition
Expect(actual.Message).To(Not(BeNil()), "Actual: %s", out)

if expected.Message != nil {
Expect(*actual.Message).To(Equal(*expected.Message), "Actual: %s", out)
Expect(*actual.Message).To(ContainSubstring(*expected.Message), "Actual: %s", out)
}
}

Expand Down
93 changes: 72 additions & 21 deletions pkg/agent/controller/global_ingressip_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ limitations under the License.
package controller

import (
"sync"

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/watcher"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -30,22 +28,32 @@ import (

//nolint:gocritic // (hugeParam) This function modifies config so we don't want to pass by pointer.
func newGlobalIngressIPCache(config watcher.Config) (*globalIngressIPCache, error) {
c := &globalIngressIPCache{}
c := &globalIngressIPCache{
byService: globalIngressIPMap{
entries: map[string]*globalIngressIPEntry{},
},
byPod: globalIngressIPMap{
entries: map[string]*globalIngressIPEntry{},
},
byEndpoints: globalIngressIPMap{
entries: map[string]*globalIngressIPEntry{},
},
}

config.ResourceConfigs = []watcher.ResourceConfig{
{
Name: "GlobalIngressIP watcher",
ResourceType: GetGlobalIngressIPObj(),
Handler: watcher.EventHandlerFuncs{
OnCreateFunc: func(obj runtime.Object, numRequeues int) bool {
OnCreateFunc: func(obj runtime.Object, _ int) bool {
c.onCreateOrUpdate(obj.(*unstructured.Unstructured))
return false
},
OnUpdateFunc: func(obj runtime.Object, numRequeues int) bool {
OnUpdateFunc: func(obj runtime.Object, _ int) bool {
c.onCreateOrUpdate(obj.(*unstructured.Unstructured))
return false
},
OnDeleteFunc: func(obj runtime.Object, numRequeues int) bool {
OnDeleteFunc: func(obj runtime.Object, _ int) bool {
c.onDelete(obj.(*unstructured.Unstructured))
return false
},
Expand All @@ -66,19 +74,39 @@ func (c *globalIngressIPCache) start(stopCh <-chan struct{}) error {
}

func (c *globalIngressIPCache) onCreateOrUpdate(obj *unstructured.Unstructured) {
c.applyToCache(obj, func(to *sync.Map, key string, obj *unstructured.Unstructured) {
to.Store(key, obj)
c.applyToCache(obj, func(to *globalIngressIPMap, key string, obj *unstructured.Unstructured) {
var onAddOrUpdate func()

to.Lock()

e := to.entries[key]
if e == nil {
e = &globalIngressIPEntry{}
to.entries[key] = e
}

e.obj = obj
onAddOrUpdate = e.onAddOrUpdate

to.Unlock()

if onAddOrUpdate != nil {
onAddOrUpdate()
}
})
}

func (c *globalIngressIPCache) onDelete(obj *unstructured.Unstructured) {
c.applyToCache(obj, func(to *sync.Map, key string, obj *unstructured.Unstructured) {
to.Delete(key)
c.applyToCache(obj, func(to *globalIngressIPMap, key string, _ *unstructured.Unstructured) {
to.Lock()
defer to.Unlock()

delete(to.entries, key)
})
}

func (c *globalIngressIPCache) applyToCache(obj *unstructured.Unstructured,
apply func(to *sync.Map, key string, obj *unstructured.Unstructured),
apply func(to *globalIngressIPMap, key string, obj *unstructured.Unstructured),
) {
target, _, _ := unstructured.NestedString(obj.Object, "spec", "target")
switch target {
Expand All @@ -95,25 +123,48 @@ func (c *globalIngressIPCache) applyToCache(obj *unstructured.Unstructured,
}
}

func (c *globalIngressIPCache) getForService(namespace, name string) (*unstructured.Unstructured, bool) {
return c.get(&c.byService, namespace, name)
func (c *globalIngressIPCache) getForService(namespace, name string, transform globalIngressIPTransformFn, onAddOrUpdate func(),
) (any, bool) {
return c.get(&c.byService, namespace, name, transform, onAddOrUpdate)
}

func (c *globalIngressIPCache) getForPod(namespace, name string) (*unstructured.Unstructured, bool) {
return c.get(&c.byPod, namespace, name)
func (c *globalIngressIPCache) getForPod(namespace, name string, transform globalIngressIPTransformFn, onAddOrUpdate func(),
) (any, bool) {
return c.get(&c.byPod, namespace, name, transform, onAddOrUpdate)
}

func (c *globalIngressIPCache) getForEndpoints(namespace, ip string) (*unstructured.Unstructured, bool) {
return c.get(&c.byEndpoints, namespace, ip)
func (c *globalIngressIPCache) getForEndpoints(namespace, ip string, transform globalIngressIPTransformFn, onAddOrUpdate func(),
) (any, bool) {
return c.get(&c.byEndpoints, namespace, ip, transform, onAddOrUpdate)
}

func (c *globalIngressIPCache) get(from *sync.Map, namespace, name string) (*unstructured.Unstructured, bool) {
v, found := from.Load(c.key(namespace, name))
if !found {
func (c *globalIngressIPCache) get(from *globalIngressIPMap, namespace, name string, transform globalIngressIPTransformFn,
onAddOrUpdate func(),
) (any, bool) {
from.Lock()
defer from.Unlock()

key := c.key(namespace, name)

e := from.entries[key]
if e == nil {
e = &globalIngressIPEntry{}
from.entries[key] = e
}

if e.obj == nil {
e.onAddOrUpdate = onAddOrUpdate
return nil, false
}

return v.(*unstructured.Unstructured), true
r, ok := transform(e.obj)
if !ok {
e.onAddOrUpdate = onAddOrUpdate
} else {
e.onAddOrUpdate = nil
}

return r, ok
}

func (c *globalIngressIPCache) key(ns, n string) string {
Expand Down
22 changes: 5 additions & 17 deletions pkg/agent/controller/globalingressip.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
package controller

import (
"github.com/submariner-io/admiral/pkg/util"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

Expand All @@ -32,38 +33,25 @@ const (

type IngressIP struct {
namespace string
target string
allocatedIP string
unallocatedReason string
unallocatedMsg string
}

func parseIngressIP(obj *unstructured.Unstructured) *IngressIP {
var (
found bool
err error
)

gip := &IngressIP{}
gip.namespace = obj.GetNamespace()

gip.target, found, err = unstructured.NestedString(obj.Object, "spec", "target")
if !found || err != nil {
logger.Errorf(nil, "target field not found in spec %#v", obj.Object)
return nil
}

gip.allocatedIP, _, _ = unstructured.NestedString(obj.Object, "status", "allocatedIP")
if gip.allocatedIP == "" {
gip.unallocatedMsg = defaultMsgIPUnavailable
gip.unallocatedReason = defaultReasonIPUnavailable

conditions, _, _ := unstructured.NestedSlice(obj.Object, "status", "conditions")
conditions := util.ConditionsFromUnstructured(obj, "status", "conditions")
for i := range conditions {
c := conditions[i].(map[string]interface{})
if c["type"].(string) == "Allocated" {
gip.unallocatedMsg = c["message"].(string)
gip.unallocatedReason = c["reason"].(string)
if conditions[i].Type == "Allocated" {
gip.unallocatedMsg = "Unable to obtain global IP: " + conditions[i].Message
gip.unallocatedReason = conditions[i].Reason

break
}
Expand Down
25 changes: 25 additions & 0 deletions pkg/agent/controller/globalnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/utils/ptr"
)

var _ = Describe("Globalnet enabled", func() {
Expand Down Expand Up @@ -72,6 +73,7 @@ var _ = Describe("Globalnet enabled", func() {
It("should update the ServiceExport status appropriately and eventually export the service", func() {
t.cluster1.awaitServiceExportCondition(newServiceExportValidCondition(corev1.ConditionFalse, "ServiceGlobalIPUnavailable"))

By("Creating GlobalIngressIP")
t.cluster1.createGlobalIngressIP(ingressIP)
t.awaitNonHeadlessServiceExported(&t.cluster1)
})
Expand All @@ -87,6 +89,7 @@ var _ = Describe("Globalnet enabled", func() {
It("should update the ServiceExport status appropriately and eventually export the service", func() {
t.cluster1.awaitServiceExportCondition(newServiceExportValidCondition(corev1.ConditionFalse, "ServiceGlobalIPUnavailable"))

By("Updating GlobalIngressIP")
setIngressAllocatedIP(ingressIP, globalIP1)
test.UpdateResource(t.cluster1.localIngressIPClient, ingressIP)
t.awaitNonHeadlessServiceExported(&t.cluster1)
Expand Down Expand Up @@ -189,5 +192,27 @@ var _ = Describe("Globalnet enabled", func() {
It("should export the service with the global IPs", func() {
t.awaitHeadlessServiceExported(&t.cluster1)
})

Context("and it initially does not have a global IP for all endpoint addresses", func() {
BeforeEach(func() {
t.cluster1.serviceEndpointSlices[0].Endpoints = append(t.cluster1.serviceEndpointSlices[0].Endpoints,
discovery.Endpoint{
Addresses: []string{epIP3},
Hostname: ptr.To("host3"),
})

t.cluster1.headlessEndpointAddresses[0] = append(t.cluster1.headlessEndpointAddresses[0],
discovery.Endpoint{
Addresses: []string{globalIP3},
Hostname: ptr.To("host3"),
})
})

It("should eventually export the service with the global IPs", func() {
t.cluster1.ensureNoEndpointSlice()
t.cluster1.createGlobalIngressIP(t.cluster1.newHeadlessGlobalIngressIPForEndpointIP("three", globalIP3, epIP3))
t.awaitEndpointSlice(&t.cluster1)
})
})
})
})
Loading

0 comments on commit a7ea394

Please sign in to comment.