Skip to content

Commit 69e8fdb

Browse files
authored
timer: make sure timer which has a max delay will be scheduled first (#57067)
close #57137
1 parent 34ef14a commit 69e8fdb

File tree

3 files changed

+91
-9
lines changed

3 files changed

+91
-9
lines changed

pkg/timer/runtime/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ go_test(
3535
embed = [":runtime"],
3636
flaky = True,
3737
race = "on",
38-
shard_count = 23,
38+
shard_count = 24,
3939
deps = [
4040
"//pkg/testkit/testsetup",
4141
"//pkg/timer/api",

pkg/timer/runtime/runtime.go

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"encoding/hex"
2020
"fmt"
2121
"maps"
22+
"slices"
2223
"sync"
2324
"time"
2425

@@ -252,9 +253,7 @@ func (rt *TimerGroupRuntime) fullRefreshTimers() {
252253

253254
func (rt *TimerGroupRuntime) tryTriggerTimerEvents() {
254255
now := rt.nowFunc()
255-
var retryTimerIDs []string
256-
var retryTimerKeys []string
257-
var busyWorkers map[string]struct{}
256+
var readyTimers []*timerCacheItem
258257
rt.cache.iterTryTriggerTimers(func(timer *api.TimerRecord, tryTriggerTime time.Time, nextEventTime *time.Time) bool {
259258
if tryTriggerTime.After(now) {
260259
return false
@@ -264,9 +263,45 @@ func (rt *TimerGroupRuntime) tryTriggerTimerEvents() {
264263
return true
265264
}
266265

266+
if readyTimers == nil {
267+
readyTimers = make([]*timerCacheItem, 0, 8)
268+
}
269+
270+
readyTimers = append(readyTimers, &timerCacheItem{
271+
timer: timer,
272+
nextEventTime: nextEventTime,
273+
})
274+
return true
275+
})
276+
277+
if len(readyTimers) == 0 {
278+
return
279+
}
280+
281+
// re-sort the ready timers so that the timer with the smallest nextEventTime has the highest priority to trigger
282+
slices.SortFunc(readyTimers, func(a, b *timerCacheItem) int {
283+
if a.nextEventTime == nil || b.nextEventTime == nil {
284+
if a.nextEventTime != nil {
285+
return 1
286+
}
287+
288+
if b.nextEventTime != nil {
289+
return -1
290+
}
291+
292+
return 0
293+
}
294+
return a.nextEventTime.Compare(*b.nextEventTime)
295+
})
296+
297+
var retryTimerIDs []string
298+
var retryTimerKeys []string
299+
var busyWorkers map[string]struct{}
300+
for i, item := range readyTimers {
301+
timer := item.timer
267302
worker, ok := rt.ensureWorker(timer.HookClass)
268303
if !ok {
269-
return true
304+
continue
270305
}
271306

272307
eventID := timer.EventID
@@ -284,20 +319,22 @@ func (rt *TimerGroupRuntime) tryTriggerTimerEvents() {
284319

285320
select {
286321
case <-rt.ctx.Done():
287-
return false
322+
return
288323
case worker.ch <- req:
289324
rt.cache.setTimerProcStatus(timer.ID, procTriggering, eventID)
290325
default:
291326
if busyWorkers == nil {
292-
busyWorkers = make(map[string]struct{})
327+
busySize := len(readyTimers) - i
328+
retryTimerIDs = make([]string, 0, busySize)
329+
retryTimerKeys = make([]string, 0, busySize)
330+
busyWorkers = make(map[string]struct{}, busySize)
293331
}
294332

295333
busyWorkers[timer.HookClass] = struct{}{}
296334
retryTimerIDs = append(retryTimerIDs, timer.ID)
297335
retryTimerKeys = append(retryTimerKeys, fmt.Sprintf("[%s] %s", timer.Namespace, timer.Key))
298336
}
299-
return true
300-
})
337+
}
301338

302339
if len(retryTimerIDs) > 0 {
303340
busyWorkerList := make([]string, 0, len(busyWorkers))

pkg/timer/runtime/runtime_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,51 @@ func TestTryTriggerTimer(t *testing.T) {
264264
consumeAndVerify(t3)
265265
}
266266

267+
func TestTryTriggerTimePriority(t *testing.T) {
268+
now := time.Now()
269+
store := api.NewMemoryTimerStore()
270+
defer store.Close()
271+
runtime := NewTimerRuntimeBuilder("g1", store).Build()
272+
runtime.setNowFunc(func() time.Time {
273+
return now
274+
})
275+
runtime.initCtx()
276+
ch := make(chan *triggerEventRequest, 2)
277+
runtime.workers["hook1"] = &hookWorker{ch: ch}
278+
279+
t1 := newTestTimer("t1", "1m", now.Add(-time.Hour))
280+
runtime.cache.updateTimer(t1)
281+
runtime.cache.updateNextTryTriggerTime(t1.ID, now.Add(-3*time.Minute))
282+
283+
t2 := newTestTimer("t2", "1m", now.Add(-2*time.Hour))
284+
runtime.cache.updateTimer(t2)
285+
runtime.cache.updateNextTryTriggerTime(t2.ID, now.Add(-2*time.Minute))
286+
287+
t3 := newTestTimer("t3", "1h", now)
288+
t3.EventStatus = api.SchedEventTrigger
289+
t3.EventID = "event2"
290+
t3.EventStart = now.Add(-time.Minute)
291+
t3.Enable = false
292+
runtime.cache.updateTimer(t3)
293+
294+
t4 := newTestTimer("t4", "1m", now.Add(-10*time.Hour))
295+
runtime.cache.updateTimer(t4)
296+
runtime.cache.updateNextTryTriggerTime(t4.ID, now.Add(time.Minute))
297+
298+
// nextEventTime: t3 (nil) < t4 < t2 < t1
299+
// nextTryTriggerTime: t1 < t2 < t3 (eventStart) < t4
300+
// the trigger priority should be ordered by `nextEventTime`, to ensure that the timer with the max
301+
// delay time will be triggered first.
302+
// t4 should not be scheduled because its next try-trigger time is after now.
303+
// so only t3 and t2 will be triggered, since the capacity of the chan is 2
304+
runtime.tryTriggerTimerEvents()
305+
require.Equal(t, procTriggering, runtime.cache.items[t2.ID].procStatus)
306+
require.Equal(t, procTriggering, runtime.cache.items[t3.ID].procStatus)
307+
// t1 and t4 should remain untriggered
308+
require.Equal(t, procIdle, runtime.cache.items[t1.ID].procStatus)
309+
require.Equal(t, procIdle, runtime.cache.items[t4.ID].procStatus)
310+
}
311+
267312
func TestHandleHookWorkerResponse(t *testing.T) {
268313
now := time.Now()
269314
store := api.NewMemoryTimerStore()

0 commit comments

Comments
 (0)