From 6f192040607503770d97cc34014066a45dc4fc53 Mon Sep 17 00:00:00 2001 From: Donnie Adams Date: Thu, 27 Feb 2025 17:34:00 -0500 Subject: [PATCH] enhance: add enqueue key with delay If a user uses resp.RetryAfter in a handler, the object's key is enqueued without the trigger designation. That means, that all triggers for the object will be invoked regardless of whether the object changes when it goes through the handlers. This is not the desired behavior. If the object doesn't change, then the triggers should not be invoked. This change adds an EnqueueKeyWithDelay function that achieves this desired behavior. Signed-off-by: Donnie Adams --- pkg/router/handler.go | 2 +- pkg/router/trigger.go | 4 +--- pkg/runtime/backend.go | 13 ++----------- pkg/runtime/controller.go | 22 +++++++++------------- pkg/runtime/errorcontroller.go | 3 +++ pkg/runtime/sharedcontroller.go | 4 ++++ 6 files changed, 20 insertions(+), 28 deletions(-) diff --git a/pkg/router/handler.go b/pkg/router/handler.go index 8026b8a..c048b09 100644 --- a/pkg/router/handler.go +++ b/pkg/router/handler.go @@ -327,7 +327,7 @@ func (m *HandlerSet) handle(gvk schema.GroupVersionKind, key string, unmodifiedO if unmodifiedObject == nil { // A nil object here means that the object was deleted, so unregister the triggers m.triggers.UnregisterAndTrigger(req) - } else { + } else if !req.FromTrigger { m.triggers.Trigger(req) } diff --git a/pkg/router/trigger.go b/pkg/router/trigger.go index 9e49115..f93a98e 100644 --- a/pkg/router/trigger.go +++ b/pkg/router/trigger.go @@ -105,9 +105,7 @@ func (m *triggers) shouldAddTrigger(gvk schema.GroupVersionKind, key string, tar } func (m *triggers) Trigger(req Request) { - if !req.FromTrigger { - m.invokeTriggers(req) - } + m.invokeTriggers(req) } func (m *triggers) Register(sourceGVK schema.GroupVersionKind, key string, obj runtime.Object, namespace, name string, selector labels.Selector, fields fields.Selector) (schema.GroupVersionKind, bool, error) { diff --git a/pkg/runtime/backend.go b/pkg/runtime/backend.go index 1f6a7fc..76c9e91 100644 --- a/pkg/runtime/backend.go +++ b/pkg/runtime/backend.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "strconv" - "strings" "sync" "time" @@ -87,16 +86,8 @@ func (b *Backend) Trigger(ctx context.Context, gvk schema.GroupVersionKind, key if err != nil { return err } - if delay > 0 { - ns, name, ok := strings.Cut(key, "/") - if ok { - controller.EnqueueAfter(ns, name, delay) - } else { - controller.EnqueueAfter("", key, delay) - } - } else { - controller.EnqueueKey(router.TriggerPrefix + key) - } + + controller.EnqueueKeyAfter(router.TriggerPrefix+key, delay) return nil } diff --git a/pkg/runtime/controller.go b/pkg/runtime/controller.go index f84ab82..4b4842c 100644 --- a/pkg/runtime/controller.go +++ b/pkg/runtime/controller.go @@ -40,6 +40,7 @@ type Controller interface { Enqueue(namespace, name string) EnqueueAfter(namespace, name string, delay time.Duration) EnqueueKey(key string) + EnqueueKeyAfter(key string, delay time.Duration) Cache() (cache.Cache, error) Start(ctx context.Context, workers int) error } @@ -206,7 +207,7 @@ func (c *controller) Start(ctx context.Context, workers int) error { if c.registration == nil { registration, err := c.informer.AddEventHandler(clientgocache.ResourceEventHandlerFuncs{ AddFunc: c.handleObject, - UpdateFunc: func(old, new interface{}) { + UpdateFunc: func(old, new any) { c.handleObject(new) }, DeleteFunc: c.handleObject, @@ -332,27 +333,22 @@ func (c *controller) syncHandler(ctx context.Context, key string) error { } func (c *controller) EnqueueKey(key string) { + c.EnqueueKeyAfter(key, 0) +} + +func (c *controller) EnqueueKeyAfter(key string, after time.Duration) { c.startLock.Lock() defer c.startLock.Unlock() if c.workqueues == nil { - c.startKeys = append(c.startKeys, startKey{key: key}) + c.startKeys = append(c.startKeys, startKey{key: key, after: after}) } else { - c.workqueues[c.splitter.Split(key)].Add(key) + c.workqueues[c.splitter.Split(key)].AddAfter(key, after) } } func (c *controller) Enqueue(namespace, name string) { - key := keyFunc(namespace, name) - - c.startLock.Lock() - defer c.startLock.Unlock() - - if c.workqueues == nil { - c.startKeys = append(c.startKeys, startKey{key: key}) - } else { - c.workqueues[c.splitter.Split(key)].AddRateLimited(key) - } + c.EnqueueAfter(namespace, name, 0) } func (c *controller) EnqueueAfter(namespace, name string, duration time.Duration) { diff --git a/pkg/runtime/errorcontroller.go b/pkg/runtime/errorcontroller.go index 4c41001..11f71fb 100644 --- a/pkg/runtime/errorcontroller.go +++ b/pkg/runtime/errorcontroller.go @@ -21,6 +21,9 @@ func (n *errorController) Enqueue(namespace, name string) { func (n *errorController) EnqueueAfter(namespace, name string, delay time.Duration) { } +func (n *errorController) EnqueueKeyAfter(key string, delay time.Duration) { +} + func (n *errorController) EnqueueKey(key string) { } diff --git a/pkg/runtime/sharedcontroller.go b/pkg/runtime/sharedcontroller.go index e9c8748..9c1f11c 100644 --- a/pkg/runtime/sharedcontroller.go +++ b/pkg/runtime/sharedcontroller.go @@ -53,6 +53,10 @@ func (s *sharedController) EnqueueAfter(namespace, name string, delay time.Durat s.initController().EnqueueAfter(namespace, name, delay) } +func (s *sharedController) EnqueueKeyAfter(key string, delay time.Duration) { + s.initController().EnqueueKeyAfter(key, delay) +} + func (s *sharedController) EnqueueKey(key string) { s.initController().EnqueueKey(key) }