-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdiaz_chord.py
321 lines (223 loc) · 18.8 KB
/
diaz_chord.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
import sys
import zmq
from time import time
import threading
import hashlib
from utils import request, bcolors
from random import Random
from functools import reduce
import argparse
from socket import gethostbyname, gethostname
import re
class Node:
    """Node of the Chord ring.

    Each node represents an active index in the ring. It stores data about
    other indexes while maintaining one simple invariant: every node knows
    its successor.
    """

    def __init__(self, addr, introduction_node=None, verbose_option=False):
        """Create the node, join the ring if requested, and start serving.

        addr              -- "IP:port" string this node binds to and hashes
                             into its ring identifier.
        introduction_node -- address of a node already in the ring, or None
                             to start a new ring with this node alone.
        verbose_option    -- when true, print activity to stdout.

        The constructor does not return while the node is running: it ends
        by calling wrapper_action, which enters the request loop.
        """
        self.addr = addr
        self.verbose_option = verbose_option
        self.id = self.turn_in_hash(self.addr)
        self.context_sender = zmq.Context()
        # NOTE(review): self.id is the full SHA-1 value (160 bits) while
        # start() wraps at 2**m with m = 64 -- confirm whether ids were
        # meant to be reduced modulo 2**m as well.
        self.m = 64
        self.length_succ_list = 3
        self.succ_list = [(self.id, self.addr) for _ in range(self.length_succ_list)]
        self.finger_table = [None for _ in range(self.m)]
        self.waiting_time = 10
        self.commands = {
            "JOIN": self.answer_to_join,
            "FIND_SUCC": self.find_succesor_wrapper,
            "FIND_PRED": self.find_predecesor_wrapper,
            "GET_SUCC_LIST": self.get_succ_list,
            "CLOSEST_PRED_FING": self.closest_pred_fing_wrap,
            "ALIVE": self.alive,
            "GET_PARAMS": self.get_params,
            "GET_PROP": self.get_prop,
            "GET_PRED": self.get_pred,
            # Served through a wrapper so the REP socket always gets a reply.
            "STAB": self.stabilize_wrapper,
            "RECT": self.rectify,
        }
        self.commands_that_need_request = {"RECT", "FIND_SUCC", "FIND_PRED", "CLOSEST_PRED_FING", "STAB"}
        if self.verbose_option:
            print("Started node ", (self.id, self.addr))
        client_requester = request(context=self.context_sender, verbose_option=self.verbose_option)
        if introduction_node:
            introduction_id = self.turn_in_hash(introduction_node)
            recieved_json = client_requester.make_request(
                json_to_send=self._join_message(),
                destination_addr=introduction_node,
                destination_id=introduction_id,
            )
            # Keep asking the user for another address until some node
            # answers the JOIN handshake.
            while recieved_json is client_requester.error_json:
                client_requester.action_for_error(introduction_node)
                print("Enter address to retry ")
                introduction_node = input()
                # turn_in_hash already normalises the address; normalising
                # it twice raised IndexError (no ':' left to split on).
                introduction_id = self.turn_in_hash(introduction_node)
                print("Connecting now to ", (introduction_node, introduction_id))
                recieved_json = client_requester.make_request(
                    json_to_send=self._join_message(),
                    destination_id=introduction_id,
                    destination_addr=introduction_node,
                )
            # Then keep retrying until the predecessor/successor lookup
            # performed by execute_join succeeds.
            while not self.execute_join(introduction_node, introduction_id, self.start(0), client_requester):
                client_requester.action_for_error(introduction_node)
                print("Enter address to retry ")
                introduction_node = input()
                introduction_id = self.turn_in_hash(introduction_node)
                print("Connecting now to ", (introduction_id, introduction_node))
                recieved_json = client_requester.make_request(
                    json_to_send=self._join_message(),
                    destination_id=introduction_id,
                    destination_addr=introduction_node,
                )
        else:
            # First node of a new ring: it is its own predecessor.
            self.predeccesor_addr, self.predeccesor_id = self.addr, self.id
            self.isPrincipal = True
        self.wrapper_action(client_requester)

    def _join_message(self):
        """Build a fresh JOIN request payload for this node."""
        return {"command_name": "JOIN", "method_params": {}, "procedence_addr": self.addr}

    def domain_addr(self, value):
        """Return "a.b.c.d:port" with the dots and the first colon removed.

        The result is the IP octets followed by everything between the first
        and second ':' of value, concatenated without separators.
        """
        pieces = value.split(":")
        return "".join(pieces[0].split(".") + [pieces[1]])

    def turn_in_hash(self, input_to_id):
        """Return the SHA-1 of the normalised address as an integer."""
        digest = hashlib.sha1(bytes(self.domain_addr(input_to_id), 'utf-8')).hexdigest()
        return int(digest, 16)

    def start(self, i):
        """Return the ring position (self.id + 2**i) modulo 2**m."""
        return (self.id + 2 ** i) % 2 ** self.m

    def stabilize(self, sock_req: "request"):
        """Refresh the successor list from the current successor.

        The node asks its successor for that successor's predecessor. If the
        predecessor's id lies between self.id and the successor's id, the
        node has a new successor: the predecessor of its old successor. In
        that case the successor list becomes the new successor followed by
        the new successor's own list without its last element.

        If the successor does not answer, it is dropped from the list, the
        list is padded with this node itself, and the method returns. With a
        successor list of length k, k consecutive failures in the ring mean
        the network cannot be expected to keep working.
        """
        succ_id, succ_addr = self.succ_list[0]
        recv_json_pred = sock_req.make_request(
            json_to_send={"command_name": "GET_PRED", "method_params": {}, "procedence_addr": self.addr, "procedence_method": "stabilize_95"},
            requester_object=self,
            asked_properties=('predeccesor_id', 'predeccesor_addr'),
            destination_id=succ_id,
            destination_addr=succ_addr,
        )
        if recv_json_pred is sock_req.error_json:
            if self.verbose_option:
                sock_req.action_for_error(succ_addr)
            # Drop the dead successor and keep the list at full length.
            self.succ_list.pop(0)
            self.succ_list += [(self.id, self.addr)]
            return
        recv_json_succ_list = sock_req.make_request(
            json_to_send={'command_name': "GET_SUCC_LIST", 'method_params': {}, 'procedence_addr': self.addr, "procedence_method": "stabilize_104"},
            requester_object=self,
            asked_properties=("succ_list",),
            destination_id=self.succ_list[0][0],
            destination_addr=self.succ_list[0][1],
        )
        if recv_json_succ_list is sock_req.error_json:
            return
        self.succ_list = [self.succ_list[0]] + recv_json_succ_list['return_info']["succ_list"][:-1]
        candidate_id = recv_json_pred['return_info']['predeccesor_id']
        candidate_addr = recv_json_pred['return_info']['predeccesor_addr']
        if self.between(candidate_id, interval=(self.id, self.succ_list[0][0])):
            recv_json_pred_succ_list = sock_req.make_request(
                json_to_send={"command_name": "GET_SUCC_LIST", "method_params": {}, "procedence_addr": self.addr, "procedence_method": "stabilize_109"},
                requester_object=self,
                asked_properties=('succ_list',),
                destination_id=candidate_id,
                destination_addr=candidate_addr,
            )
            if recv_json_pred_succ_list is not sock_req.error_json:
                # The new successor is alive: rebuild the list around it.
                self.succ_list = [[candidate_id, candidate_addr]] + recv_json_pred_succ_list['return_info']['succ_list'][:-1]
            elif self.verbose_option:
                # The original line lacked the "if", making it an annotation
                # that never ran.
                sock_req.action_for_error(candidate_addr)

    def stabilize_wrapper(self, sock_req):
        """Serve a remote STAB command: stabilize, then acknowledge.

        stabilize itself sends no reply, and a REP socket must answer every
        request before it can receive the next one.
        """
        self.stabilize(sock_req)
        self.sock_rep.send_json({"response": "ACK"})

    def between(self, id, interval):
        """Return True if id lies strictly inside the circular interval.

        interval is a (low, high) pair. When low >= high the interval wraps
        around the ring, so id only has to be above low or below high.
        """
        low, high = interval[0], interval[1]
        if low < high:
            return low < id < high
        return id > low or id < high

    def rectify(self, predeccesor_id, predeccesor_addr, sock_req):
        """Handle a predecessor notification and acknowledge it.

        The candidate replaces the current predecessor when it falls between
        the current predecessor and this node, when this node is still its
        own predecessor, or when the current predecessor no longer answers
        an ALIVE request.
        """
        if self.between(predeccesor_id, interval=(self.predeccesor_id, self.id)) or self.id == self.predeccesor_id:
            if self.predeccesor_id == self.id:
                # Ring of one: the newcomer is also the first successor.
                self.succ_list[0] = (predeccesor_id, predeccesor_addr)
            self.predeccesor_id, self.predeccesor_addr = predeccesor_id, predeccesor_addr
        else:
            recv_json_alive = sock_req.make_request(
                json_to_send={"command_name": "ALIVE", "method_params": {}, "procedence_addr": self.addr, "procedence_method": "rectify"},
                destination_id=self.predeccesor_id,
                destination_addr=self.predeccesor_addr,
            )
            if recv_json_alive is sock_req.error_json:
                if self.verbose_option:
                    sock_req.action_for_error(self.predeccesor_addr)
                self.predeccesor_id, self.predeccesor_addr = predeccesor_id, predeccesor_addr
                # NOTE(review): this runs the error action on the NEW
                # predecessor address, unconditionally; kept as in the
                # original -- confirm it is intended.
                sock_req.action_for_error(self.predeccesor_addr)
        self.sock_rep.send_json({"response": "ACK"})

    def answer_to_join(self):
        """Acknowledge a JOIN handshake."""
        self.sock_rep.send_json({"response": "ACK_to_join", "return_info": {}})

    def execute_join(self, introduction_node, introduction_id, id_to_found_pred, sock_req):
        """Insert this node into the ring through introduction_node.

        Asks the introduction node for the predecessor of id_to_found_pred,
        stores it as this node's predecessor, and copies that predecessor's
        successor list. Returns True on success and False if either request
        fails.
        """
        recv_json = sock_req.make_request(
            json_to_send={"command_name": "FIND_PRED", "method_params": {"id": id_to_found_pred}, "procedence_addr": self.addr},
            requester_object=self,
            method_for_wrap="find_predecesor",
            destination_id=introduction_id,
            destination_addr=introduction_node,
        )
        if recv_json is sock_req.error_json:
            return False
        pred_id = recv_json['return_info']['pred_id']
        pred_addr = recv_json['return_info']['pred_addr']
        self.predeccesor_id, self.predeccesor_addr = pred_id, pred_addr
        recv_json = sock_req.make_request(
            json_to_send={"command_name": "GET_SUCC_LIST", "method_params": {}, "procedence_addr": self.addr},
            requester_object=self,
            asked_properties=("succ_list",),
            destination_id=pred_id,
            destination_addr=pred_addr,
        )
        if recv_json is sock_req.error_json:
            return False
        self.succ_list = recv_json['return_info']['succ_list']
        return True

    # Auxiliary methods that expose node properties to clients.

    def get_params(self):
        """Reply with the finger table, predecessor, successor list, id and address."""
        self.sock_rep.send_json({
            "response": "ACK",
            "return_info": {
                "finger_table": self.finger_table,
                "predeccesor_addr": self.predeccesor_addr,
                "predeccesor_id": self.predeccesor_id,
                "succ_list": self.succ_list,
                "id": self.id,
                "address": self.addr,
            },
        })

    def get_prop(self, prop_name):
        """Reply with one instance attribute, or with the start indexes.

        "start_indexes" is computed on demand; any other name is read from
        the instance dictionary. Exactly one reply is sent per request (the
        original fell through and attempted a second send).
        """
        if prop_name == "start_indexes":
            return_info = [self.start(i) for i in range(self.m)]
        else:
            # NOTE(review): an unknown prop_name still raises KeyError here.
            return_info = self.__dict__[prop_name]
        self.sock_rep.send_json({'response': 'ACK', "return_info": return_info})

    def get_pred(self):
        """Reply with this node's predecessor id and address."""
        self.sock_rep.send_json({"response": "ACK", "return_info": {"predeccesor_id": self.predeccesor_id, "predeccesor_addr": self.predeccesor_addr}})

    def alive(self):
        """Reply to a liveness probe."""
        self.sock_rep.send_json({"response": "ACK", "procedence_addr": self.addr})

    def get_succ_list(self):
        """Reply with this node's successor list."""
        self.sock_rep.send_json({"response": "ACK", "return_info": {"succ_list": self.succ_list}})

    def wrapper_action(self, client_requester):
        """Start the stabilization thread, then serve requests.

        A separate thread is used for availability: the node must stabilize
        periodically (every self.waiting_time seconds) and stay open to
        requests from other nodes in the meantime.
        """
        thr_stabilize = threading.Thread(target=self.wrapper_loop_stabilize, args=())
        thr_stabilize.start()
        self.waiting_for_command(client_requester)

    def wrapper_loop_stabilize(self):
        """Run the periodic maintenance loop (never returns).

        Every self.waiting_time seconds, unless the node is its own
        predecessor, it stabilizes, notifies its successor with RECT, and
        refreshes one randomly chosen finger-table entry.
        """
        from time import sleep  # local import: the file only imports time()

        rand = Random()
        rand.seed()
        requester = request(context=self.context_sender, verbose_option=self.verbose_option)
        choices = range(self.m)
        while True:
            # Sleep instead of spinning on time(): same cadence, no busy CPU.
            sleep(self.waiting_time)
            if self.predeccesor_id == self.id:
                continue
            # Periodically the node refreshes its view of the network and
            # updates the finger table, an optimisation for finding
            # successors faster.
            self.stabilize(sock_req=requester)
            # Whatever stabilize did, notify the successor so it can
            # rectify its predecessor values.
            rect_reply = requester.make_request(
                json_to_send={"command_name": "RECT", "method_params": {"predeccesor_id": self.id, "predeccesor_addr": self.addr}, "procedence_addr": self.addr, "procedence_method": "wrapper_loop_stabilize", "time": time()},
                destination_id=self.succ_list[0][0],
                destination_addr=self.succ_list[0][1],
            )
            if rect_reply is requester.error_json and self.verbose_option:
                requester.action_for_error(self.succ_list[0][1])
            index = rand.choice(choices)
            self.finger_table[index] = self.find_succesor(self.start(index), sock_req=requester)

    def waiting_for_command(self, client_requester):
        """Bind the node address and dispatch incoming commands forever.

        Every request receives exactly one reply, because a REP socket must
        send before it can receive again. The socket is closed if the loop
        is ever left through an exception.
        """
        self.sock_rep = self.context_sender.socket(zmq.REP)
        self.sock_rep.bind("tcp://" + self.addr)
        try:
            while True:
                if self.verbose_option:
                    print("waiting")
                buff = self.sock_rep.recv_json()
                command_name = buff.get('command_name')
                handler = self.commands.get(command_name)
                if handler is None:
                    # NOTE(review): "ERROR" is a new response value; confirm
                    # clients tolerate it. Silence would wedge the socket.
                    self.sock_rep.send_json({"response": "ERROR", "return_info": {}})
                    continue
                if self.verbose_option:
                    print(buff)
                if command_name in self.commands_that_need_request:
                    handler(**buff["method_params"], sock_req=client_requester)
                else:
                    handler(**buff["method_params"])
        finally:
            self.sock_rep.close()

    def find_succesor_wrapper(self, id, sock_req):
        """Reply with the successor of id (None when the lookup failed)."""
        info = self.find_succesor(id, sock_req)
        self.sock_rep.send_json({"response": "ACK", "return_info": info})

    def find_succesor(self, id, sock_req):
        """Return the first successor of the predecessor of id, or None.

        None is returned when the predecessor lookup or the follow-up
        GET_SUCC_LIST request fails.
        """
        tuple_info = self.find_predecesor(id, sock_req)
        if not tuple_info:
            return None
        destination_id, destination_addr = tuple_info
        recv_json = sock_req.make_request(
            json_to_send={"command_name": "GET_SUCC_LIST", "method_params": {}, "procedence_addr": self.addr, "procedence_method": "find_succesor_286"},
            requester_object=self,
            asked_properties=('succ_list',),
            destination_id=destination_id,
            destination_addr=destination_addr,
        )
        if recv_json is sock_req.error_json:
            return None
        return recv_json['return_info']['succ_list'][0]

    def find_predecesor_wrapper(self, id, sock_req):
        """Reply with the predecessor of id.

        When the lookup fails, the reply keeps its shape but carries None
        for both fields, mirroring find_succesor_wrapper (the original
        raised TypeError and left the request unanswered).
        """
        found = self.find_predecesor(id, sock_req)
        pred_id, pred_addr = found if found is not None else (None, None)
        self.sock_rep.send_json({"response": "ACK", "return_info": {"pred_id": pred_id, "pred_addr": pred_addr}, "procedence_addr": self.addr})

    def find_predecesor(self, id, sock_req):
        """Walk the ring until reaching the node that precedes id.

        Returns (node_id, node_addr), or None if a request fails on the way.

        The extra "current_succ_id != id" clause covers the case where id
        equals the current successor's id: without it the loop would never
        end, since id can then never fall strictly between current_id and
        current_succ_id.
        """
        current_id = self.id
        current_addr = self.addr
        current_succ_id, current_succ_addr = self.succ_list[0]
        self.finger_table[0] = self.succ_list[0]
        while not self.between(id, interval=(current_id, current_succ_id)) and current_succ_id != id:
            recv_json_closest = sock_req.make_request(
                json_to_send={"command_name": "CLOSEST_PRED_FING", "method_params": {"id": id}, "procedence_addr": self.addr, "procedence_method": "find_predecesor"},
                method_for_wrap='closest_pred_fing',
                requester_object=self,
                destination_id=current_id,
                destination_addr=current_addr,
            )
            if recv_json_closest is sock_req.error_json:
                return None
            closest_id, closest_addr = recv_json_closest['return_info'][0], recv_json_closest['return_info'][1]
            recv_json_succ = sock_req.make_request(
                json_to_send={"command_name": "GET_SUCC_LIST", "method_params": {}, "procedence_addr": self.addr, "procedence_method": "find_predecesor"},
                requester_object=self,
                asked_properties=("succ_list",),
                destination_id=closest_id,
                destination_addr=closest_addr,
            )
            if recv_json_succ is sock_req.error_json:
                return None
            current_id, current_addr = closest_id, closest_addr
            current_succ_id, current_succ_addr = recv_json_succ['return_info']['succ_list'][0]
        return current_id, current_addr

    def closest_pred_fing_wrap(self, id, sock_req):
        """Reply with the closest preceding finger for id."""
        closest_id, closest_addr = self.closest_pred_fing(id, sock_req)
        self.sock_rep.send_json({"response": "ACK", "return_info": (closest_id, closest_addr), "procedence": self.addr})

    def closest_pred_fing(self, id, sock_req):
        """Return the highest finger lying between this node and id.

        This is where the finger table speeds up the successor search. The
        table is scanned from the last entry down; unset entries are
        skipped. Falls back to (self.id, self.addr) when no finger
        qualifies. sock_req is accepted for dispatch uniformity and unused.
        """
        for finger in reversed(self.finger_table[:self.m]):
            if finger is None:
                continue
            if self.between(finger[0], (self.id, id)):
                return finger
        return (self.id, self.addr)
