Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.

Commit cdb32fa

Browse files
committed
switch to inter-component events for better isolation
1 parent cfef0d6 commit cdb32fa

File tree

6 files changed

+47
-26
lines changed

6 files changed

+47
-26
lines changed

cluster/commander.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@
44

55
class Commander(Component):
66

7-
def __init__(self, member, leader, ballot_num, slot, proposal):
7+
def __init__(self, member, leader, ballot_num, slot, proposal, peers):
88
super(Commander, self).__init__(member)
99
self.leader = leader
1010
self.ballot_num = ballot_num
1111
self.slot = slot
1212
self.proposal = proposal
1313
self.commander_id = CommanderId(self.address, slot, proposal)
1414
self.accepted = set([])
15-
self.peers = member.peers # TODO: pass this in
16-
self.quorum = len(self.peers) / 2 + 1
15+
self.peers = peers
16+
self.quorum = len(peers) / 2 + 1
1717

1818
def start(self):
1919
self.send(self.peers, 'ACCEPT', # p2a

cluster/leader.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ def __init__(self, member, unique_id, commander_cls=Commander, scout_cls=Scout):
1717
self.commanders = {}
1818
self.scout_cls = scout_cls
1919
self.scout = None
20+
self.peers = None
2021

21-
def view_change(self, viewchange):
22+
def on_view_change_event(self, viewchange):
23+
self.peers = viewchange.peers
2224
is_primary = view_primary(viewchange.viewid, viewchange.peers) == self.address
2325
self.is_primary = is_primary
2426
if is_primary:
@@ -33,7 +35,7 @@ def view_change(self, viewchange):
3335

3436
def spawn_scout(self):
3537
assert not self.scout
36-
sct = self.scout = self.scout_cls(self.member, self, self.ballot_num)
38+
sct = self.scout = self.scout_cls(self.member, self, self.ballot_num, self.peers)
3739
sct.start()
3840

3941
def scout_finished(self, adopted, ballot_num, pvals):
@@ -73,7 +75,7 @@ def preempted(self, ballot_num):
7375
self.spawn_scout()
7476

7577
def spawn_commander(self, ballot_num, slot, proposal):
76-
cmd = self.commander_cls(self.member, self, ballot_num, slot, proposal)
78+
cmd = self.commander_cls(self.member, self, ballot_num, slot, proposal, self.peers)
7779
if cmd.commander_id in self.commanders:
7880
return
7981
print "set", cmd.commander_id

cluster/member.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,34 @@ class Member(object): # TODO: rename
66
def __init__(self, node):
77
self.node = node
88
self.address = self.node.address
9+
self.components = []
10+
11+
def register(self, component):
12+
self.components.append(component)
13+
self.node.register(component)
14+
15+
def unregister(self, component):
16+
self.components.remove(component)
17+
self.node.unregister(component)
18+
19+
def event(self, message, **kwargs):
20+
method = 'on_' + message + '_event'
21+
for comp in self.components:
22+
if hasattr(comp, method):
23+
getattr(comp, method)(**kwargs)
924

1025

1126
class Component(object): # TODO: rename
1227

1328
def __init__(self, member):
1429
self.member = member
15-
self.member.node.register(self)
30+
self.member.register(self)
1631
self.address = member.address
1732
self.logger = logging.getLogger("%s.%s" % (self.address, self.__class__.__name__))
1833

34+
def event(self, message, **kwargs):
35+
self.member.event(message, **kwargs)
36+
1937
def send(self, destinations, action, **kwargs):
2038
self.member.node.send(destinations, action, **kwargs)
2139

@@ -27,4 +45,4 @@ def cancel_timer(self, timer):
2745
self.member.node.cancel_timer(timer)
2846

2947
def stop(self):
30-
self.member.node.unregister(self)
48+
self.member.unregister(self)

cluster/member_replicated.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ def __init__(self, node, execute_fn, peers,
2121
def start(self):
2222
self.replica.start()
2323

24-
def view_change(self, viewchange):
25-
self.peers = viewchange.peers
26-
self.leader.view_change(viewchange)
27-
2824

2925
class ClusterSeed(Member):
3026

cluster/replica.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,15 @@ def __init__(self, member, execute_fn, peers):
2222
self.proposals = defaultlist()
2323
self.decisions = defaultlist()
2424
self.viewid = 0
25+
self.peers = peers
2526
self.last_heard_from = {}
2627
self.lost_peer_proposal = None
2728

2829
def start(self):
2930
"Try to join the cluster"
3031
if not self.ready:
3132
# TODO: do something more deterministic
32-
self.send([self.member.node.network.rnd.choice(self.member.peers)], 'JOIN',
33+
self.send([self.member.node.network.rnd.choice(self.peers)], 'JOIN',
3334
requester=self.address)
3435
self.set_timer(self.JOIN_RETRANSMIT, self.start)
3536

@@ -49,7 +50,7 @@ def invoke(self, proposal):
4950
self.viewid = viewchange.viewid
5051
self.send(
5152
list(set(viewchange.peers) -
52-
set(self.member.peers)), 'WELCOME',
53+
set(self.peers)), 'WELCOME',
5354
state=self.state,
5455
slot_num=self.slot_num,
5556
decisions=self.decisions,
@@ -58,7 +59,7 @@ def invoke(self, proposal):
5859
if self.address not in viewchange.peers:
5960
self.stop()
6061
return
61-
self.member.view_change(viewchange)
62+
self.event('view_change', viewchange=viewchange)
6263
else:
6364
self.logger.info(
6465
"ignored out-of-sequence view change operation")
@@ -69,8 +70,8 @@ def propose(self, proposal, slot=None):
6970
self.next_slot += 1
7071
self.proposals[slot] = proposal
7172
# find a leader we think is working, deterministically
72-
leaders = [view_primary(self.viewid, self.member.peers)] + \
73-
list(self.member.peers)
73+
leaders = [view_primary(self.viewid, self.peers)] + \
74+
list(self.peers)
7475
for leader in leaders:
7576
# TODO: better way to get current time
7677
if self.last_heard_from.get(leader, 0) > self.member.node.network.now - 2 * self.HEARTBEAT_INTERVAL:
@@ -91,8 +92,8 @@ def repropose(self):
9192

9293
def heartbeat(self):
9394
"send and monitor heartbeats"
94-
self.send(self.member.peers, 'HEARTBEAT', sender=self.address)
95-
for peer in self.member.peers:
95+
self.send(self.peers, 'HEARTBEAT', sender=self.address)
96+
for peer in self.peers:
9697
if peer == self.address or peer not in self.last_heard_from:
9798
continue
9899
if self.last_heard_from[peer] < self.member.node.network.now - 2 * self.HEARTBEAT_INTERVAL:
@@ -108,7 +109,7 @@ def lost_peer(self, peer):
108109
self.lost_peer_proposal = Proposal(None, None,
109110
ViewChange(
110111
self.viewid + 1,
111-
tuple(sorted(set(self.member.peers) - set([peer])))))
112+
tuple(sorted(set(self.peers) - set([peer])))))
112113
self.propose(self.lost_peer_proposal)
113114

114115
def do_HEARTBEAT(self, sender):
@@ -157,10 +158,10 @@ def do_DECISION(self, slot, proposal):
157158
cid=decided_proposal.cid, output=output)
158159

