Skip to content

Commit 597a464

Browse files
Workqueue Batching improvements (#63)
Using StopAndWait would cause a very long-running task in the queue to block any additional work being processed (can happen on long running k8s object deletions). This will allow us to control pulling work off the queue w/o overwhelming the pool while still not blocking.
1 parent c3abbe7 commit 597a464

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

pkg/sync/loop.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,13 @@ func (engine *Engine) ControlLoop() {
3333

3434
engine.RegisterHandlers()
3535

36+
pool := pond.New(20, 100, pond.MinWorkers(20))
37+
3638
wait.PollInfinite(syncDelay, func() (done bool, err error) {
3739
log.Info("Polling for new service updates")
38-
pool := pond.New(20, 100, pond.MinWorkers(20))
39-
for i := 0; i < engine.svcQueue.Len(); i++ {
40+
count := min(engine.svcQueue.Len(), 20-pool.RunningWorkers())
41+
log.Info("pulling next %d items from queue", count)
42+
for i := 0; i < count; i++ {
4043
item, shutdown := engine.svcQueue.Get()
4144
if shutdown {
4245
return true, nil
@@ -47,7 +50,6 @@ func (engine *Engine) ControlLoop() {
4750
}
4851
})
4952
}
50-
pool.StopAndWait()
5153
return false, nil
5254

5355
})

0 commit comments

Comments
 (0)