Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.

Commit 466384a

Browse files
committed
rewrite to use MultiPaxos
* add deterministic networking * add run.py to run three cluster members and a client This is still incredibly inefficient: * leaders battle it out for leadership on every invocation * message sizes are unbounded * the size of local state is unbounded
1 parent d37b93c commit 466384a

8 files changed

+495
-231
lines changed

cluster/README.txt

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ that can also feed clients information on cluster membership.
77

88
* statemachine.py -- illustrates the state machine client API
99
* network.py -- handles network communication
10+
* deterministic_network.py -- similar, but deterministic
1011
* client.py -- a simple client
1112
* member_single.py -- a non-clustered member server, talking to the client
1213
* member_replicated.py -- a replicated state machine with consensus and a fixed membership
14+
This is based on "Paxos Made Moderately Complex" (van Renesse, 2011)

cluster/client.py

+31-10
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,46 @@
1+
import time
12
import sys
23
import logging
3-
from network import Node
4+
from deterministic_network import Node
45

56

67
class Client(Node):
78

89
def __init__(self, member_address):
910
super(Client, self).__init__()
1011
self.member_address = member_address
12+
self.cid = 1000000
13+
self.current_request = None
1114

12-
def invoke(self, input):
15+
def start(self):
16+
def re_invoke(n):
17+
if n < 10:
18+
self.invoke(n, lambda output: re_invoke(n+1))
19+
self.set_timer(1, lambda: re_invoke(1))
20+
21+
def invoke(self, input, callback):
22+
assert self.current_request is None
1323
self.output = None
14-
self.send([self.member_address], 'INVOKE',
15-
input=input, caller=self.address)
16-
self.run()
24+
self.current_request = (self.cid, input, callback)
25+
self.cid += 1
26+
self.send_invoke()
1727
return self.output
1828

19-
def do_INVOKED(self, output):
29+
def send_invoke(self):
30+
cid, input, callback = self.current_request
31+
self.send([self.member_address], 'INVOKE',
32+
caller=self.address, cid=cid, input=input)
33+
self.invoke_timer = self.set_timer(3, self.send_invoke)
34+
35+
def do_INVOKED(self, cid, output):
36+
if not self.current_request or cid != self.current_request[0]:
37+
return
38+
print "GOT", cid, output
2039
self.logger.debug("received output %r" % (output,))
21-
self.output = output
22-
self.stop()
40+
callback = self.current_request[2]
41+
self.current_request = None
42+
self.cancel_timer(self.invoke_timer)
43+
callback(output)
2344

2445
if __name__ == "__main__":
2546
logging.basicConfig(
@@ -36,8 +57,8 @@ def do_INVOKED(self, output):
3657

3758
class FakeMember(Node):
3859

39-
def do_INVOKE(self, caller, input):
40-
self.send([caller], 'INVOKED', output=input * 10)
60+
def do_INVOKE(self, caller, cid, input):
61+
self.send([caller], 'INVOKED', cid=cid, output=input * 10)
4162
self.stop()
4263

4364

cluster/deterministic_network.py

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import uuid
2+
import logging
3+
import heapq
4+
import random
5+
6+
class Node(object):
7+
8+
unique_ids = xrange(1000).__iter__()
9+
10+
def __init__(self):
11+
self.unique_id = self.unique_ids.next()
12+
13+
def set_up_node(self, address, core):
14+
self.core = core
15+
self.address = address
16+
self.core.nodes[self.address] = self
17+
self.logger = logging.getLogger('node.%s' % (self.address,))
18+
19+
def start(self):
20+
pass
21+
22+
def set_timer(self, seconds, callable):
23+
return self.core.set_timer(seconds, callable)
24+
25+
def cancel_timer(self, timer):
26+
self.core.cancel_timer(timer)
27+
28+
def send(self, destinations, action, **kwargs):
29+
self.logger.debug("sending %s with args %s to %s" %
30+
(action, kwargs, destinations))
31+
self.core.send(destinations, action, **kwargs)
32+
33+
def receive(self, action, kwargs):
34+
self.logger.debug("received %r with args %r" % (action, kwargs))
35+
getattr(self, 'do_%s' % action)(**kwargs)
36+
37+
38+
class Core(object):
39+
40+
PROP_DELAY = 0.03
41+
PROP_JITTER = 0.02
42+
43+
def __init__(self, seed):
44+
self.nodes = {}
45+
self.rnd = random.Random(seed)
46+
self.timers = []
47+
self.now = 0.0
48+
self.logger = logging.getLogger('core')
49+
50+
def run(self):
51+
for node in sorted(self.nodes.values()):
52+
node.start()
53+
while self.timers:
54+
next_timer = self.timers[0][0]
55+
if next_timer > self.now:
56+
self.now = next_timer
57+
when, do, callable = heapq.heappop(self.timers)
58+
if do:
59+
callable()
60+
61+
def stop(self):
62+
self.timers = []
63+
64+
def set_timer(self, seconds, callable):
65+
timer = [self.now + seconds, True, callable]
66+
heapq.heappush(self.timers, timer)
67+
return timer
68+
69+
def cancel_timer(self, timer):
70+
timer[1] = False
71+
72+
def send(self, destinations, action, **kwargs):
73+
for dest in destinations:
74+
node = self.nodes[dest]
75+
delay = self.PROP_DELAY + self.rnd.uniform(-self.PROP_JITTER, self.PROP_JITTER)
76+
self.set_timer(delay, lambda node=node: node.receive(action, kwargs))
77+

0 commit comments

Comments
 (0)