Skip to content

Commit 657c193

Browse files
Work on dynamic poller fixes (#519)
This apparently is still broken, working on a fix now
1 parent ff4680e commit 657c193

File tree

5 files changed

+34
-24
lines changed

5 files changed

+34
-24
lines changed

internal/helpers/poll.go

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ import (
1414
// If syncFirstRun is set to true, it will execute the condition function synchronously first and then start
1515
// polling. Since error is returned synchronously, the only way to check for it is to use syncFirstRun.
1616
// Background poller does not sync errors. It can be stopped externally by cancelling the provided context.
17-
func DynamicBackgroundPollUntilContextCancel(ctx context.Context, getInterval func() time.Duration, syncFirstRun bool, condition wait.ConditionWithContextFunc) (err error) {
17+
func DynamicBackgroundPollUntilContextCancel(ctx context.Context, getInterval func() time.Duration, syncFirstRun bool, callback wait.ConditionWithContextFunc) (err error) {
1818
if syncFirstRun {
19-
_, err = condition(ctx)
19+
_, err = callback(ctx)
2020
}
2121

2222
go func() {
23-
_ = DynamicPollUntilContextCancel(ctx, getInterval, condition)
23+
_ = DynamicPollUntilContextCancel(ctx, getInterval, callback)
2424
}()
2525

2626
return err
@@ -29,37 +29,34 @@ func DynamicBackgroundPollUntilContextCancel(ctx context.Context, getInterval fu
2929
func DynamicPollUntilContextCancel(
3030
ctx context.Context,
3131
intervalFunc func() time.Duration,
32-
condition wait.ConditionWithContextFunc,
32+
callback wait.ConditionWithContextFunc,
3333
) error {
3434
for {
3535
interval := intervalFunc()
3636

3737
// Handle inactive state (interval == 0) and wait 1sec
38-
for interval <= 0 {
39-
ticker := time.NewTicker(time.Second)
38+
if interval <= 0 {
4039
select {
4140
case <-ctx.Done():
42-
ticker.Stop()
4341
return ctx.Err()
44-
case <-ticker.C:
45-
interval = intervalFunc()
42+
case <-time.After(time.Second):
43+
continue
4644
}
47-
ticker.Stop()
4845
}
4946

50-
jitter := time.Duration(rand.Int63n(int64(interval)) - int64(interval/2))
47+
asInt := int64(interval)
48+
jitter := time.Duration(rand.Int63n(asInt) - asInt/2)
5149

52-
// Active polling mode
5350
select {
54-
case <-ctx.Done():
55-
return ctx.Err()
5651
case <-time.After(interval + jitter):
57-
if ok, err := func() (bool, error) {
52+
if done, err := func() (bool, error) {
5853
defer runtime.HandleCrashWithContext(ctx)
59-
return condition(ctx)
60-
}(); err != nil || ok {
54+
return callback(ctx)
55+
}(); err != nil || done {
6156
return err
6257
}
58+
case <-ctx.Done():
59+
return ctx.Err()
6360
}
6461
}
6562
}

pkg/controller/controller.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ func (c *Controller) SetupWithManager(manager *Manager) {
6262
c.PollJitter = manager.PollJitter
6363
c.DeQueueJitter = time.Second
6464
c.lastPollTime = time.Now()
65+
c.lastReconcileTime = time.Now()
6566
}
6667

6768
// Start implements controller.Controller.
@@ -119,7 +120,7 @@ func (c *Controller) startPoller(ctx context.Context) {
119120
defer c.Do.Shutdown()
120121

121122
klog.V(internallog.LogLevelTrace).InfoS("Starting controller poller", "ctrl", c.Name)
122-
_ = helpers.DynamicPollUntilContextCancel(ctx, c.Do.GetPollInterval(), func(_ context.Context) (bool, error) {
123+
err := helpers.DynamicPollUntilContextCancel(ctx, c.Do.GetPollInterval(), func(_ context.Context) (bool, error) {
123124
defer func() {
124125
c.lastPollTime = time.Now()
125126
}()
@@ -129,11 +130,13 @@ func (c *Controller) startPoller(ctx context.Context) {
129130
}
130131

131132
c.lastPollTime = time.Now()
132-
time.Sleep(time.Duration(rand.Int63n(int64(c.PollJitter))))
133-
134133
// never stop
135134
return false, nil
136135
})
136+
if err != nil {
137+
klog.V(internallog.LogLevelDefault).ErrorS(err, "Controller poller failed", "ctrl", c.Name)
138+
}
139+
137140
klog.V(internallog.LogLevelDefault).InfoS("Controller poller finished", "ctrl", c.Name)
138141
}
139142

@@ -200,7 +203,7 @@ func (c *Controller) reconcile(ctx context.Context, req string) (_ reconcile.Res
200203
for _, fn := range utilruntime.PanicHandlers {
201204
fn(ctx, r)
202205
}
203-
err = fmt.Errorf("panic: %v [recovered]", r)
206+
logf.FromContext(ctx).V(1).Error(err, fmt.Sprintf("Observed a panic in reconciler: %v", r))
204207
return
205208
}
206209

pkg/controller/controller_manager.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func (cm *Manager) startControllerSupervised(ctx context.Context, ctrl *Controll
141141
// Make last controller action deadline 5 times the time of regular poll.
142142
// It means that the controller hasn't polled/reconciled any resources.
143143
// It could indicate that the controller might have died and should be restarted.
144-
lastControllerActionDeadline := 5 * (cm.PollInterval + cm.PollJitter)
144+
145145
ticker := time.NewTicker(livenessCheckInterval)
146146
defer ticker.Stop()
147147

@@ -176,6 +176,15 @@ func (cm *Manager) startControllerSupervised(ctx context.Context, ctrl *Controll
176176
cm.restartController(internalCtx, ctrl)
177177
}()
178178
case <-ticker.C:
179+
lastControllerActionDeadline := 5 * ctrl.Do.GetPollInterval()()
180+
if lastControllerActionDeadline <= 0 {
181+
klog.V(log.LogLevelDebug).InfoS(
182+
"Controller poll interval is 0, skipping last poll time check",
183+
"name", ctrl.Name,
184+
)
185+
continue
186+
}
187+
179188
lastPollTime := ctrl.LastPollTime()
180189
klog.V(log.LogLevelDebug).InfoS(
181190
"Controller last poll time check",
@@ -187,6 +196,7 @@ func (cm *Manager) startControllerSupervised(ctx context.Context, ctrl *Controll
187196
"Controller unresponsive, restarting",
188197
"ctrl", ctrl.Name,
189198
"lastPollTime", lastPollTime.Format(time.RFC3339),
199+
"lastControllerActionDeadline", lastControllerActionDeadline.String(),
190200
)
191201
cancel()
192202
break
@@ -211,6 +221,7 @@ func (cm *Manager) startControllerSupervised(ctx context.Context, ctrl *Controll
211221
"Controller unresponsive, restarting",
212222
"ctrl", ctrl.Name,
213223
"lastReconcileTime", lastPollTime.Format(time.RFC3339),
224+
"lastControllerActionDeadline", lastControllerActionDeadline.String(),
214225
)
215226
cancel()
216227
}

pkg/controller/restore/reconciler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (s *RestoreReconciler) Shutdown() {
8989
}
9090

9191
func (s *RestoreReconciler) GetPollInterval() func() time.Duration {
92-
return func() time.Duration { return s.pollInterval } // use default poll interval
92+
return func() time.Duration { return time.Duration(0) } // use default poll interval
9393
}
9494

9595
func (s *RestoreReconciler) GetPublisher() (string, websocket.Publisher) {

pkg/controller/service/reconciler.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,6 @@ func (s *ServiceReconciler) GetPollInterval() func() time.Duration {
144144
if servicePollInterval := agentcommon.GetConfigurationManager().GetServicePollInterval(); servicePollInterval != nil {
145145
return *servicePollInterval
146146
}
147-
148147
return s.pollInterval
149148
}
150149
}

0 commit comments

Comments
 (0)