Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.

Commit e6d3139

Browse files
committed
refactor into more, smaller modules. Write Small Things.
1 parent 1a40d44 commit e6d3139

13 files changed

+633
-585
lines changed

cluster/acceptor.py

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from collections import defaultdict
2+
from protocol import Ballot
3+
from member import Component
4+
5+
6+
class Acceptor(Component):
7+
8+
def __init__(self, member):
9+
super(Acceptor, self).__init__(member)
10+
self.ballot_num = Ballot(-1, -1)
11+
self.accepted = defaultdict() # { (b,s) : p }
12+
13+
def do_PREPARE(self, scout_id, ballot_num): # p1a
14+
if ballot_num > self.ballot_num:
15+
self.ballot_num = ballot_num
16+
self.send([scout_id.address], 'PROMISE', # p1b
17+
scout_id=scout_id,
18+
acceptor=self.address,
19+
ballot_num=self.ballot_num,
20+
accepted=self.accepted)
21+
22+
def do_ACCEPT(self, commander_id, ballot_num, slot, proposal): # p2a
23+
if ballot_num >= self.ballot_num:
24+
self.ballot_num = ballot_num
25+
self.accepted[(ballot_num, slot)] = proposal
26+
self.send([commander_id.address], 'ACCEPTED', # p2b
27+
commander_id=commander_id,
28+
acceptor=self.address,
29+
ballot_num=self.ballot_num)

cluster/client.py

+31-51
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,50 @@
11
import sys
2-
import logging
3-
from deterministic_network import Node
2+
from member import Member
3+
from member import Component
44

55

6-
class Client(Node):
6+
# TODO: eliminate - this library doesn't have distinct client nodes
7+
class Client(Member):
78

8-
def __init__(self, members):
9-
super(Client, self).__init__()
10-
self.cid = 1000000
9+
def __init__(self, node):
10+
super(Client, self).__init__(node)
1111
self.current_request = None
1212

1313
def start(self):
1414
def re_invoke(n):
1515
self.invoke(n, lambda output: re_invoke(n+1))
16-
self.set_timer(1, lambda: re_invoke(1))
16+
self.node.set_timer(1, lambda: re_invoke(1))
1717

18-
def invoke(self, input, callback):
18+
def invoke(self, n, callback):
1919
assert self.current_request is None
20+
def done(output):
21+
self.current_request = None
22+
callback(output)
23+
self.current_request = Request(self, n, done)
24+
self.current_request.start()
25+
26+
27+
class Request(Component):
28+
29+
client_ids = xrange(1000000, sys.maxint).__iter__()
30+
RETRANSMIT_TIME = 0.1
31+
32+
def __init__(self, member, n, callback):
33+
super(Request, self).__init__(member)
34+
self.cid = self.client_ids.next()
35+
self.n = n
2036
self.output = None
21-
self.current_request = (self.cid, input, callback)
22-
self.cid += 1
23-
self.send_invoke()
24-
return self.output
37+
self.callback = callback
2538

26-
def send_invoke(self):
27-
cid, input, callback = self.current_request
28-
nodes = [k for k in self.core.nodes.keys() if k.startswith('Node-')]
29-
self.send([self.core.rnd.choice(nodes)], 'INVOKE',
30-
caller=self.address, cid=cid, input=input)
31-
self.invoke_timer = self.set_timer(3, self.send_invoke)
39+
def start(self):
40+
self.send([self.member.node.network.rnd.choice(self.member.node.network.nodes.keys())], 'INVOKE',
41+
caller=self.address, cid=self.cid, input=self.n)
42+
self.invoke_timer = self.set_timer(self.RETRANSMIT_TIME, self.start)
3243

3344
def do_INVOKED(self, cid, output):
34-
if not self.current_request or cid != self.current_request[0]:
45+
if cid != self.cid:
3546
return
3647
self.logger.debug("received output %r" % (output,))
37-
callback = self.current_request[2]
38-
self.current_request = None
3948
self.cancel_timer(self.invoke_timer)
40-
callback(output)
41-
42-
if __name__ == "__main__":
43-
logging.basicConfig(
44-
format="%(asctime)s %(name)s %(message)s", level=logging.DEBUG)
45-
client = Client(sys.argv[1])
46-
print client.invoke(4)
47-
print client.invoke(1)
48-
49-
# tests
50-
51-
import unittest
52-
import threading
53-
54-
55-
class FakeMember(Node):
56-
57-
def do_INVOKE(self, caller, cid, input):
58-
self.send([caller], 'INVOKED', cid=cid, output=input * 10)
49+
self.callback(output)
5950
self.stop()
60-
61-
62-
class ClientTests(unittest.TestCase):
63-
64-
def test_invoke(self):
65-
member = FakeMember()
66-
client = Client(member.address)
67-
memberthd = threading.Thread(target=member.run)
68-
memberthd.start()
69-
self.assertEqual(client.invoke(5), 50)
70-
memberthd.join()

