rebalancer.go

package router

import (
	"context"
	"fmt"
	"sort"
	"strconv"

	"github.com/kent-h/stateful-router/protos/peer"
)
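
// startRebalancer runs the rebalance loop. Each pass brings newly connected
// peers up to date, then adjusts this node's readiness for every flagged
// resource type until the cluster levels out; a signal on handoffAndShutdown
// triggers one final pass (with shuttingDown set) that hands off every
// resource, after which the loop returns and cancels the router context.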
func (router *Router) startRebalancer(handoffAndShutdown chan struct{}, ctxCancelFunc context.CancelFunc) {
	fmt.Println("Starting rebalancer")
	defer ctxCancelFunc()

	router.rebalanceAll()

	startingUp, shuttingDown := true, false
	for {
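		// take ownership of the pending rebalance-event data; events arriving
		// after this swap will be handled on the next pass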
		router.eventMutex.Lock()
		ch := make(chan struct{})
		data := router.rebalanceEventData
		router.rebalanceEventData = rebalanceEventData{
			ch:            ch,
			newPeers:      make(map[peer.PeerClient]struct{}),
			resourceTypes: make(map[ResourceType]struct{}),
		}
		router.eventMutex.Unlock()

		// build a snapshot of this node's readiness for every resource type
		readiness := make([]*peer.Readiness, len(router.resources))
		ctr := 0
		for resourceType, resource := range router.resources {
			readiness[ctr] = &peer.Readiness{
				ResourceType:  uint32(resourceType),
				Readiness:     []byte(resource.readiness),
				ReadyForEqual: resource.readyForEqual,
				Max:           resource.readinessMax,
			}
			ctr++
		}

		// update readiness of any new peers
		for node := range data.newPeers {
			if _, err := node.UpdateReadiness(router.ctx, &peer.ReadinessRequest{
				Ordinal:      router.ordinal,
				Readiness:    readiness,
				ShuttingDown: shuttingDown,
			}); err != nil {
				fmt.Println("unable to update new peer's readiness:", err)
			}
		}

		if startingUp || shuttingDown {
			startingUp = false
			// at startup and shutdown, rebalance every resource
			for resourceType := range router.resources {
				data.resourceTypes[resourceType] = struct{}{}
			}
		}
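
		// keep adjusting readiness for each flagged resource type until every
		// type reaches a stable point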
		for len(data.resourceTypes) != 0 {
			for resourceType := range data.resourceTypes {
				localResource := router.resources[resourceType]

				localResource.mutex.RLock()
				deviceCount := uint32(len(localResource.loaded))
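				// the highest loaded resource ID is the first candidate to be
				// shed if readiness is decreased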
				var proposedDecrementReadiness string
				for deviceId := range localResource.loaded {
					if deviceId > proposedDecrementReadiness {
						proposedDecrementReadiness = deviceId
					}
				}
				localResource.mutex.RUnlock()

				// we want to jump to max (accept all resources) if all nodes w/ > readiness are suddenly missing
				// we want to increment if resources < average # of resources on nodes w/ >= the proposed readiness
				// we want to decrement if resources > average # of resources on nodes w/ > the proposed readiness
				router.peerMutex.RLock()
				sortedNodes := make([]uint32, 1, len(router.peers)+1)
				sortedNodes[0] = router.ordinal
				var totalDeviceCount = uint64(deviceCount)
				var nodeCount uint64 = 1
				shouldJumpToMin := shuttingDown && (localResource.readinessMax || localResource.readiness != "" || localResource.readyForEqual)
				shouldJumpToMax := true
				for nodeId, node := range router.peers {
					if node.connected && !node.shuttingDown {
						nodeResource := node.resources[resourceType]
						if nodeResource.readinessMax || nodeResource.readiness > localResource.readiness || (nodeResource.readiness == localResource.readiness && nodeResource.readyForEqual && !localResource.readyForEqual) {
							shouldJumpToMax = false
						}
						totalDeviceCount += uint64(nodeResource.count)
						nodeCount++
						sortedNodes = append(sortedNodes, nodeId)
					}
				}
				// sort nodes by ID
				sort.Slice(sortedNodes, func(i, j int) bool { return sortedNodes[i] < sortedNodes[j] })

				// resources per peer, and remainder
				averageDevices := uint32(totalDeviceCount / nodeCount)
				remainingDevices := totalDeviceCount % nodeCount

				// calculate how many resources each peer should have
				nodesShouldHave := make(map[uint32]uint32, len(sortedNodes))
				for _, nodeId := range sortedNodes[0:remainingDevices] {
					nodesShouldHave[nodeId] = averageDevices + 1
				}
				for _, nodeId := range sortedNodes[remainingDevices:] {
					nodesShouldHave[nodeId] = averageDevices
				}
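				// e.g. 11 resources across 3 nodes: averageDevices=3 and
				// remainingDevices=2, so the two lowest-ordinal nodes should
				// hold 4 resources each and the third should hold 3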

				increment, decrement := false, false

				// if too few resources
				if deviceCount < nodesShouldHave[router.ordinal] {
					// pull device
					increment = true
				}
				// if too many resources
				if deviceCount > nodesShouldHave[router.ordinal] {
					// iff ALL other nodes have >= correct number of resources OR have MAX readiness
					allPeersMeetRequisite := true
					for nodeId, node := range router.peers {
						if node.connected && !node.shuttingDown {
							if nodeData := node.resources[resourceType]; !(nodeData.count >= nodesShouldHave[nodeId] || nodeData.readinessMax) {
								allPeersMeetRequisite = false
								break
							}
						}
					}
					if allPeersMeetRequisite {
						// push device
						decrement = true
					}
				}
				router.peerMutex.RUnlock()

				if !shuttingDown && !localResource.readinessMax && (shouldJumpToMax || increment) {
					// increase readiness
					proposedIncrementReadiness, proposedIncrementMaxReadiness := router.searchPeersForNextResource(resourceType)
					if proposedIncrementMaxReadiness {
						fmt.Printf("%d Increment: Changing readiness to MAX\n", router.ordinal)
					} else {
						fmt.Printf("%d Increment: Changing readiness to ∀ <= %s\n", router.ordinal, strconv.Quote(proposedIncrementReadiness))
					}
					router.changeReadinessTo(resourceType, true, proposedIncrementReadiness, true, proposedIncrementMaxReadiness, false)
				} else if shouldJumpToMin || decrement {
					// decrease readiness
					fmt.Printf("%d Decrement: Changing readiness to ∀ < %s\n", router.ordinal, strconv.Quote(proposedDecrementReadiness))
					router.changeReadinessTo(resourceType, false, proposedDecrementReadiness, false, false, shuttingDown)

					// after readiness is decreased, kick out any resources that no longer belong on this node
					localResource.mutex.Lock()
					originalDeviceCount := uint32(len(localResource.loaded))
					instancesToMove := make(map[string]*syncher)
					if !localResource.readinessMax {
						for deviceId, device := range localResource.loaded {
							// for every device that no longer belongs on this node
							if deviceId > localResource.readiness || (deviceId == localResource.readiness && !localResource.readyForEqual) {
								fmt.Printf("Will migrate device %s\n", strconv.Quote(deviceId))
								// release and notify that it's moved
								instancesToMove[deviceId] = device
								delete(localResource.loaded, deviceId)
							}
						}
					}
					localResource.mutex.Unlock()

					router.migrateResources(resourceType, instancesToMove, originalDeviceCount)
				} else {
					// readiness for this resource type has stabilized
					delete(data.resourceTypes, resourceType)
				}
			}
		}

		// readiness has stabilized; wait until something changes
		select {
		case <-handoffAndShutdown:
			if !shuttingDown {
				// run the rebalancer once more, to cleanly unload all resources
				shuttingDown = true
			} else {
				return
			}
		case <-ch:
			// event received
		}
	}
}
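
// searchPeersForNextResource asks each connected peer for the next resource
// that would migrate to this node if readiness were increased, and returns
// the lowest such resource ID. The boolean result is true when readiness
// should jump straight to MAX: either no peer has anything to hand off, or
// the only candidate found is the last one.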
func (router *Router) searchPeersForNextResource(resourceType ResourceType) (string, bool) {
	router.peerMutex.Lock()
	toNotify := make(map[uint32]peer.PeerClient, len(router.peers))
	for nodeId, node := range router.peers {
		if node.connected {
			toNotify[nodeId] = node.PeerClient
		}
	}
	router.peerMutex.Unlock()

	resource := router.resources[resourceType]

	// keep nodes informed on how ready we are
	next := ""
	have := false
	first, isOnlyDeviceToMigrate := true, false
	for _, client := range toNotify {
		// inform peers that we're ready for resources
		resp, err := client.NextResource(router.ctx, &peer.NextResourceRequest{
			Ordinal:       router.ordinal,
			ResourceType:  uint32(resourceType),
			Readiness:     []byte(resource.readiness),
			ReadyForEqual: resource.readyForEqual,
			ReadinessMax:  resource.readinessMax})
		if err != nil {
			fmt.Println("unable to get next resource from peer:", err)
		} else {
			// determine which device is next in line for migration
			if resp.Has {
				have = true
				if first {
					first = false
					isOnlyDeviceToMigrate = resp.Last
					next = string(resp.ResourceId)
				} else {
					isOnlyDeviceToMigrate = false
					if string(resp.ResourceId) < next {
						next = string(resp.ResourceId)
					}
				}
			}
		}
	}
	if isOnlyDeviceToMigrate {
		next = ""
	}
	return next, !have || isOnlyDeviceToMigrate
}

// changeReadinessTo handles the complexity of changing readiness
// readiness must be updated then broadcast when increasing (start accepting requests, then have other nodes start sending requests)
// but it must be broadcast then updated when decreasing (have other nodes stop sending requests, then stop accepting requests)
// in addition, when decreasing from readinessMax, a peer may reject the request, in which case other peers must be reverted
// (this is to ensure that at least one node always has maximum readiness, so that at least one node can handle any request)
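// The readiness string acts as an upper bound on the resource IDs this node
// will accept: readyForEqual makes the bound inclusive, and readinessMax
// accepts every resource regardless of the bound.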
func (router *Router) changeReadinessTo(resourceType ResourceType, increase bool, readiness string, readyForEqual, readinessMax, shuttingDown bool) {
	resource := router.resources[resourceType]

	router.peerMutex.Lock()
	if increase {
		// change which requests we will accept locally
		resource.readiness, resource.readyForEqual, resource.readinessMax = readiness, readyForEqual, readinessMax
	} else {
		// if we are decreasing from readinessMax, we will need to be able to conditionally reject requests
		resource.decreasingFromMaxReadiness = resource.readinessMax && !readinessMax
	}
	toNotify := make([]peer.PeerClient, 0, len(router.peers))
	for _, node := range router.peers {
		if node.connected {
			toNotify = append(toNotify, node.PeerClient)
		}
	}
	router.peerMutex.Unlock()

	// inform peers of our readiness
	abort, undo := false, 0
	for i, client := range toNotify {
		if _, err := client.UpdateReadiness(router.ctx, &peer.ReadinessRequest{
			Ordinal: router.ordinal,
			Readiness: []*peer.Readiness{{
				ResourceType:  uint32(resourceType),
				Readiness:     []byte(readiness),
				ReadyForEqual: readyForEqual,
				Max:           readinessMax,
			}},
			ShuttingDown: shuttingDown,
		}); err != nil {
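			// a peer rejects a decrease that would leave no node with max
			// readiness; when that happens, remember how far we got so the
			// already-notified peers can be reverted below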
			if err.Error() == "rpc error: code = Unknown desc = no other nodes exist with max readiness" {
				abort, undo = true, i
			} else {
				fmt.Println("failed to update peer's readiness: ", err)
			}
		}
	}

	// if aborted, revert notified peers
	for _, client := range toNotify[0:undo] {
		if _, err := client.UpdateReadiness(router.ctx, &peer.ReadinessRequest{
			Ordinal: router.ordinal,
			Readiness: []*peer.Readiness{{
				ResourceType:  uint32(resourceType),
				Readiness:     []byte(resource.readiness),
				ReadyForEqual: resource.readyForEqual,
				Max:           resource.readinessMax,
			}},
			ShuttingDown: shuttingDown,
		}); err != nil {
			fmt.Println("failed to revert peer's readiness: ", err)
		}
	}

	if (!increase && !abort) || resource.decreasingFromMaxReadiness {
		router.peerMutex.Lock()
		if !increase && !abort {
			// change which requests we will accept locally
			resource.readiness, resource.readyForEqual, resource.readinessMax = readiness, readyForEqual, readinessMax
		}
		if resource.decreasingFromMaxReadiness {
			resource.decreasingFromMaxReadiness = false
		}
		router.peerMutex.Unlock()
	}
}