Skip to content

Commit

Permalink
ttl: optimize ttl
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Nov 5, 2024
1 parent 8a62d5a commit da31f68
Showing 1 changed file with 45 additions and 8 deletions.
53 changes: 45 additions & 8 deletions pkg/timer/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/hex"
"fmt"
"maps"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -252,9 +253,7 @@ func (rt *TimerGroupRuntime) fullRefreshTimers() {

func (rt *TimerGroupRuntime) tryTriggerTimerEvents() {
now := rt.nowFunc()
var retryTimerIDs []string
var retryTimerKeys []string
var busyWorkers map[string]struct{}
var readyTimers []*timerCacheItem
rt.cache.iterTryTriggerTimers(func(timer *api.TimerRecord, tryTriggerTime time.Time, nextEventTime *time.Time) bool {
if tryTriggerTime.After(now) {
return false
Expand All @@ -264,9 +263,45 @@ func (rt *TimerGroupRuntime) tryTriggerTimerEvents() {
return true
}

if readyTimers == nil {
readyTimers = make([]*timerCacheItem, 0, 8)
}

readyTimers = append(readyTimers, &timerCacheItem{
timer: timer,
nextEventTime: nextEventTime,
})
return true
})

if len(readyTimers) == 0 {
return
}

// resort timer to make sure the timer has the smallest nextEventTime has a higher priority to trigger
slices.SortFunc(readyTimers, func(a, b *timerCacheItem) int {
if a.nextEventTime == nil || b.nextEventTime == nil {
if a.nextEventTime != nil {
return 1
}

if b.nextEventTime != nil {
return -1
}

return 0
}
return a.nextEventTime.Compare(*b.nextEventTime)
})

var retryTimerIDs []string
var retryTimerKeys []string
var busyWorkers map[string]struct{}
for i, item := range readyTimers {
timer := item.timer
worker, ok := rt.ensureWorker(timer.HookClass)
if !ok {
return true
continue
}

eventID := timer.EventID
Expand All @@ -284,20 +319,22 @@ func (rt *TimerGroupRuntime) tryTriggerTimerEvents() {

select {
case <-rt.ctx.Done():
return false
return
case worker.ch <- req:
rt.cache.setTimerProcStatus(timer.ID, procTriggering, eventID)
default:
if busyWorkers == nil {
busyWorkers = make(map[string]struct{})
busySize := len(readyTimers) - i
retryTimerIDs = make([]string, 0, busySize)
retryTimerKeys = make([]string, 0, busySize)
busyWorkers = make(map[string]struct{}, busySize)
}

busyWorkers[timer.HookClass] = struct{}{}
retryTimerIDs = append(retryTimerIDs, timer.ID)
retryTimerKeys = append(retryTimerKeys, fmt.Sprintf("[%s] %s", timer.Namespace, timer.Key))
}
return true
})
}

if len(retryTimerIDs) > 0 {
busyWorkerList := make([]string, 0, len(busyWorkers))
Expand Down

0 comments on commit da31f68

Please sign in to comment.