Skip to content

Commit

Permalink
First commit.
Browse files Browse the repository at this point in the history
  • Loading branch information
Atlas-Ma committed Mar 18, 2019
1 parent 1562c46 commit 414e7de
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 1 deletion.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.idea

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
38 changes: 38 additions & 0 deletions DHT.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# coding: utf-8

import logging
import time
from DHT_Node import DHT_Node


# configure the log with DEBUG level
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M:%S')


def main(number_nodes):
# logger for the main
logger = logging.getLogger('DHT')
# list with all the nodes
dht = []
# initial node on DHT
node = DHT_Node(('localhost', 5000))
node.start()
dht.append(node)
logger.info(node)

for i in range(number_nodes-1):
node = DHT_Node(('localhost', 5001+i), ('localhost', 5000))
node.start()
dht.append(node)
logger.info(node)

# Await for DHT to get stable
time.sleep(10)

for node in dht:
node.join()

if __name__ == '__main__':
main(3)
33 changes: 33 additions & 0 deletions DHT_Client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# coding: utf-8

import socket
import pickle
import logging


class DHT_Client():
def __init__(self, address):
self.dht_addr = address
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.logger = logging.getLogger('DHT_Client')

def put(self, key, value):
msg = {'method': 'PUT', 'args':{'key':key, 'value': value}}
p = pickle.dumps(msg)
self.socket.sendto(p, self.dht_addr)
p, addr = self.socket.recvfrom(1024)
o = pickle.loads(p)
if o['method'] != 'ACK':
self.logger.error('Invalid msg: %s', o)

def get(self, key):
msg = {'method': 'GET', 'args': {'key': key}}
p = pickle.dumps(msg)
self.socket.sendto(p, self.dht_addr)
p, addr = self.socket.recvfrom(1024)
o = pickle.loads(p)
if o['method'] != 'ACK':
self.logger.error('Invalid msg: %s', o)
return None
else:
return o['args']
148 changes: 148 additions & 0 deletions DHT_Node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# coding: utf-8

import socket
import threading
import logging
import pickle
from utils import dht_hash, contains_predecessor, contains_successor


class DHT_Node(threading.Thread):
def __init__(self, address, dht_address=None, timeout=3):
threading.Thread.__init__(self)
self.id = dht_hash(address.__str__())
self.addr = address
self.dht_address = dht_address
if dht_address is None:
self.successor_id = self.id
self.successor_addr = address
self.predecessor_id = None
self.predecessor_addr = None
self.inside_dht = True
else:
self.inside_dht = False
self.successor_id = None
self.successor_addr = None
self.predecessor_id = None
self.predecessor_addr = None
self.keystore = {}
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.settimeout(timeout)
self.logger = logging.getLogger("Node {}".format(self.id))

def send(self, address, o):
p = pickle.dumps(o)
self.socket.sendto(p, address)

def recv(self):
try:
p, addr = self.socket.recvfrom(1024)
except socket.timeout:
return None, None
else:
if len(p) == 0:
return None, addr
else:
return p, addr

def node_join(self, args):
self.logger.debug('Node join: %s', args)
addr = args['addr']
identification = args['id']
if self.id == self.successor_id:
self.successor_id = identification
self.successor_addr = addr
args = {'successor_id': self.id, 'successor_addr': self.addr}
self.send(addr, {'method': 'JOIN_REP', 'args': args})
elif contains_successor(self.id, self.successor_id, identification):
args = {'successor_id': self.successor_id, 'successor_addr': self.successor_addr}
self.successor_id = identification
self.successor_addr = addr
self.send(addr, {'method': 'JOIN_REP', 'args': args})
else:
self.logger.debug('Find Successor(%d)', args['id'])
self.send(self.successor_addr, {'method': 'JOIN_REQ', 'args':args})
self.logger.info(self)

def notify(self, args):
self.logger.debug('Notify: %s', args)
if self.predecessor_id is None or contains_predecessor(self.id, self.predecessor_id, args['predecessor_id']):
self.predecessor_id = args['predecessor_id']
self.predecessor_addr = args['predecessor_addr']
self.logger.info(self)

def stabilize(self, x, addr):
self.logger.debug('Stabilize: %s %s', x, addr)
if x is not None and contains_successor(self.id, self.successor_id, x):
self.successor_id = x
self.successor_addr = addr
args = {'predecessor_id': self.id, 'predecessor_addr': self.addr}
self.send(self.successor_addr, {'method': 'NOTIFY', 'args':args})

