-
Notifications
You must be signed in to change notification settings - Fork 0
/
manager.go
156 lines (139 loc) · 4.38 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package sup
import (
"fmt"
"sync"
"time"
"go.polydawn.net/go-sup/latch"
"go.polydawn.net/go-sup/sluice"
)
// manager is the concrete type behind the Manager returned by newManager.
// It tracks live writs ("wards"), the fuses that drive its lifecycle, and a
// queue of finished writs ("tombstones").
type manager struct {
	// Configured at start.
	reportingTo Supervisor

	// Lifecycle fuses, all set at init.
	// The two control fuses are fired by external events...
	ctrlChan_winddown latch.Fuse
	ctrlChan_quit     latch.Fuse
	// ...while doneFuse is fired to announce an internal state change.
	doneFuse latch.Fuse

	// mu must be held while touching wards (and accepting).
	mu sync.Mutex
	// Guarded by mu. Once false, nothing more may be appended to wards.
	accepting bool
	// Guarded by mu. Maps each live writ to its cancel func.
	wards map[Writ]func()

	// Writs report here when they are done.
	ctrlChan_childDone chan Writ

	// Writs that are done and not yet externally acknowledged, as values of
	// type Writ. No sync needed.
	tombstones sluice.Sluice
}
// reqWrit pairs a name with a send-only channel on which a Writ is delivered.
// NOTE(review): not referenced in this chunk; presumably a request for a new
// Writ (named like the task passed to NewTask) handled by the manager's run
// loop, with the result sent on ret — confirm against the rest of the package.
type reqWrit struct {
	name string
	ret  chan<- Writ
}
// newManager builds a manager that reports to reportingTo, launches its
// run method on a new goroutine, and returns it as a Manager.
//
// The new manager is accepting wards, has an empty ward map, fresh
// winddown/quit/done fuses, an unbuffered child-done channel, and a new
// tombstone sluice.
func newManager(reportingTo Supervisor) Manager {
	m := &manager{reportingTo: reportingTo}
	m.ctrlChan_winddown = latch.NewFuse()
	m.ctrlChan_quit = latch.NewFuse()
	m.doneFuse = latch.NewFuse()
	m.accepting = true
	m.wards = make(map[Writ]func())
	m.ctrlChan_childDone = make(chan Writ)
	m.tombstones = sluice.New()

	go m.run()
	return m
}
// NewTask returns the Writ that releaseWrit produces for the given task name.
// All of the bookkeeping lives in releaseWrit, which is defined elsewhere in
// the package.
func (mgr *manager) NewTask(name string) Writ {
	issued := mgr.releaseWrit(name)
	return issued
}
/*
Work is "probably what you want" to do after launching all tasks to get your
management tree to wind up nicely. It blocks until the manager's done fuse
fires, then panics if any child reported an error.

  - Moves the manager to winddown mode (no more new tasks will be accepted).
  - Starts gathering child statuses. If any are errors, it:
  - moves the manager to quit mode (quit signals are sent to all other children),
  - saves that error,
  - and keeps waiting.
  - While waiting after a quit, logs a warning every two seconds naming some
    of the children that are still running.
  - When all children are done, panics with the first error gathered.
    Any later errors are only logged.
*/
func (mgr *manager) Work() {
	mgr.ctrlChan_winddown.Fire()
	var devastation error

	// Keep exactly one outstanding receive on the tombstone sluice across all
	// phases below, and only ask for a new one after this one has yielded.
	// Calling Next() afresh inside every select would abandon the previous
	// request whenever another case won. Any tombstone delivered into an
	// abandoned request would then never be seen here, which could silently
	// drop a child's error.
	pending := mgr.tombstones.Next()

	// While we're in the winddown state --
	// Passively collect results, and jump ourselves to quit in case of errors.
PreDoneLoop:
	for {
		// note: if we had a true fire-drill exit mode, we'd probably
		// have a select over `<-mgr.reportingTo.QuitCh()` here as well.
		// but we don't really believe in that: cleanup is important.
		select {
		case rcv := <-pending:
			pending = mgr.tombstones.Next()
			w := rcv.(*writ)
			if w.err != nil {
				msg := fmt.Sprintf("manager autoquitting because of child error: %s", w.err)
				log(mgr.reportingTo.Name(), msg, w.name, false)
				devastation = w.err
				mgr.ctrlChan_quit.Fire()
				break PreDoneLoop
			}
		case <-mgr.doneFuse.Selectable():
			break PreDoneLoop
		}
	}

	// If we need to keep waiting for alldone, we also tick during it, so
	// we can warn you about children not responding to quit reasonably quickly.
	quitTime := time.Now()
	tick := time.NewTicker(2 * time.Second)
	defer tick.Stop()
YUNoDoneLoop:
	for {
		select {
		case <-tick.C:
			log(mgr.reportingTo.Name(), mgr.stragglerReport(quitTime), nil, true)
		case <-mgr.doneFuse.Selectable():
			break YUNoDoneLoop
		}
	}

	// Now that we're fully done: range over all the child tombstones, so that
	// any errors can be raised upstream (or if we already have a little
	// bundle of joy, at least make brief mention of others in the log).
FinalizeLoop:
	for {
		select {
		case rcv := <-pending:
			pending = mgr.tombstones.Next()
			w := rcv.(*writ)
			if w.err == nil {
				continue
			}
			if devastation != nil {
				msg := fmt.Sprintf("manager gathered additional errors while shutting down: %s", w.err)
				log(mgr.reportingTo.Name(), msg, w.name, true)
				continue
			}
			msg := fmt.Sprintf("manager gathered an error while shutting down: %s", w.err)
			log(mgr.reportingTo.Name(), msg, w.name, true)
			devastation = w.err
		default:
			// no new tombstones should be coming, so the first time
			// polling it blocks, we're done: leave.
			break FinalizeLoop
		}
	}

	// If we collected a child error at any point, raise it now.
	if devastation != nil {
		panic(devastation)
	}
}

// stragglerReport builds the periodic warning Work logs while waiting for
// children after a quit. It reports the whole seconds elapsed since `since`,
// the number of live wards, and up to seven ward names. A trailing "..."
// appears only when more wards exist than are listed. It holds mgr.mu while
// reading wards.
func (mgr *manager) stragglerReport(since time.Time) string {
	const maxNamed = 7

	mgr.mu.Lock()
	defer mgr.mu.Unlock()

	names := make([]string, 0, maxNamed+1)
	for ward := range mgr.wards {
		if len(names) == maxNamed {
			names = append(names, "...")
			break
		}
		names = append(names, ward.Name().Coda())
	}
	return fmt.Sprintf("quit %ds ago, still waiting for children: %d remaining %v",
		int(time.Since(since).Seconds()),
		len(mgr.wards),
		names,
	)
}
// GatherChild hands back the channel from mgr.tombstones.Next(), from which
// the next finished-but-not-yet-acknowledged writ can be received as a
// sluice.T. Work also draws from this same tombstone sluice.
func (mgr *manager) GatherChild() <-chan sluice.T {
	next := mgr.tombstones.Next()
	return next
}