Skip to content

Commit 63ab448

Browse files
lcwangchaoti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#57067
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 7e13648 commit 63ab448

File tree

3 files changed

+95
-9
lines changed

3 files changed

+95
-9
lines changed

pkg/timer/runtime/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ go_test(
3535
embed = [":runtime"],
3636
flaky = True,
3737
race = "on",
38-
shard_count = 23,
38+
shard_count = 24,
3939
deps = [
4040
"//pkg/testkit/testsetup",
4141
"//pkg/timer/api",

pkg/timer/runtime/runtime.go

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ import (
1818
"context"
1919
"encoding/hex"
2020
"fmt"
21+
<<<<<<< HEAD
22+
=======
23+
"maps"
24+
"slices"
25+
>>>>>>> 69e8fdb60b5 (timer: make sure timer which has a max delay will be scheduled first (#57067))
2126
"sync"
2227
"time"
2328

@@ -241,9 +246,7 @@ func (rt *TimerGroupRuntime) fullRefreshTimers() {
241246

242247
func (rt *TimerGroupRuntime) tryTriggerTimerEvents() {
243248
now := rt.nowFunc()
244-
var retryTimerIDs []string
245-
var retryTimerKeys []string
246-
var busyWorkers map[string]struct{}
249+
var readyTimers []*timerCacheItem
247250
rt.cache.iterTryTriggerTimers(func(timer *api.TimerRecord, tryTriggerTime time.Time, nextEventTime *time.Time) bool {
248251
if tryTriggerTime.After(now) {
249252
return false
@@ -253,9 +256,45 @@ func (rt *TimerGroupRuntime) tryTriggerTimerEvents() {
253256
return true
254257
}
255258

259+
if readyTimers == nil {
260+
readyTimers = make([]*timerCacheItem, 0, 8)
261+
}
262+
263+
readyTimers = append(readyTimers, &timerCacheItem{
264+
timer: timer,
265+
nextEventTime: nextEventTime,
266+
})
267+
return true
268+
})
269+
270+
if len(readyTimers) == 0 {
271+
return
272+
}
273+
274+
// resort timer to make sure the timer has the smallest nextEventTime has a higher priority to trigger
275+
slices.SortFunc(readyTimers, func(a, b *timerCacheItem) int {
276+
if a.nextEventTime == nil || b.nextEventTime == nil {
277+
if a.nextEventTime != nil {
278+
return 1
279+
}
280+
281+
if b.nextEventTime != nil {
282+
return -1
283+
}
284+
285+
return 0
286+
}
287+
return a.nextEventTime.Compare(*b.nextEventTime)
288+
})
289+
290+
var retryTimerIDs []string
291+
var retryTimerKeys []string
292+
var busyWorkers map[string]struct{}
293+
for i, item := range readyTimers {
294+
timer := item.timer
256295
worker, ok := rt.ensureWorker(timer.HookClass)
257296
if !ok {
258-
return true
297+
continue
259298
}
260299

261300
eventID := timer.EventID
@@ -273,20 +312,22 @@ func (rt *TimerGroupRuntime) tryTriggerTimerEvents() {
273312

274313
select {
275314
case <-rt.ctx.Done():
276-
return false
315+
return
277316
case worker.ch <- req:
278317
rt.cache.setTimerProcStatus(timer.ID, procTriggering, eventID)
279318
default:
280319
if busyWorkers == nil {
281-
busyWorkers = make(map[string]struct{})
320+
busySize := len(readyTimers) - i
321+
retryTimerIDs = make([]string, 0, busySize)
322+
retryTimerKeys = make([]string, 0, busySize)
323+
busyWorkers = make(map[string]struct{}, busySize)
282324
}
283325

284326
busyWorkers[timer.HookClass] = struct{}{}
285327
retryTimerIDs = append(retryTimerIDs, timer.ID)
286328
retryTimerKeys = append(retryTimerKeys, fmt.Sprintf("[%s] %s", timer.Namespace, timer.Key))
287329
}
288-
return true
289-
})
330+
}
290331

291332
if len(retryTimerIDs) > 0 {
292333
busyWorkerList := make([]string, 0, len(busyWorkers))

pkg/timer/runtime/runtime_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,51 @@ func TestTryTriggerTimer(t *testing.T) {
264264
consumeAndVerify(t3)
265265
}
266266

267+
func TestTryTriggerTimePriority(t *testing.T) {
268+
now := time.Now()
269+
store := api.NewMemoryTimerStore()
270+
defer store.Close()
271+
runtime := NewTimerRuntimeBuilder("g1", store).Build()
272+
runtime.setNowFunc(func() time.Time {
273+
return now
274+
})
275+
runtime.initCtx()
276+
ch := make(chan *triggerEventRequest, 2)
277+
runtime.workers["hook1"] = &hookWorker{ch: ch}
278+
279+
t1 := newTestTimer("t1", "1m", now.Add(-time.Hour))
280+
runtime.cache.updateTimer(t1)
281+
runtime.cache.updateNextTryTriggerTime(t1.ID, now.Add(-3*time.Minute))
282+
283+
t2 := newTestTimer("t2", "1m", now.Add(-2*time.Hour))
284+
runtime.cache.updateTimer(t2)
285+
runtime.cache.updateNextTryTriggerTime(t2.ID, now.Add(-2*time.Minute))
286+
287+
t3 := newTestTimer("t3", "1h", now)
288+
t3.EventStatus = api.SchedEventTrigger
289+
t3.EventID = "event2"
290+
t3.EventStart = now.Add(-time.Minute)
291+
t3.Enable = false
292+
runtime.cache.updateTimer(t3)
293+
294+
t4 := newTestTimer("t4", "1m", now.Add(-10*time.Hour))
295+
runtime.cache.updateTimer(t4)
296+
runtime.cache.updateNextTryTriggerTime(t4.ID, now.Add(time.Minute))
297+
298+
// nextEventTime: t3 (nil) < t4 < t2 < t1
299+
// nextTryTriggerTime: t1 < t2 < t3 (eventStart) < t4
300+
// we should test the priority trigger is ordered by `nextEventTime` because to ensure the timer who has a max
301+
// delay time will be triggered first.
302+
// t4 should not be scheduled for the next trigger time is after now.
303+
// so, t3 and t2 will be triggered when the capacity of chan is 2
304+
runtime.tryTriggerTimerEvents()
305+
require.Equal(t, procTriggering, runtime.cache.items[t2.ID].procStatus)
306+
require.Equal(t, procTriggering, runtime.cache.items[t3.ID].procStatus)
307+
// t1, t4 should keep not triggered
308+
require.Equal(t, procIdle, runtime.cache.items[t1.ID].procStatus)
309+
require.Equal(t, procIdle, runtime.cache.items[t4.ID].procStatus)
310+
}
311+
267312
func TestHandleHookWorkerResponse(t *testing.T) {
268313
now := time.Now()
269314
store := api.NewMemoryTimerStore()

0 commit comments

Comments
 (0)