-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpeer_api.go
177 lines (157 loc) · 6.12 KB
/
peer_api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package router
import (
"context"
"errors"
"fmt"
"github.com/golang/protobuf/ptypes/empty"
"github.com/kent-h/stateful-router/protos/peer"
)
type peerApi struct {
*Router
}
func (router peerApi) Hello(ctx context.Context, request *peer.HelloRequest) (*empty.Empty, error) {
router.connect(request.Ordinal)
return &empty.Empty{}, nil
}
func (router peerApi) UpdateStats(ctx context.Context, request *peer.StatsRequest) (*empty.Empty, error) {
router.peerMutex.Lock()
defer router.peerMutex.Unlock()
for resourceType, resourceStat := range request.ResourceStats {
resourceType := ResourceType(resourceType)
for _, nodeStat := range resourceStat.Stats {
if node, have := router.peers[nodeStat.Ordinal]; have {
resourceData := node.resources[resourceType]
resourceData.count = nodeStat.Count
node.resources[resourceType] = resourceData
}
}
router.rebalance(resourceType)
}
return &empty.Empty{}, nil
}
func (router peerApi) NextResource(ctx context.Context, request *peer.NextResourceRequest) (*peer.NextResourceResponse, error) {
resource, have := router.resources[ResourceType(request.ResourceType)]
if !have {
return &peer.NextResourceResponse{}, fmt.Errorf(errorResourceTypeNotFound, ResourceType(request.ResourceType))
}
resource.mutex.RLock()
defer resource.mutex.RUnlock()
migrateNext := ""
found := false
first, isOnlyDeviceToMigrate := true, false
if !request.ReadinessMax {
for deviceId := range resource.loaded {
// for every device that belongs on the other node
if BestNode(deviceId, router.ordinal, map[uint32]struct{}{request.Ordinal: {}}) == request.Ordinal {
// find the lowest device that's > other.readiness
if deviceId > string(request.Readiness) || (deviceId == string(request.Readiness) && !request.ReadyForEqual) {
found = true
if first {
first = false
migrateNext = deviceId
isOnlyDeviceToMigrate = true
} else {
isOnlyDeviceToMigrate = false
if deviceId < migrateNext {
migrateNext = deviceId
}
}
}
}
}
}
return &peer.NextResourceResponse{Has: found, ResourceId: []byte(migrateNext), Last: isOnlyDeviceToMigrate}, nil
}
func (router peerApi) UpdateReadiness(ctx context.Context, request *peer.ReadinessRequest) (*empty.Empty, error) {
router.peerMutex.Lock()
node := router.peers[request.Ordinal]
shuttingDownStateChanged := node.shuttingDown != request.ShuttingDown
// no matter what, set the shutting-down state
node.shuttingDown = request.ShuttingDown
// for each resource, verify that there will be at least one node with max readiness
for _, readiness := range request.Readiness {
resourceType := ResourceType(readiness.ResourceType)
node := node.resources[resourceType]
routerResource := router.resources[resourceType]
// if decreasing from max readiness
if node.readinessMax && !readiness.Max {
// complain if no other node exists with max readiness
if !routerResource.readinessMax || routerResource.decreasingFromMaxReadiness {
foundMax := false
for nodeId, otherNode := range router.peers {
if nodeId != request.Ordinal && otherNode.resources[resourceType].readinessMax {
foundMax = true
break
}
}
if !foundMax {
router.peerMutex.Unlock()
if shuttingDownStateChanged {
router.rebalanceAll()
}
return &empty.Empty{}, errors.New("no other nodes exist with max readiness")
}
}
}
}
// -- if this line is reached, we will accept the readiness change --
recalculateReadiness := make(map[ResourceType]struct{})
for _, requestReadiness := range request.Readiness {
resourceType := ResourceType(requestReadiness.ResourceType)
nodeResource := node.resources[resourceType]
if shuttingDownStateChanged || nodeResource.readiness != string(requestReadiness.Readiness) || nodeResource.readyForEqual != requestReadiness.ReadyForEqual || nodeResource.readinessMax != requestReadiness.Max {
recalculateReadiness[resourceType] = struct{}{}
}
nodeResource.readiness, nodeResource.readyForEqual, nodeResource.readinessMax = string(requestReadiness.Readiness), requestReadiness.ReadyForEqual, requestReadiness.Max
node.resources[resourceType] = nodeResource
}
router.peerMutex.Unlock()
for _, requestResource := range request.Readiness {
requestResourceType := ResourceType(requestResource.ResourceType)
routerResource := router.resources[requestResourceType]
routerResource.mutex.Lock()
originalDeviceCount := uint32(len(routerResource.loaded))
instancesToMove := make(map[string]*syncher)
for deviceId, device := range routerResource.loaded {
//for every device that belongs on the other node
if BestNode(deviceId, router.ordinal, map[uint32]struct{}{request.Ordinal: {}}) == request.Ordinal {
// if the other node is ready for this device
if deviceId < string(requestResource.Readiness) || (deviceId == string(requestResource.Readiness) && requestResource.ReadyForEqual) {
//release and notify that it's moved
instancesToMove[deviceId] = device
delete(routerResource.loaded, deviceId)
}
}
}
routerResource.mutex.Unlock()
router.migrateResources(requestResourceType, instancesToMove, originalDeviceCount)
if shuttingDownStateChanged {
router.rebalanceAll()
} else {
for resourceType := range recalculateReadiness {
router.rebalance(resourceType)
}
}
}
return &empty.Empty{}, nil
}
// Handoff is just basically hint to load a device, so we'll do normal loading/locking
func (router peerApi) Handoff(ctx context.Context, request *peer.HandoffRequest) (*empty.Empty, error) {
var peerUpdateComplete chan struct{}
if mutex, remoteHandler, forward, err := router.locate(ResourceType(request.ResourceType), string(request.ResourceId), func(resourceType ResourceType) {
peerUpdateComplete = router.resourceCountChangedWithUpdateForPeer(resourceType, request.Ordinal, request.ResourceCount)
}, router.loader.Load); err != nil {
return &empty.Empty{}, err
} else if forward {
return peer.NewPeerClient(remoteHandler).Handoff(ctx, request)
} else {
mutex.RUnlock()
if peerUpdateComplete != nil {
select {
case <-peerUpdateComplete:
case <-ctx.Done():
}
}
return &empty.Empty{}, nil
}
}