// File: ignore.go
package metafora
import (
"container/heap"
"sync"
"time"
)
// ignoremgr handles ignoring tasks and sending them back to the consumer once
// their ignore deadline is reached.
type ignoremgr struct {
	// incoming carries newly ignored tasks from add to the monitor goroutine,
	// which schedules them on its time heap.
	incoming chan *timetask

	// stop is selected on by add so a blocked send on incoming can be
	// abandoned.
	stop <-chan struct{}

	// mu guards ignores.
	mu *sync.RWMutex

	// ignores is the set of task IDs that are currently being ignored; add
	// inserts into it and monitor deletes an entry when its timer fires.
	ignores map[string]struct{}
}
// ignorer builds an ignoremgr with an empty ignore set and starts its monitor
// goroutine. The monitor sends tasks whose ignore has expired on tasks and is
// handed the same stop channel that the manager keeps for add.
func ignorer(tasks chan<- Task, stop <-chan struct{}) *ignoremgr {
	mgr := new(ignoremgr)
	mgr.stop = stop
	mgr.mu = new(sync.RWMutex)
	mgr.ignores = map[string]struct{}{}
	// Unbuffered: add blocks until monitor accepts the entry or stop fires.
	mgr.incoming = make(chan *timetask)

	go mgr.monitor(tasks, stop)
	return mgr
}
// add marks task as ignored and hands it to the monitor goroutine together
// with the time at which the ignore ends. A zero until is a no-op.
func (im *ignoremgr) add(task Task, until time.Time) {
	if until.IsZero() {
		// Nothing to ignore for a zero deadline.
		return
	}

	// Record the ID in the ignore set. The lock is released before the
	// channel send below so it is never held while blocking.
	im.mu.Lock()
	im.ignores[task.ID()] = struct{}{}
	im.mu.Unlock()

	// Hand the entry to monitor for scheduling, unless we are shutting down,
	// in which case the entry is simply dropped.
	pending := &timetask{time: until, task: task}
	select {
	case <-im.stop:
	case im.incoming <- pending:
	}
}
// ignored reports whether taskID is currently present in the ignore set.
func (im *ignoremgr) ignored(taskID string) (ignored bool) {
	im.mu.RLock()
	defer im.mu.RUnlock()

	_, ignored = im.ignores[taskID]
	return ignored
}
func (im *ignoremgr) monitor(tasks chan<- Task, stop <-chan struct{}) {
times := timeheap{}
heap.Init(×)
var next *timetask
for {
if times.Len() > 0 {
// Get next ignore from the ignore heap
next = heap.Pop(×).(*timetask)
} else {
// No ignores! Wait for one to come in or an exit signal
select {
case <-stop:
return
case newtask := <-im.incoming:
next = newtask
}
}
// this duration *may* be negative, in which case the
// task will be pushed immediately
timer := time.NewTimer(time.Until(next.time))
select {
case newtask := <-im.incoming:
// Push onto next task and new task onto time heap
heap.Push(×, newtask)
heap.Push(×, next)
// Stop the existing timer for this loop iteration
timer.Stop()
case <-timer.C:
// Ignore expired, remove the entry
im.mu.Lock()
delete(im.ignores, next.task.ID())
im.mu.Unlock()
// Notify the consumer
select {
case tasks <- next.task:
case <-stop:
return
}
case <-stop:
return
}
}
}
// all returns the IDs of every task currently in the ignore set. The order
// follows map iteration and is therefore unspecified.
func (im *ignoremgr) all() []string {
	im.mu.RLock()
	defer im.mu.RUnlock()

	ids := make([]string, 0, len(im.ignores))
	for id := range im.ignores {
		ids = append(ids, id)
	}
	return ids
}
// timetask pairs a task with a point in time; timeheap orders entries by
// that time.
type timetask struct {
	time time.Time
	task Task
}

// timeheap is a min-heap of time/task tuples sorted by time. It provides the
// Len/Less/Swap/Push/Pop methods expected by the container/heap functions
// and is meant to be manipulated through heap.Push and heap.Pop.
type timeheap []*timetask

// Len reports the number of entries in the heap.
func (h timeheap) Len() int { return len(h) }

// Less reports whether entry i has an earlier time than entry j.
func (h timeheap) Less(i, j int) bool { return h[i].time.Before(h[j].time) }

// Swap exchanges the entries at indexes i and j.
func (h timeheap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

// Push appends x to the underlying slice. x must be a *timetask; any other
// type panics on the type assertion.
func (h *timeheap) Push(x interface{}) {
	// Push and Pop use pointer receivers because they modify the slice's length,
	// not just its contents.
	*h = append(*h, x.(*timetask))
}

// Pop removes and returns the last element of the underlying slice. It
// panics if the slice is empty.
func (h *timeheap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	// Clear the vacated slot so the backing array does not keep the popped
	// entry (and its task) reachable after it has left the heap.
	old[n-1] = nil
	*h = old[:n-1]
	return x
}