cluster/commander.py

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from protocol import CommanderId
2+
from member import Component
3+
4+
5+
class Commander(Component):
6+
7+
def __init__(self, member, leader, ballot_num, slot, proposal):
8+
super(Commander, self).__init__(member)
9+
self.leader = leader
10+
self.ballot_num = ballot_num
11+
self.slot = slot
12+
self.proposal = proposal
13+
self.commander_id = CommanderId(self.address, slot, proposal)
14+
self.accepted = set([])
15+
self.peers = member.peers # TODO: pass this in
16+
self.quorum = len(self.peers) / 2 + 1
17+
18+
def start(self):
19+
self.send(self.peers, 'ACCEPT', # p2a
20+
commander_id=self.commander_id,
21+
ballot_num=self.ballot_num,
22+
slot=self.slot,
23+
proposal=self.proposal)
24+
25+
def finished(self, ballot_num, preempted):
26+
self.leader.commander_finished(self.commander_id, ballot_num, preempted)
27+
self.stop()
28+
29+
def do_ACCEPTED(self, commander_id, acceptor, ballot_num): # p2b
30+
if commander_id != self.commander_id:
31+
return
32+
if ballot_num == self.ballot_num:
33+
self.accepted.add(acceptor)
34+
if len(self.accepted) < self.quorum:
35+
return
36+
self.send(self.peers, 'DECISION',
37+
slot=self.slot,
38+
proposal=self.proposal)
39+
self.finished(ballot_num, False)
40+
else:
41+
self.finished(ballot_num, True)

cluster/deterministic_network.py

+37-30
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import uuid
21
import time
32
import logging
43
import heapq
@@ -8,38 +7,48 @@ class Node(object):
87

98
unique_ids = xrange(1000).__iter__()
109

11-
def __init__(self):
10+
def __init__(self, network):
11+
self.network = network
1212
self.unique_id = self.unique_ids.next()
13-
14-
def set_up_node(self, address, core):
15-
self.core = core
16-
self.address = address
17-
self.core.nodes[self.address] = self
18-
self.logger = logging.getLogger('node.%s' % (self.address,))
13+
self.address = 'N%d' % self.unique_id
14+
self.components = []
15+
self.logger = logging.getLogger(self.address)
1916
self.logger.info('starting')
2017

21-
def stop(self):
22-
self.logger.error('STOPPING')
23-
if self.address in self.core.nodes:
24-
del self.core.nodes[self.address]
25-
26-
def start(self):
27-
pass
18+
def kill(self):
19+
self.logger.error('node dying')
20+
if self.address in self.network.nodes:
21+
del self.network.nodes[self.address]
2822

2923
def set_timer(self, seconds, callable):
30-
# TODO: refactor so this won't call a stopped node
31-
return self.core.set_timer(seconds, self.address, callable)
24+
return self.network.set_timer(seconds, self.address, callable)
3225

3326
def cancel_timer(self, timer):
34-
self.core.cancel_timer(timer)
27+
self.network.cancel_timer(timer)
3528

3629
def send(self, destinations, action, **kwargs):
3730
self.logger.debug("sending %s with args %s to %s" %
3831
(action, kwargs, destinations))
39-
self.core.send(destinations, action, **kwargs)
32+
self.network.send(destinations, action, **kwargs)
33+
34+
def register(self, component):
35+
self.components.append(component)
36+
37+
def unregister(self, component):
38+
self.components.remove(component)
4039

40+
def receive(self, action, kwargs):
41+
import sys
42+
for comp in self.components[:]:
43+
try:
44+
fn = getattr(comp, 'do_%s' % action)
45+
except AttributeError:
46+
continue
47+
comp.logger.debug("received %r with args %r" % (action, kwargs))
48+
fn(**kwargs)
4149

42-
class Core(object):
50+
51+
class Network(object):
4352

4453
PROP_DELAY = 0.03
4554
PROP_JITTER = 0.02
@@ -50,11 +59,14 @@ def __init__(self, seed, pause=False):
5059
self.pause = pause
5160
self.timers = []
5261
self.now = 1000.0
53-
self.logger = logging.getLogger('core')
62+
self.logger = logging.getLogger('network')
63+
64+
def new_node(self):
65+
node = Node(self)
66+
self.nodes[node.address] = node
67+
return node
5468

5569
def run(self):
56-
for node in sorted(self.nodes.values()):
57-
node.start()
5870
while self.timers:
5971
next_timer = self.timers[0][0]
6072
if next_timer > self.now:
@@ -71,6 +83,7 @@ def stop(self):
7183
self.timers = []
7284