159160
def do_JOIN(self, requester):
160-
if self.ready and requester not in self.member.peers:
161+
if self.ready and requester not in self.peers:
161162
self.last_heard_from[requester] = self.member.node.network.now
162163
viewchange = ViewChange(self.viewid + 1,
163-
tuple(sorted(set(self.member.peers) | set([requester]))))
164+
tuple(sorted(set(self.peers) | set([requester]))))
164165
self.propose(Proposal(None, None, viewchange))
165166

166167
def do_WELCOME(self, state, slot_num, decisions, viewid, peers):
@@ -172,6 +173,9 @@ def do_WELCOME(self, state, slot_num, decisions, viewid, peers):
172173
self.decisions = defaultlist(decisions)
173174
self.viewid = viewid
174175
self.last_heard_from = {}
175-
self.member.view_change(ViewChange(viewid, peers)) # TODO: WELCOME should include a ViewChange
176+
self.event('view_change', viewchange=ViewChange(viewid, peers)) # TODO: WELCOME should include a ViewChange
176177
self.heartbeat()
177178
self.repropose()
179+
180+
def on_view_change_event(self, viewchange):
181+
self.peers = viewchange.peers

cluster/scout.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,23 @@ class Scout(Component):
66

77
PREPARE_RETRANSMIT = 1
88

9-
def __init__(self, member, leader, ballot_num):
9+
def __init__(self, member, leader, ballot_num, peers):
1010
super(Scout, self).__init__(member)
1111
self.leader = leader
1212
self.scout_id = ScoutId(self.address, ballot_num)
1313
self.ballot_num = ballot_num
1414
self.pvals = defaultdict()
1515
self.accepted = set([])
16-
self.quorum = len(member.peers) / 2 + 1
16+
self.peers = peers
17+
self.quorum = len(peers) / 2 + 1
1718
self.retransmit_timer = None
1819

1920
def start(self):
2021
self.logger.info("scout starting")
2122
self.send_prepare()
2223

2324
def send_prepare(self):
24-
self.send(self.member.peers, 'PREPARE', # p1a
25+
self.send(self.peers, 'PREPARE', # p1a
2526
scout_id=self.scout_id,
2627
ballot_num=self.ballot_num)
2728
self.retransmit_timer = self.set_timer(

0 commit comments

Comments
 (0)