if __name__ == "__main__":
    # Command-line entry point: parse and validate the addresses, then start
    # a Node (whose constructor runs the node and does not return).
    parser = argparse.ArgumentParser(description= "This is the code of a Chord node made by DiazRock")
    parser.add_argument('--addr_id', default = gethostbyname(gethostname()) + ":8080", help= "This is the address of the node that identifies it in the hash space.\nIf no address is set, this automatically set the local address asigned from the local network.")
    parser.add_argument('--addr_known', default = None, help = "This is an IP address that identifies reference a node in the network.\nIf you wanna join new nodes to an existing network, you have to enter this value, otherwise your node never bee connected to the network.")
    parser.add_argument('--v', action = "store_true", help = "This is the verbose option. You can see the activity of the node if you enter it.")
    # Raw string: same pattern as before, without invalid-escape warnings.
    matcher = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,6}")
    args = parser.parse_args()
    error_message = "The %s must have an IP:port like format first, and after that, because the code uses this info for the hash function, if you want\n to avoide colisions, you must enter an unique string. This %s is a bad input"
    # Only the first whitespace-separated token has to look like IP:port.
    # An empty or blank value has no such token and is rejected instead of
    # raising IndexError.
    addr_id_tokens = args.addr_id.split()
    if not addr_id_tokens or not matcher.fullmatch(addr_id_tokens[0]):
        parser.error(error_message % ("addr_id", args.addr_id))
    if args.addr_known:
        addr_known_tokens = args.addr_known.split()
        if not addr_known_tokens or not matcher.fullmatch(addr_known_tokens[0]):
            parser.error(error_message % ("addr_known", args.addr_known))
    n = Node(addr = args.addr_id, introduction_node = args.addr_known, verbose_option = args.v)