7385
def set_timer(self, seconds, address, callable):
86+
# TODO: return an obj with 'cancel'
7487
timer = [self.now + seconds, True, address, callable]
7588
heapq.heappush(self.timers, timer)
7689
return timer
@@ -83,13 +96,7 @@ def _receive(self, address, action, kwargs):
8396
node = self.nodes[address]
8497
except KeyError:
8598
return
86-
try:
87-
fn = getattr(node, 'do_%s' % action)
88-
except AttributeError:
89-
return
90-
91-
node.logger.debug("received %r with args %r" % (action, kwargs))
92-
fn(**kwargs)
99+
node.receive(action, kwargs)
93100

94101
def send(self, destinations, action, **kwargs):
95102
for dest in destinations:

cluster/leader.py

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
from util import defaultlist, view_primary
2+
from protocol import Ballot
3+
from member import Component
4+
from scout import Scout
5+
from commander import Commander
6+
7+
class Leader(Component):
8+
9+
HEARTBEAT_INTERVAL = 1
10+
11+
def __init__(self, member, unique_id, commander_cls=Commander, scout_cls=Scout):
12+
super(Leader, self).__init__(member)
13+
self.ballot_num = Ballot(0, unique_id)
14+
self.active = False
15+
self.proposals = defaultlist()
16+
self.commander_cls = commander_cls
17+
self.commanders = {}
18+
self.scout_cls = scout_cls
19+
self.scout = None
20+
21+
def view_change(self, viewchange):
22+
is_primary = view_primary(viewchange.viewid, viewchange.peers) == self.address
23+
self.is_primary = is_primary
24+
if is_primary:
25+
if not self.scout and not self.active:
26+
self.spawn_scout()
27+
else:
28+
if self.scout:
29+
self.scout.finished(False, None)
30+
# .. which eventually calls self.preempted
31+
elif self.active:
32+
self.preempted(None)
33+
34+
def spawn_scout(self):
35+
assert not self.scout
36+
sct = self.scout = self.scout_cls(self.member, self, self.ballot_num)
37+
sct.start()
38+
39+
def scout_finished(self, adopted, ballot_num, pvals):
40+
self.scout = None
41+
if adopted:
42+
# pvals is a defaultlist of (slot, proposal) by ballot num; we need the
43+
# highest ballot number for each slot. TODO: this is super
44+
# inefficient!
45+
last_by_slot = defaultlist()
46+
for b, s in reversed(sorted(pvals.keys())):
47+
p = pvals[b, s]
48+
if last_by_slot[s] is None:
49+
last_by_slot[s] = p
50+
for s, p in enumerate(last_by_slot):
51+
if p is not None:
52+
self.proposals[s] = p
53+
for s, p in enumerate(self.proposals):
54+
if p is not None:
55+
self.spawn_commander(ballot_num, s, p)
56+
self.logger.info("leader becoming active")
57+
self.active = True
58+
else:
59+
self.preempted(ballot_num)
60+
61+
def preempted(self, ballot_num):
62+
# ballot_num is None when we are preempted by a view change
63+
if ballot_num:
64+
self.logger.info("leader preempted by %s" % (ballot_num.leader,))
65+
else:
66+
self.logger.info("leader preempted by view change")
67+
self.active = False
68+
self.ballot_num = Ballot(
69+
(ballot_num if ballot_num else self.ballot_num).n + 1, self.unique_id)
70+
# if we're the primary for this view, re-scout immediately
71+
if not self.scout and self.is_primary:
72+
self.logger.info("re-scouting as the primary for this view")
73+
self.spawn_scout()
74+
75+
def spawn_commander(self, ballot_num, slot, proposal):
76+
cmd = self.commander_cls(self.member, self, ballot_num, slot, proposal)
77+
if cmd.commander_id in self.commanders:
78+
return
79+
print "set", cmd.commander_id
80+
self.commanders[cmd.commander_id] = cmd
81+
cmd.start()
82+
83+
def commander_finished(self, commander_id, ballot_num, preempted):
84+
print "del", commander_id
85+
del self.commanders[commander_id]
86+
if preempted:
87+
self.preempted(ballot_num)
88+
89+
def do_PROPOSE(self, slot, proposal):
90+
if self.proposals[slot] is None:
91+
if self.active:
92+
self.proposals[slot] = proposal
93+
self.spawn_commander(self.ballot_num, slot, proposal)
94+
else:
95+
if not self.scout:
96+
self.logger.warning(
97+
"got PROPOSE when not active - scouting")
98+
self.spawn_scout()
99+

0 commit comments

Comments
 (0)