-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathDHT_Node.py
148 lines (133 loc) · 5.62 KB
/
DHT_Node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# coding: utf-8
import socket
import threading
import logging
import pickle
from utils import dht_hash, contains_predecessor, contains_successor
class DHT_Node(threading.Thread):
def __init__(self, address, dht_address=None, timeout=3):
threading.Thread.__init__(self)
self.id = dht_hash(address.__str__())
self.addr = address
self.dht_address = dht_address
if dht_address is None:
self.successor_id = self.id
self.successor_addr = address
self.predecessor_id = None
self.predecessor_addr = None
self.inside_dht = True
else:
self.inside_dht = False
self.successor_id = None
self.successor_addr = None
self.predecessor_id = None
self.predecessor_addr = None
self.keystore = {}
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.settimeout(timeout)
self.logger = logging.getLogger("Node {}".format(self.id))
def send(self, address, o):
p = pickle.dumps(o)
self.socket.sendto(p, address)
def recv(self):
try:
p, addr = self.socket.recvfrom(1024)
except socket.timeout:
return None, None
else:
if len(p) == 0:
return None, addr
else:
return p, addr
def node_join(self, args):
self.logger.debug('Node join: %s', args)
addr = args['addr']
identification = args['id']
if self.id == self.successor_id:
self.successor_id = identification
self.successor_addr = addr
args = {'successor_id': self.id, 'successor_addr': self.addr}
self.send(addr, {'method': 'JOIN_REP', 'args': args})
elif contains_successor(self.id, self.successor_id, identification):
args = {'successor_id': self.successor_id, 'successor_addr': self.successor_addr}
self.successor_id = identification
self.successor_addr = addr
self.send(addr, {'method': 'JOIN_REP', 'args': args})
else:
self.logger.debug('Find Successor(%d)', args['id'])
self.send(self.successor_addr, {'method': 'JOIN_REQ', 'args':args})
self.logger.info(self)
def notify(self, args):
self.logger.debug('Notify: %s', args)
if self.predecessor_id is None or contains_predecessor(self.id, self.predecessor_id, args['predecessor_id']):
self.predecessor_id = args['predecessor_id']
self.predecessor_addr = args['predecessor_addr']
self.logger.info(self)
def stabilize(self, x, addr):
self.logger.debug('Stabilize: %s %s', x, addr)
if x is not None and contains_successor(self.id, self.successor_id, x):
self.successor_id = x
self.successor_addr = addr
args = {'predecessor_id': self.id, 'predecessor_addr': self.addr}
self.send(self.successor_addr, {'method': 'NOTIFY', 'args':args})
def put(self, key, value, address):
key_hash = dht_hash(key)
self.logger.debug('Put: %s %s', key, key_hash)
if contains_successor(self.id, self.successor_id, key_hash):
self.keystore[key] = value
self.send(address, {'method': 'ACK'})
else:
# send to DHT
# Fill here
self.send(address, {'method': 'NACK'})
def get(self, key, address):
key_hash = dht_hash(key)
self.logger.debug('Get: %s %s', key, key_hash)
if contains_successor(self.id, self.successor_id, key_hash):
value = self.keystore[key]
self.send(address, {'method': 'ACK', 'args': value})
else:
# send to DHT
# Fill here
self.send(address, {'method': 'NACK'})
def run(self):
self.socket.bind(self.addr)
while not self.inside_dht:
o = {'method': 'JOIN_REQ', 'args': {'addr':self.addr, 'id':self.id}}
self.send(self.dht_address, o)
p, addr = self.recv()
if p is not None:
o = pickle.loads(p)
self.logger.debug('O: %s', o)
if o['method'] == 'JOIN_REP':
args = o['args']
self.successor_id = args['successor_id']
self.successor_addr = args['successor_addr']
self.inside_dht = True
self.logger.info(self)
done = False
while not done:
p, addr = self.recv()
if p is not None:
o = pickle.loads(p)
self.logger.info('O: %s', o)
if o['method'] == 'JOIN_REQ':
self.node_join(o['args'])
elif o['method'] == 'NOTIFY':
self.notify(o['args'])
elif o['method'] == 'PUT':
self.put(o['args']['key'], o['args']['value'], addr)
elif o['method'] == 'GET':
self.get(o['args']['key'], addr)
elif o['method'] == 'PREDECESSOR':
self.send(addr, {'method': 'STABILIZE', 'args': self.predecessor_id})
elif o['method'] == 'STABILIZE':
self.stabilize(o['args'], addr)
else:
# Ask for predecessor to start the stabilize process
self.send(self.successor_addr, {'method': 'PREDECESSOR'})
def __str__(self):
return 'Node ID: {}; DHT: {}; Successor: {}; Predecessor: {}'\
.format(self.id, self.inside_dht, self.successor_id, self.predecessor_id)
def __repr__(self):
return self.__str__()