Skip to content

Commit

Permalink
Merge branch 'main' into json_context_optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
JimBugwadia authored Sep 8, 2023
2 parents 0a6028c + 5beaec6 commit 29111cd
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 97 deletions.
13 changes: 10 additions & 3 deletions pkg/clients/dclient/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,17 @@ func (c serverResources) FindResource(groupVersion string, kind string) (apiReso

func (c serverResources) FindResources(group, version, kind, subresource string) (map[TopLevelApiDescription]metav1.APIResource, error) {
resources, err := c.findResources(group, version, kind, subresource)
if err != nil {
if !c.cachedClient.Fresh() {
// if no resource was found, we have to force cache invalidation
if err != nil || len(resources) == 0 {
if !c.cachedClient.Fresh() || len(resources) == 0 {
c.cachedClient.Invalidate()
return c.findResources(group, version, kind, subresource)
resources, err := c.findResources(group, version, kind, subresource)
if err != nil {
return nil, err
} else if len(resources) == 0 {
return nil, fmt.Errorf("failed to find resource (%s/%s/%s/%s)", group, version, kind, subresource)
}
return resources, err
}
}
return resources, err
Expand Down
104 changes: 25 additions & 79 deletions pkg/controllers/ttl/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,30 @@ import (
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/api/kyverno"
"github.com/kyverno/kyverno/pkg/metrics"
controllerutils "github.com/kyverno/kyverno/pkg/utils/controller"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/metadata"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

const (
// Workers is the number of workers for this controller
maxRetries = 10
)

type controller struct {
name string
client metadata.Getter
queue workqueue.RateLimitingInterface
lister cache.GenericLister
wg wait.Group
informer cache.SharedIndexInformer
registration cache.ResourceEventHandlerRegistration
logger logr.Logger
Expand All @@ -39,21 +44,29 @@ type ttlMetrics struct {
}

func newController(client metadata.Getter, metainformer informers.GenericInformer, logger logr.Logger, gvr schema.GroupVersionResource) (*controller, error) {
name := gvr.Version + "/" + gvr.Resource
if gvr.Group != "" {
name = gvr.Group + "/" + name
}
queue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: name})
c := &controller{
name: name,
client: client,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
queue: queue,
lister: metainformer.Lister(),
wg: wait.Group{},
informer: metainformer.Informer(),
logger: logger,
metrics: newTTLMetrics(logger),
}
registration, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleAdd,
UpdateFunc: c.handleUpdate,
})
enqueue := controllerutils.LogError(logger, controllerutils.Parse(controllerutils.MetaNamespaceKey, controllerutils.Queue(queue)))
registration, err := controllerutils.AddEventHandlers(
c.informer,
controllerutils.AddFunc(logger, enqueue),
controllerutils.UpdateFunc(logger, enqueue),
nil,
)
if err != nil {
logger.Error(err, "failed to register event handler")
logger.Error(err, "failed to register event handlers")
return nil, err
}
c.registration = registration
Expand Down Expand Up @@ -82,46 +95,18 @@ func newTTLMetrics(logger logr.Logger) ttlMetrics {
}
}

func (c *controller) handleAdd(obj interface{}) {
c.enqueue(obj)
}

func (c *controller) handleUpdate(oldObj, newObj interface{}) {
old := oldObj.(metav1.Object)
new := newObj.(metav1.Object)
if old.GetResourceVersion() != new.GetResourceVersion() {
c.enqueue(newObj)
}
}

func (c *controller) Start(ctx context.Context, workers int) {
for i := 0; i < workers; i++ {
c.wg.StartWithContext(ctx, func(ctx context.Context) {
defer c.logger.V(3).Info("worker stopped")
c.logger.V(3).Info("worker starting ....")
wait.UntilWithContext(ctx, c.worker, 1*time.Second)
})
}
controllerutils.Run(ctx, c.logger, c.name, time.Second, c.queue, workers, maxRetries, c.reconcile)
}

func (c *controller) Stop() {
defer c.logger.V(3).Info("queue stopped")
defer c.wg.Wait()
// Unregister the event handlers
c.deregisterEventHandlers()
c.logger.V(3).Info("queue stopping ....")
c.queue.ShutDown()
}

func (c *controller) enqueue(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
c.logger.Error(err, "failed to extract name")
return
}
c.queue.Add(key)
}

// deregisterEventHandlers deregisters the event handlers from the informer.
func (c *controller) deregisterEventHandlers() {
err := c.informer.RemoveEventHandler(c.registration)
Expand All @@ -132,36 +117,7 @@ func (c *controller) deregisterEventHandlers() {
c.logger.V(3).Info("deregistered event handlers")
}

func (c *controller) worker(ctx context.Context) {
for {
if !c.processItem() {
// No more items in the queue, exit the loop
break
}
}
}

func (c *controller) processItem() bool {
item, shutdown := c.queue.Get()
if shutdown {
return false
}
// In any case we need to call Done
defer c.queue.Done(item)
err := c.reconcile(item.(string))
if err != nil {
c.logger.Error(err, "reconciliation failed")
c.queue.AddRateLimited(item)
return true
} else {
// If no error, we call Forget to reset the rate limiter
c.queue.Forget(item)
}
return true
}

func (c *controller) reconcile(itemKey string) error {
logger := c.logger.WithValues("key", itemKey)
func (c *controller) reconcile(ctx context.Context, logger logr.Logger, itemKey string, _, _ string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(itemKey)
if err != nil {
return err
Expand All @@ -175,43 +131,33 @@ func (c *controller) reconcile(itemKey string) error {
// there was an error, return it to requeue the key
return err
}

metaObj, err := meta.Accessor(obj)
if err != nil {
logger.Info("object is not of type metav1.Object")
return err
}

commonLabels := []attribute.KeyValue{
attribute.String("resource_namespace", metaObj.GetNamespace()),
attribute.String("resource_group", c.gvr.Group),
attribute.String("resource_version", c.gvr.Version),
attribute.String("resource", c.gvr.Resource),
}

// if the object is being deleted, return early
if metaObj.GetDeletionTimestamp() != nil {
return nil
}

labels := metaObj.GetLabels()
ttlValue, ok := labels[kyverno.LabelCleanupTtl]

if !ok {
// No 'ttl' label present, no further action needed
return nil
}

var deletionTime time.Time

// Try parsing ttlValue as duration
err = parseDeletionTime(metaObj, &deletionTime, ttlValue)

if err != nil {
if err := parseDeletionTime(metaObj, &deletionTime, ttlValue); err != nil {
logger.Error(err, "failed to parse label", "value", ttlValue)
return nil
}

if time.Now().After(deletionTime) {
err = c.client.Namespace(namespace).Delete(context.Background(), metaObj.GetName(), metav1.DeleteOptions{})
if err != nil {
Expand Down
16 changes: 11 additions & 5 deletions pkg/controllers/ttl/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,17 @@ func (m *manager) start(ctx context.Context, gvr schema.GroupVersionResource, wo
options,
)
cont, cancel := context.WithCancel(ctx)
var wg wait.Group
wg.StartWithContext(cont, func(ctx context.Context) {
var informerWaitGroup wait.Group
informerWaitGroup.StartWithContext(cont, func(ctx context.Context) {
logger.V(3).Info("informer starting...")
defer logger.V(3).Info("informer stopping...")
informer.Informer().Run(cont.Done())
})
stopInformer := func() {
// Send stop signal to informer's goroutine
cancel()
// Wait for the group to terminate
wg.Wait()
informerWaitGroup.Wait()
}
if !cache.WaitForCacheSync(ctx.Done(), informer.Informer().HasSynced) {
stopInformer()
Expand All @@ -168,11 +169,16 @@ func (m *manager) start(ctx context.Context, gvr schema.GroupVersionResource, wo
stopInformer()
return err
}
logger.V(3).Info("controller starting...")
controller.Start(cont, workers)
var controllerWaitGroup wait.Group
controllerWaitGroup.StartWithContext(cont, func(ctx context.Context) {
logger.V(3).Info("controller starting...")
defer logger.V(3).Info("controller stopping...")
controller.Start(ctx, workers)
})
m.resController[gvr] = func() {
stopInformer()
controller.Stop()
controllerWaitGroup.Wait()
}
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/engine/adapters/dclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package adapters

import (
"context"
"fmt"
"io"

"github.com/kyverno/kyverno/pkg/auth"
Expand Down Expand Up @@ -59,6 +60,9 @@ func (a *dclientAdapter) IsNamespaced(group, version, kind string) (bool, error)
if err != nil {
return false, err
}
if len(gvrss) != 1 {
return false, fmt.Errorf("function IsNamespaced expects only one resource, got (%d)", len(gvrss))
}
for _, apiResource := range gvrss {
return apiResource.Namespaced, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/handlers/validation/validate_cel.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func collectParams(ctx context.Context, client engineapi.Client, paramKind *admi
var paramsNamespace string
isNamespaced, err := client.IsNamespaced(gv.Group, gv.Version, kind)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to check if resource is namespaced or not (%w)", err)
}

// check if `paramKind` is namespace-scoped
Expand Down
29 changes: 20 additions & 9 deletions pkg/utils/controller/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,33 @@ type (
)

func AddEventHandlers(informer cache.SharedInformer, a addFunc, u updateFunc, d deleteFunc) (cache.ResourceEventHandlerRegistration, error) {
var onDelete deleteFunc
if d != nil {
onDelete = func(obj interface{}) {
d(kubeutils.GetObjectWithTombstone(obj))
}
}
return informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: a,
UpdateFunc: u,
DeleteFunc: func(obj interface{}) {
d(kubeutils.GetObjectWithTombstone(obj))
},
DeleteFunc: onDelete,
})
}

func AddEventHandlersT[T any](informer cache.SharedInformer, a addFuncT[T], u updateFuncT[T], d deleteFuncT[T]) (cache.ResourceEventHandlerRegistration, error) {
return AddEventHandlers(
informer,
func(obj interface{}) { a(obj.(T)) },
func(old, obj interface{}) { u(old.(T), obj.(T)) },
func(obj interface{}) { d(obj.(T)) },
)
var onAdd addFunc
var onUpdate updateFunc
var onDelete deleteFunc
if a != nil {
onAdd = func(obj interface{}) { a(obj.(T)) }
}
if u != nil {
onUpdate = func(old, obj interface{}) { u(old.(T), obj.(T)) }
}
if d != nil {
onDelete = func(obj interface{}) { d(obj.(T)) }
}
return AddEventHandlers(informer, onAdd, onUpdate, onDelete)
}

func AddKeyedEventHandlers(logger logr.Logger, informer cache.SharedInformer, queue workqueue.RateLimitingInterface, parseKey keyFunc) (EnqueueFunc, cache.ResourceEventHandlerRegistration, error) {
Expand Down

0 comments on commit 29111cd

Please sign in to comment.