def put(self, key, value, address):
key_hash = dht_hash(key)
self.logger.debug('Put: %s %s', key, key_hash)
if self.id < key_hash <= self.successor_id:
self.keystore[key] = value
self.send(address, {'method': 'ACK'})
else:
# send to DHT
# Fill here
self.send(address, {'method': 'NACK'})

def get(self, key, address):
key_hash = dht_hash(key)
self.logger.debug('Get: %s %s', key, key_hash)
if self.id < key_hash <= self.successor_id:
value = self.keystore[key]
self.send(address, {'method': 'ACK', 'args': value})
else:
# send to DHT
# Fill here
self.send(address, {'method': 'NACK'})

def run(self):
self.socket.bind(self.addr)

while not self.inside_dht:
o = {'method': 'JOIN_REQ', 'args': {'addr':self.addr, 'id':self.id}}
self.send(self.dht_address, o)
p, addr = self.recv()
if p is not None:
o = pickle.loads(p)
self.logger.debug('O: %s', o)
if o['method'] == 'JOIN_REP':
args = o['args']
self.successor_id = args['successor_id']
self.successor_addr = args['successor_addr']
self.inside_dht = True
self.logger.info(self)

done = False
while not done:
p, addr = self.recv()
if p is not None:
o = pickle.loads(p)
self.logger.info('O: %s', o)
if o['method'] == 'JOIN_REQ':
self.node_join(o['args'])
elif o['method'] == 'NOTIFY':
self.notify(o['args'])
elif o['method'] == 'PUT':
self.put(o['args']['key'], o['args']['value'], addr)
elif o['method'] == 'GET':
self.get(o['args']['key'], addr)
elif o['method'] == 'PREDECESSOR':
self.send(addr, {'method': 'STABILIZE', 'args': self.predecessor_id})
elif o['method'] == 'STABILIZE':
self.stabilize(o['args'], addr)
else:
# Ask for predecessor to start the stabilize process
self.send(self.successor_addr, {'method': 'PREDECESSOR'})

def __str__(self):
return 'Node ID: {}; DHT: {}; Successor: {}; Predecessor: {}'\
.format(self.id, self.inside_dht, self.successor_id, self.predecessor_id)

def __repr__(self):
return self.__str__()
29 changes: 28 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,28 @@
# chord
# CHORD (DHT)

This repository implement a simple version of the [CHORD](https://en.wikipedia.org/wiki/Chord_(peer-to-peer)) algorithm.
The provided code already setups the ring network properly.
1. Supports Node Joins
2. Finds the correct successor for a node
3. Run Stabilize periodically to correct the network


## Running the example
Run in two different terminal:

DHT (setups a CHORD DHT):
```console
$ python3 DHT.py
```
example (put and get objects from the DHT):
```console
$ python3 example.py
```

## Authors

* **Mário Antunes** - [mariolpantunes](https://github.com/mariolpantunes)

## License

This project is licensed under the MIT License - see the [LICENSE.md](LICENSE.md) file for details
25 changes: 25 additions & 0 deletions example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# coding: utf-8

import logging
from DHT_Client import DHT_Client

# configure the log with DEBUG level
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M:%S')

def main():
client = DHT_Client(('localhost', 5000))
# add object to DHT (this key is in first node -> local search)
client.put('1', [0, 1, 2])
# retrieve from DHT (this key is in first node -> local search)
print(client.get('1'))

# add object to DHT (this key is not on the first node -> remote search)
client.put('2', ('xpto'))
# retrieve from DHT (this key is not on the first node -> remote search)
print(client.get('2'))


if __name__ == '__main__':
main()
29 changes: 29 additions & 0 deletions utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# coding: utf-8


# FNV-1a Hash Function
def dht_hash(text, seed=0, maximum=1024):
FNV_prime = 16777619
offset_basis = 2166136261
h = offset_basis + seed
for char in text:
h = h ^ ord(char)
h = h * FNV_prime
return h % maximum


def contains_predecessor(identification, predecessor, node):
if predecessor < node < identification:
return True
elif node > predecessor > identification:
return True
return False


def contains_successor(identification, successor, node):
if identification < node <= successor:
return True
elif successor < identification and (node > identification or node < successor):
return True
return False

0 comments on commit 414e7de

Please sign in to comment.