# ecs-server.py
import argparse
import logging
import socketserver
import os
import threading
import socket
import metaData
import signal
import time
import sys
import asyncio
import websockets
import queue
import json
# Global variables
meta_data = metaData.MetaData()
addRemoveLock = threading.Lock() # lock for adding and removing servers
server = None
channels = {'operation': {}}
eventQueue = queue.Queue() # queue for storing messages to be published
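# Architecture overview (see run_async_server and the __main__ block below):
#   - a ThreadingTCPServer on args.port handles KVServer 'join'/'shutdown' requests,
#   - a websocket server on args.port + 100 publishes events (e.g. key-range updates)
#     from eventQueue to subscribed websocket clients.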
async def subscribe(websocket, subscription_type, name):
print(f"subscribing to {subscription_type}: {name}")
if name not in channels[subscription_type]:
channels[subscription_type][name] = set()
channels[subscription_type][name].add(websocket)
print(channels)
async def unsubscribe(websocket, subscription_type, name):
print("inside unsbs")
if name in channels[subscription_type]:
channels[subscription_type][name].discard(websocket)
print(channels)
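# Clients are expected to send JSON messages of the form (a sketch based on handle_client below):
#   {"action": "subscribe",   "type": "operation", "name": "keyrange"}
#   {"action": "unsubscribe", "type": "operation", "name": "keyrange"}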
async def handle_client(websocket, path):
# Assume clients will send a JSON message to subscribe/unsubscribe
async for message in websocket:
data = json.loads(message)
print(f"handle {data}")
if data.get('action') == 'subscribe':
if data.get('type') == 'operation' and data.get('name') == 'keyrange':
await send_initial_keyrange(websocket)
await subscribe(websocket, data.get('type'), data.get('name'))
elif data.get('action') == 'unsubscribe':
await unsubscribe(websocket, data.get('type'), data.get('name'))
async def send_initial_keyrange(websocket):
data = {
'type': 'operation',
'operation': 'keyrange',
'data': meta_data.ranges,
}
await websocket.send(json.dumps(data))
def run_async_server():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
start_server = websockets.serve(handle_client, '0.0.0.0', int(args.port) + 100)
loop.run_until_complete(start_server)
loop.run_until_complete(publish_data())
loop.run_forever()
async def publish_data():
    # Poll the event queue once per second and push each event to the websockets
    # subscribed to that operation.
    while True:
        await asyncio.sleep(1)
        if eventQueue.empty():
            continue
        operation, queueData = eventQueue.get()
        data = {
            'operation': operation,
            'data': queueData
        }
        print(data)
        print(channels)
        data['type'] = "operation"
        # Send data to clients subscribed to this operation
        for websocket in channels['operation'].get(operation, set()):
            try:
                await websocket.send(json.dumps(data))
            except Exception:
                pass  # client probably disconnected; drop the event for it silently
        eventQueue.task_done()
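# Producers enqueue (operation, payload) tuples for publish_data to broadcast, e.g.
# (as done in sendUpdatedMetaDataToAllServers below):
#   eventQueue.put(('keyrange', json.dumps(meta_data.ranges)))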
def configure_logging(file_path, level='INFO'):
    if level not in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FINEST"):
        print("Invalid log level. Use DEBUG, INFO, WARNING, ERROR, CRITICAL or FINEST.")
        sys.exit(1)
    if level == "FINEST":  # Python's logging has no FINEST level; map it to DEBUG
        level = "DEBUG"
    log_dir = os.path.dirname(file_path)
    os.makedirs(os.path.join(os.getcwd(), log_dir), exist_ok=True)
    logging.basicConfig(
        level=level,
        format='%(asctime)s [%(levelname)s] %(message)s',
        handlers=[
            logging.FileHandler(file_path),  # log to the file given via --log
            # logging.StreamHandler()  # optionally also log to the console
        ]
    )
def calculateWhoSendsWhom(ip, port, operation):
    # For a join, the successor of the new server sends it the key range it takes over;
    # for a shutdown, the successor of the leaving server receives that server's key range.
    # Returns (partnerIp, partnerPort, keyRange); with a single server there is no partner.
    if operation in ('join', 'shutdown'):
        if len(meta_data.ranges) == 1:
            return None, None, meta_data.ranges
        index = 0
        for i in meta_data.ranges:
            if i['ip'] == ip and i['port'] == port:
                index = meta_data.ranges.index(i)
        key_range = {"start": meta_data.ranges[index]['start'], "end": meta_data.ranges[index]['end']}
        successor = meta_data.ranges[(index + 1) % len(meta_data.ranges)]
        return successor['ip'], successor['port'], key_range
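# meta_data.ranges (provided by metaData.MetaData) is used throughout this file as a list of
# dicts of the form {'ip': ..., 'port': ..., 'start': ..., 'end': ...}, one per KVServer,
# where entry index + 1 (wrapping around) is treated as the successor.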
def updateMetaData(ip, port, operation):
    # Apply the join/shutdown to the metadata and return (partnerIp, partnerPort, keyRange)
    # describing the data transfer that the change requires.
    if operation == 'join':
        meta_data.add_server(ip, port)
        return calculateWhoSendsWhom(ip, port, operation)
    elif operation == 'shutdown':
        dataReceiverIp, dataReceiverPort, keyRange = calculateWhoSendsWhom(ip, port, operation)
        meta_data.remove_server(ip, port)
        return dataReceiverIp, dataReceiverPort, keyRange
def sendUpdatedMetaDataToAllServers():
    # Publish the new key ranges to websocket subscribers via the event queue ...
    eventQueue.put(('keyrange', json.dumps(meta_data.ranges)))
    # ... and broadcast them to every remaining KVServer over a fresh connection.
    for i in meta_data.ranges:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
            client_socket.connect((i['ip'], int(i['port'])))
            ignoreWelcomeMessage(client_socket)  # ignore welcome message
            client_socket.sendall(f'metadata_broadcast {str(meta_data.ranges)} \r\n'.encode())
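# Wire format of the broadcast built above (the range values shown here are illustrative):
#   metadata_broadcast [{'ip': '127.0.0.1', 'port': '5001', 'start': ..., 'end': ...}, ...] \r\n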
def parseIpPort(command):
ip = command.split(' ')[1]
port = command.split(' ')[2]
return ip, port
def ignoreWelcomeMessage(sock):
    # Read and discard the welcome message a KVServer sends on connect (terminated by \r\n).
    buffer_size = 1024
    received_data = b""
    while True:
        data = sock.recv(buffer_size)
        if not data:  # connection closed before a full welcome message arrived
            break
        received_data += data
        if '\r\n' in data.decode():  # keep receiving until the message terminator arrives
            break
def startPingingServer(ip, port):
    # Periodically ping the KVServer's ping port; if it stops answering, remove it
    # from the metadata and broadcast the updated key ranges to the remaining servers.
    global meta_data
    cur_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    cur_client_socket.connect((ip, int(port)))
    ignoreWelcomeMessage(cur_client_socket)  # ignore welcome message
    # ping-pong with the server to check that it is alive
    while any(ip == d['ip'] and port == int(d['port']) + 1000 for d in meta_data.ranges):  # while the server is still in the metadata
        cur_client_socket.sendall(b'ping\r\n')
        cur_client_socket.settimeout(10)  # if no pong arrives within the timeout, the server is considered dead
        try:
            confirmation = cur_client_socket.recv(1024).decode('ascii').rstrip()
        except socket.timeout:
            confirmation = None
        if confirmation == 'pong':
            time.sleep(1)
        else:
            # Server is unresponsive: remove it and broadcast the new metadata.
            with addRemoveLock:
                updateMetaData(ip, str(port - 1000), 'shutdown')  # delete the unresponsive server from the metadata
                client_thread = threading.Thread(target=sendUpdatedMetaDataToAllServers)
                client_thread.start()
            break
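# The TCP handler below implements the ECS side of the KVServer protocol:
#   'join IP PORT'     -> a new KVServer joins; its successor is told to 'send_data' the affected
#                         key range to it, and the new server replies 'received_data' when done.
#   'shutdown IP PORT' -> a KVServer leaves; it is told where to 'send_data' its keys
#                         (or 'no_data_to_send'), and replies 'transfer_done' when the transfer finishes.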
class MyRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
# print(f"New Server Connection {self.client_address[0]}:{self.client_address[1]}")
logging.info(f"New Server Connection {self.client_address[0]}:{self.client_address[1]}")
# Receive data from the KVServer
data = self.request.recv(1024)
if not data:
return #TODO handle this
command = data.decode('ascii')
command = command.rstrip() # remove trailing spaces
operation = command.split(' ')[0] # get the operation
if operation in ('join', 'shutdown'):
with addRemoveLock: # lock for adding and removing servers (only one server at a time)
logging.info(f"'{operation}' operation called")
if operation == 'join': # new KVServer joins to cluster -> join IP PORT
try:
                        ip, port = parseIpPort(command)  # ip and port of the joining server, taken from the command (the connection's source port is not the server's listening port)
                        dataSenderIp, dataSenderPort, keyRange = updateMetaData(ip, port, 'join')  # which server sends data to the new server, and for which key range
if (dataSenderIp and dataSenderPort): # if there is a server that will send data to new server
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: # create connection to server that will send the data to new server and say transfer data
s.connect((dataSenderIp, int(dataSenderPort))) # connect to server that will send the data to new server
ignoreWelcomeMessage(s) # ignore welcome message
                                s.sendall(f"send_data {ip} {port} {keyRange['start']} {keyRange['end']} \r\n".encode())  # send_data <newServerIp> <newServerPort> <rangeStart> <rangeEnd>
                            responseFromNewServer = self.request.recv(1024)  # wait for the new server to confirm that it received all the data
                            responseFromNewServer = responseFromNewServer.decode('ascii').rstrip()
                            if responseFromNewServer != 'received_data':
                                logging.error("Error while sending data to the new server")
#TODO return 'error while sending data to new server'
# send updated meta_data to all servers
client_thread = threading.Thread(target=sendUpdatedMetaDataToAllServers)
client_thread.start()
# sendUpdatedMetaDataToAllServers() # send updated meta data to all servers
# start pinging server
print(f"ping port is {int(port)+1000}")
print(f"ping ip is {ip}")
client_thread = threading.Thread(target=startPingingServer, args=(ip,int(port)+1000))
client_thread.start()
except Exception as e:
print(e)
# return 'join_error'
elif operation == 'shutdown': # KVServer wants to shut down -> shutdown IP PORT
try:
# print("Shutdown request received")
ip, port = parseIpPort(command) # get the ip and port of new server
dataReceiverIp, dataReceiverPort, keyRange = updateMetaData(ip, port, 'shutdown') # get which server will receive data from the server that will shut down with the key range
                        # If the new key-range array still contains 2 or more servers, do NOT transfer data
                        # (the data is assumed to be replicated already).
                        # If it contains fewer than 2 servers (1 or 0), data can be transferred:
                        # with exactly 1 remaining server the transfer happens; with 0 servers
                        # updateMetaData returns None values, so no transfer takes place.
                        meta_data_new_count = len(meta_data.data)
                        if meta_data_new_count >= 2:
                            dataReceiverIp = None
                            dataReceiverPort = None
                        if dataReceiverIp and dataReceiverPort:  # there is a server that will receive the shutting-down server's data
                            self.request.sendall(f"send_data {dataReceiverIp} {dataReceiverPort} {keyRange['start']} {keyRange['end']} \r\n".encode())  # tell the shutting-down server where to send its data
# print("Data sent to " + dataReceiverIp + " " + dataReceiverPort)
responseFromShuttingDownServer = self.request.recv(1024) # wait from shutting down server to say that it sent the all the data to another server
# print("Response received")
responseFromShuttingDownServer = responseFromShuttingDownServer.decode('ascii').rstrip() # remove trailing spaces
if responseFromShuttingDownServer != 'transfer_done': # if server that will shut down did not send the data to another server
logging.error(f"Error while sending data to another server")
#TODO return 'error while sending data to new server'
# sendUpdatedMetaDataToAllServers() # send updated meta data to all servers
                        else:  # no server will receive data from the shutting-down server
                            self.request.sendall(b"no_data_to_send\r\n")
# send updated meta_data to all servers
client_thread = threading.Thread(target=sendUpdatedMetaDataToAllServers)
client_thread.start()
except Exception as e:
print(e)
        else:
            logging.error(f"Invalid operation '{operation}' requested")
            self.request.sendall(b'error unknown command!\r\n')  # reply with an error message for unknown commands
        # Close the client connection
        self.request.close()
def create_parser():
# Create an argument parser
parser = argparse.ArgumentParser(description='ECS Server')
# Add arguments
parser.add_argument('-p', '--port', default=8000, type=int, help='Port that the server should use.')
    parser.add_argument('-a', '--address', default='localhost', help='Address that the server should use.')
parser.add_argument('-l', '--log', default='logs/log.txt', help='Relative path of the logfile')
parser.add_argument('-ll', '--loglevel', default='INFO', help='Set the log level of the server')
return parser
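# Example invocation (values are illustrative; the websocket server then listens on port 8100):
#   python ecs-server.py -p 8000 -l logs/ecs.log -ll INFO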
def shutdown_hook():
pass
# print("shutdown hook started")
class GracefulThreadingTCPServer(socketserver.ThreadingTCPServer):
allow_reuse_address = True
daemon_threads = True
def run_server(server):
try:
server.serve_forever()
except Exception as e:
# print(f"Server error: {e}")
logging.error(f"Server error: {e}")
def handle_shutdown_signal(signum, frame):
    # Run the shutdown hook, then stop the TCP server and exit.
    shutdown_hook()
    # Stop the server
    server.shutdown()
    server.server_close()
    sys.exit(0)
# Handle signals
signal.signal(signal.SIGINT, handle_shutdown_signal)
signal.signal(signal.SIGTERM, handle_shutdown_signal)
if __name__ == "__main__":
parser = create_parser()
args = parser.parse_args()
# print(args)
configure_logging(args.log, level=args.loglevel)
# print("Started")
async_server_thread = threading.Thread(target=run_async_server)
async_server_thread.start()
with GracefulThreadingTCPServer(("0.0.0.0", int(args.port)), MyRequestHandler) as server:
# Start the server in a separate thread
server_thread = threading.Thread(target=run_server, args=(server,))
server_thread.daemon = True # This ensures that the thread will exit when the main program exits
server_thread.start()
# Keep the main thread alive, waiting for termination signals
while True:
server_thread.join(1)
async_server_thread.join(1)