-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapi.go
113 lines (96 loc) · 3.77 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package router
import (
"context"
"github.com/kent-h/stateful-router/protos/peer"
"google.golang.org/grpc"
"time"
)
// errorResourceTypeNotFound is a format string for an error message about an
// unknown resource type; it takes the resource type as a single %d argument.
// NOTE(review): its users are not in this file - presumably the locate path.
const errorResourceTypeNotFound = "the resource type %d does not exist"
// Loader supplies the default load/unload callbacks used by the Router.
// The Router stores the Loader passed to New and hands its methods to the
// internal locate/unload paths whenever the caller does not supply its own
// function (see Locate and UnloadDevice).
type Loader interface {
// Load is the default load function passed to the router's locate path by Locate.
// A non-nil error reports that the resource could not be loaded.
// NOTE(review): exactly when locate invokes it is not visible in this file.
Load(ctx context.Context, resourceType ResourceType, resourceId string) error
// Unload is the default unload function used by UnloadDevice.
// It has no return value, so it cannot report failure to the router.
Unload(resourceType ResourceType, resourceId string)
}
// New builds a Router for the node with the given ordinal, registers its peer
// API on server, and returns it.
//
// Parameters:
//   - server: gRPC server on which the router's peer service is registered.
//   - ordinal: this node's ordinal; connections are opened to every node with a
//     smaller ordinal.
//   - peerDNSFormat: stored on the router for later use when addressing peers.
//   - loader: default load/unload callbacks (used by Locate and UnloadDevice).
//   - readyCallback: optional; if non-nil it is invoked once, after
//     waitReadyTime has elapsed and immediately before the rebalancer starts.
//   - resourceTypes: the resource types this router manages; at least one is
//     required.
//
// New panics if no resource types are supplied.
func New(server *grpc.Server, ordinal uint32, peerDNSFormat string, loader Loader, readyCallback func(), resourceTypes ...ResourceType) *Router {
	// validate first, so a misconfigured call does not allocate a context
	// (or anything else) that nobody will ever cancel
	if len(resourceTypes) == 0 {
		panic("at least one resources type must be provided for routing")
	}

	ctx, ctxCancelFunc := context.WithCancel(context.Background())
	handoffAndShutdown := make(chan struct{})

	router := &Router{
		ordinal:            ordinal,
		peerDNSFormat:      peerDNSFormat,
		handoffAndShutdown: handoffAndShutdown,
		ctx:                ctx,
		peers:              make(map[uint32]*node),
		resources:          make(map[ResourceType]*resourceData, len(resourceTypes)),
		resourceCountEventData: resourceCountEventData{
			updateComplete: make(chan struct{}),
			resources:      make(map[ResourceType]map[uint32]uint32),
		},
		rebalanceEventData: rebalanceEventData{
			newPeers:      make(map[peer.PeerClient]struct{}),
			resourceTypes: make(map[ResourceType]struct{}),
		},
		statusNotifierDone: make(chan struct{}),
		rebalancerDone:     make(chan struct{}),
		loader:             loader,
	}

	// create data structure for each resource type
	for _, resourceType := range resourceTypes {
		router.resources[resourceType] = &resourceData{
			loaded: make(map[string]*syncher),
		}
	}

	// connect to all nodes with smaller ordinal than ours
	for i := uint32(0); i < router.ordinal; i++ {
		router.connect(i)
	}

	go router.startStatsNotifier()

	// node will only start accepting requests after rebalancer is started (default readiness allows no requests)
	time.AfterFunc(waitReadyTime, func() {
		if readyCallback != nil {
			readyCallback()
		}
		// Pass the locally captured channel, NOT router.handoffAndShutdown:
		// Stop() sets that field to nil under eventMutex, and this callback
		// runs on the timer's goroutine without the mutex. Reading the field
		// here would race with Stop and, if Stop already ran, would hand the
		// rebalancer a nil channel (which never becomes ready) instead of the
		// closed one that signals shutdown.
		router.startRebalancer(handoffAndShutdown, ctxCancelFunc)
	})

	peer.RegisterPeerServer(server, peerApi{router})

	return router
}
// Stop signals shutdown by closing the router's handoffAndShutdown channel and
// then blocks until router.ctx is done and statusNotifierDone yields.
//
// The channel field is cleared under eventMutex once it has been closed, so a
// repeated call skips the close instead of closing the channel twice.
func (router *Router) Stop() {
	router.eventMutex.Lock()
	if shutdown := router.handoffAndShutdown; shutdown != nil {
		close(shutdown)
		router.handoffAndShutdown = nil
	}
	router.eventMutex.Unlock()

	// wait for the router context to be cancelled, then for the status notifier
	<-router.ctx.Done()
	<-router.statusNotifierDone
}
// UnloadDevice should be called when a resource has been, or should be,
// unloaded **from the local node only** because of an external circumstance
// (lock lost, resource deleted, inactivity timeout, etc.).
//
// It is UnloadDeviceWithUnloadFunc with the router's Loader.Unload as the
// unload function.
func (router *Router) UnloadDevice(resourceType ResourceType, key string) {
	defaultUnload := router.loader.Unload
	router.UnloadDeviceWithUnloadFunc(resourceType, key, defaultUnload)
}
// UnloadDeviceWithUnloadFunc removes resourceId from the local node's set of
// loaded resources of the given type and, if it was present, notifies the
// router that the resource count changed and unloads it via unloadFunc.
//
// Parameters:
//   - resourceType: type of the resource to unload.
//   - resourceId: identifier of the resource within that type.
//   - unloadFunc: callback handed to the router's unload path in place of the
//     default Loader.Unload.
//
// The call is a no-op when the resource is not currently loaded locally, or
// when resourceType was never registered with New (nothing of an unregistered
// type can be loaded).
func (router *Router) UnloadDeviceWithUnloadFunc(resourceType ResourceType, resourceId string, unloadFunc func(resourceType ResourceType, resourceId string)) {
	// a missing map key yields a nil *resourceData; guard it rather than
	// dereferencing nil on the mutex access below
	resource := router.resources[resourceType]
	if resource == nil {
		return
	}

	// remove the entry while holding the lock, but run the (potentially slow)
	// unload work only after releasing it
	resource.mutex.Lock()
	entry, wasLoaded := resource.loaded[resourceId]
	if wasLoaded {
		delete(resource.loaded, resourceId)
	}
	resource.mutex.Unlock()

	if !wasLoaded {
		return
	}

	router.resourceCountChanged(resourceType)
	router.unloadResource(resourceType, resourceId, entry, unloadFunc)
}
// Locate returns a processor for the given resource, so the caller can either
// handle the request locally or forward it on to the appropriate peer.
//
// It calls the router's internal locate with resourceCountChanged as the
// count-change callback and the router's Loader.Load as the load function;
// the four results are returned exactly as locate produces them.
// NOTE(review): the precise meaning of the bool and of a nil *grpc.ClientConn
// is defined by locate, which is not in this file - confirm there.
func (router *Router) Locate(resourceType ResourceType, resourceId string) (interface{ RUnlock() }, *grpc.ClientConn, bool, error) {
	onCountChanged := router.resourceCountChanged
	defaultLoad := router.loader.Load
	return router.locate(resourceType, resourceId, onCountChanged, defaultLoad)
}
// LocateWithLoadFunc behaves like Locate, except that the caller-supplied
// loadFunc is handed to the router's internal locate in place of the default
// Loader.Load. The results are returned exactly as locate produces them.
func (router *Router) LocateWithLoadFunc(resourceType ResourceType, resourceId string, loadFunc func(ctx context.Context, resourceType ResourceType, deviceId string) error) (interface{ RUnlock() }, *grpc.ClientConn, bool, error) {
	onCountChanged := router.resourceCountChanged
	return router.locate(resourceType, resourceId, onCountChanged, loadFunc)
}