Skip to content

Commit b062d9e

Browse files
committed
enhance: add enqueue key with delay
If a user uses resp.RetryAfter in a handler, the object's key is enqueued without the trigger designation. That means, that all triggers for the object will be invoked regardless of whether the object changes when it goes through the handlers. This is not the desired behavior. If the object doesn't change, then the triggers should not be invoked. This change adds an EnqueueKeyWithDelay function that achieves this desired behavior. Signed-off-by: Donnie Adams <[email protected]>
1 parent 3825a76 commit b062d9e

File tree

7 files changed

+54
-28
lines changed

7 files changed

+54
-28
lines changed

pkg/router/handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ func (m *HandlerSet) handle(gvk schema.GroupVersionKind, key string, unmodifiedO
327327
if unmodifiedObject == nil {
328328
// A nil object here means that the object was deleted, so unregister the triggers
329329
m.triggers.UnregisterAndTrigger(req)
330-
} else {
330+
} else if !req.FromTrigger {
331331
m.triggers.Trigger(req)
332332
}
333333

pkg/router/router.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ func (r *Router) DumpTriggers(indent bool) ([]byte, error) {
6464
return b, nil
6565
}
6666

67+
func (r *Router) DumpTriggersGraphViz() []byte {
68+
return r.handlers.triggers.DumpGraphViz()
69+
}
70+
6771
type RouteBuilder struct {
6872
includeRemove bool
6973
includeFinalizing bool

pkg/router/trigger.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package router
22

33
import (
4+
"bytes"
45
"encoding/json"
6+
"fmt"
57
"strings"
68
"sync"
79

@@ -105,9 +107,7 @@ func (m *triggers) shouldAddTrigger(gvk schema.GroupVersionKind, key string, tar
105107
}
106108

107109
func (m *triggers) Trigger(req Request) {
108-
if !req.FromTrigger {
109-
m.invokeTriggers(req)
110-
}
110+
m.invokeTriggers(req)
111111
}
112112

113113
func (m *triggers) Register(sourceGVK schema.GroupVersionKind, key string, obj runtime.Object, namespace, name string, selector labels.Selector, fields fields.Selector) (schema.GroupVersionKind, bool, error) {
@@ -180,6 +180,34 @@ func (m *triggers) Dump(indent bool) ([]byte, error) {
180180
return json.MarshalIndent(m.matchers, "", " ")
181181
}
182182

183+
func (m *triggers) DumpGraphViz() []byte {
184+
m.lock.RLock()
185+
defer m.lock.RUnlock()
186+
187+
var buf bytes.Buffer
188+
189+
buf.WriteString(`digraph mygraph {
190+
fontname="Helvetica,Arial,sans-serif"
191+
node [fontname="Helvetica,Arial,sans-serif"]
192+
edge [fontname="Helvetica,Arial,sans-serif"]
193+
node [shape=box];
194+
`)
195+
196+
for key := range m.matchers {
197+
for et, matchers := range m.matchers[key] {
198+
for matcher := range matchers {
199+
text, _ := et.MarshalText()
200+
buf.WriteString(fmt.Sprintf(`"%s" -> "%s"
201+
`, text, matcher))
202+
}
203+
}
204+
}
205+
206+
buf.WriteString(`}`)
207+
208+
return buf.Bytes()
209+
}
210+
183211
type groupVersionKind struct {
184212
schema.GroupVersionKind
185213
}

pkg/runtime/backend.go

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"os"
77
"strconv"
8-
"strings"
98
"sync"
109
"time"
1110

@@ -87,16 +86,8 @@ func (b *Backend) Trigger(ctx context.Context, gvk schema.GroupVersionKind, key
8786
if err != nil {
8887
return err
8988
}
90-
if delay > 0 {
91-
ns, name, ok := strings.Cut(key, "/")
92-
if ok {
93-
controller.EnqueueAfter(ns, name, delay)
94-
} else {
95-
controller.EnqueueAfter("", key, delay)
96-
}
97-
} else {
98-
controller.EnqueueKey(router.TriggerPrefix + key)
99-
}
89+
90+
controller.EnqueueKeyAfter(router.TriggerPrefix+key, delay)
10091
return nil
10192
}
10293

pkg/runtime/controller.go

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type Controller interface {
4040
Enqueue(namespace, name string)
4141
EnqueueAfter(namespace, name string, delay time.Duration)
4242
EnqueueKey(key string)
43+
EnqueueKeyAfter(key string, delay time.Duration)
4344
Cache() (cache.Cache, error)
4445
Start(ctx context.Context, workers int) error
4546
}
@@ -206,7 +207,7 @@ func (c *controller) Start(ctx context.Context, workers int) error {
206207
if c.registration == nil {
207208
registration, err := c.informer.AddEventHandler(clientgocache.ResourceEventHandlerFuncs{
208209
AddFunc: c.handleObject,
209-
UpdateFunc: func(old, new interface{}) {
210+
UpdateFunc: func(old, new any) {
210211
c.handleObject(new)
211212
},
212213
DeleteFunc: c.handleObject,
@@ -332,27 +333,22 @@ func (c *controller) syncHandler(ctx context.Context, key string) error {
332333
}
333334

334335
func (c *controller) EnqueueKey(key string) {
336+
c.EnqueueKeyAfter(key, 0)
337+
}
338+
339+
func (c *controller) EnqueueKeyAfter(key string, after time.Duration) {
335340
c.startLock.Lock()
336341
defer c.startLock.Unlock()
337342

338343
if c.workqueues == nil {
339-
c.startKeys = append(c.startKeys, startKey{key: key})
344+
c.startKeys = append(c.startKeys, startKey{key: key, after: after})
340345
} else {
341-
c.workqueues[c.splitter.Split(key)].Add(key)
346+
c.workqueues[c.splitter.Split(key)].AddAfter(key, after)
342347
}
343348
}
344349

345350
func (c *controller) Enqueue(namespace, name string) {
346-
key := keyFunc(namespace, name)
347-
348-
c.startLock.Lock()
349-
defer c.startLock.Unlock()
350-
351-
if c.workqueues == nil {
352-
c.startKeys = append(c.startKeys, startKey{key: key})
353-
} else {
354-
c.workqueues[c.splitter.Split(key)].AddRateLimited(key)
355-
}
351+
c.EnqueueAfter(namespace, name, 0)
356352
}
357353

358354
func (c *controller) EnqueueAfter(namespace, name string, duration time.Duration) {

pkg/runtime/errorcontroller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ func (n *errorController) Enqueue(namespace, name string) {
2121
func (n *errorController) EnqueueAfter(namespace, name string, delay time.Duration) {
2222
}
2323

24+
func (n *errorController) EnqueueKeyAfter(key string, delay time.Duration) {
25+
}
26+
2427
func (n *errorController) EnqueueKey(key string) {
2528
}
2629

pkg/runtime/sharedcontroller.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func (s *sharedController) EnqueueAfter(namespace, name string, delay time.Durat
5353
s.initController().EnqueueAfter(namespace, name, delay)
5454
}
5555

56+
func (s *sharedController) EnqueueKeyAfter(key string, delay time.Duration) {
57+
s.initController().EnqueueKeyAfter(key, delay)
58+
}
59+
5660
func (s *sharedController) EnqueueKey(key string) {
5761
s.initController().EnqueueKey(key)
5862
}

0 commit comments

Comments
 (0)