-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprob.py
133 lines (94 loc) · 4.67 KB
/
prob.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import random
import datetime
import uuid
# OBJECTIVES TODO:
# 1) Read the code and understand it.
# 2) Read the code again and understand it better.
# 3) Feel free to do 1 and 2 however many times you feel like.
# 4) Complete the SyncService implementation. Note that the SyncService.onMessage and SyncService.__init__ function signature must not be altered.
# Keys present in the `data` payload of every record: Device.obtainData fills
# each one with a random UUID string and assertEquivalent compares them one by one.
_DATA_KEYS = ["a","b","c"]
class Device:
    """Simulated, deliberately unreliable client of the SyncService.

    A device produces new records, asks the service for records it is
    missing, and applies the updates it receives. Every step randomly fails
    some of the time so that the service has to cope with gaps.

    Attributes:
        records: Local copy of the shared record log, as far as it has been synced.
        sent: Every record this device has produced, in production order.
    """

    def __init__(self, id):
        # NOTE: `id` shadows the builtin, but the parameter name is part of
        # the constructor's interface and is kept as-is.
        self._id = id
        self.records = []
        self.sent = []

    def obtainData(self) -> dict:
        """Return a single new datapoint from the device, or `{}` if there is none.

        A datapoint is identified by type `record`. `timestamp` is the ISO-8601
        time at which the record was created, `dev_id` is the device id and
        `data` maps every key in `_DATA_KEYS` to a fresh random UUID string.
        Each produced record is also remembered in `self.sent`.
        """
        if random.random() < 0.4:
            # Sometimes there's no new data
            return {}
        rec = {
            'type': 'record',
            'timestamp': datetime.datetime.now().isoformat(),
            'dev_id': self._id,
            'data': {kee: str(uuid.uuid4()) for kee in _DATA_KEYS},
        }
        self.sent.append(rec)
        return rec

    def probe(self) -> dict:
        """Return a probe request to be sent to the SyncService, or `{}`.

        A probe is identified by type `probe`. `from` is the index from which
        the device is asking for data, i.e. the number of records it already holds.
        """
        if random.random() < 0.5:
            # Sometimes the device forgets to probe the SyncService
            return {}
        return {'type': 'probe', 'dev_id': self._id, 'from': len(self.records)}

    def onMessage(self, data: dict):
        """Receive a message from the server and apply it if it is an `update`.

        An update replaces everything from index `data['from']` onwards with
        `data['data']`. Updates that start beyond the end of the local log are
        dropped because applying them would leave a gap. Anything that is not
        an update (including `None` or an empty dict) is ignored.
        """
        # The random drop comes first so the sequence of RNG draws is the same
        # regardless of what the message contains.
        if random.random() < 0.6:
            # Sometimes devices make mistakes. Let's hope the SyncService handles such failures.
            return
        # The server may answer with nothing useful (e.g. in response to an
        # empty probe); tolerate that instead of crashing on data['type'].
        if not isinstance(data, dict) or data.get('type') != 'update':
            return
        _from = data['from']
        if _from > len(self.records):
            return
        self.records = self.records[:_from] + data['data']
class SyncService:
    """Central store that collects device records and replays them on request.

    Devices push `record` messages, which are appended to a single shared log
    in arrival order. Devices pull by sending a `probe` naming the log index
    they want to resume from; the service answers with an `update` carrying
    every record from that index onwards.
    """

    def __init__(self):
        # All records received so far, in arrival order. A record's position
        # in this list is the index devices refer to in a probe's `from` field.
        self.SyncServerRecords = []

    def onMessage(self, data: dict):
        """Handle a message received from a device.

        Args:
            data: The message. Supported shapes:
                * empty dict / None - the device had nothing to say;
                * type `record` - has `timestamp`, `dev_id` and `data`;
                * type `probe` - has `dev_id` and `from` (index to resume from).

        Returns:
            * for an empty message: `{'type': 'noop'}`, a typed placeholder
              that a receiver inspecting `type` can safely ignore;
            * for a `record`: None (the record is stored);
            * for a `probe`: an `update` dict with `from` (the index the
              payload starts at), `dev_id` and `data` (the list of records
              from that index to the end of the log).

        Raises:
            NotImplementedError: if the message type is not supported.
        """
        if not data:
            # Devices sometimes send nothing; there is nothing to store or replay.
            return {'type': 'noop'}

        msg_type = data.get('type')

        if msg_type == 'record':
            # Store only the fields that describe the record itself.
            self.SyncServerRecords.append({
                'timestamp': data['timestamp'],
                'dev_id': data['dev_id'],
                'data': data['data'],
            })
            return None

        if msg_type == 'probe':
            total = len(self.SyncServerRecords)
            start = data.get('from', 0)
            if not isinstance(start, int) or start < 0:
                # A negative index would be read as "from the end" by slicing.
                start = 0
            # Never claim to start beyond what the log actually holds.
            start = min(start, total)
            return {
                'type': 'update',
                'from': start,
                'dev_id': data['dev_id'],
                # Slicing builds a new list, so the receiver cannot mutate the log itself.
                'data': self.SyncServerRecords[start:],
            }

        raise NotImplementedError(f"unsupported message type: {msg_type!r}")
def testSyncing(num_devices: int = 10, num_rounds: int = int(1e6), max_drain_rounds: int = 10000):
    """End-to-end check that every device converges on the same, complete log.

    Phase 1 runs `num_rounds` rounds in which each device may emit a record
    and may probe the service. Phase 2 keeps probing until every device holds
    every record that was sent. Phase 3 verifies that all devices hold the
    same records in the same order, and that each device's own records appear
    in the order it sent them.

    Args:
        num_devices: Number of simulated devices.
        num_rounds: Number of produce/probe rounds in phase 1.
        max_drain_rounds: Upper bound on phase-2 rounds, so a service that
            never delivers fails the test instead of hanging it.

    Raises:
        AssertionError: if the devices do not converge, or hold wrong records.
    """
    devices = [Device(f"dev_{i}") for i in range(num_devices)]
    if not devices:
        return
    syn = SyncService()

    for _ in range(num_rounds):
        for _dev in devices:
            syn.onMessage(_dev.obtainData())
            _dev.onMessage(syn.onMessage(_dev.probe()))

    # Every record a device produced was handed to the service, so a fully
    # synced device holds exactly this many records. Comparing devices only
    # against each other would let "everyone has nothing" count as success.
    expected_total = sum(len(_dev.sent) for _dev in devices)
    drain_rounds = 0
    while any(len(_dev.records) != expected_total for _dev in devices):
        if drain_rounds >= max_drain_rounds:
            raise AssertionError(
                f"devices did not converge on {expected_total} records "
                f"after {max_drain_rounds} drain rounds"
            )
        for _dev in devices:
            _dev.onMessage(syn.onMessage(_dev.probe()))
        drain_rounds += 1

    # ver_start[k] is the index of the next not-yet-verified record sent by device k.
    ver_start = [0] * len(devices)
    for i, rec in enumerate(devices[0].records):
        _dev_idx = int(rec['dev_id'].split("_")[-1])
        assertEquivalent(rec, devices[_dev_idx].sent[ver_start[_dev_idx]])
        for _dev in devices[1:]:
            assertEquivalent(rec, _dev.records[i])
        ver_start[_dev_idx] += 1
def assertEquivalent(d1: dict, d2: dict):
    """Assert that two records agree on device id, timestamp and every data key.

    Fields are compared in a fixed order (`dev_id`, `timestamp`, then each key
    of `_DATA_KEYS` inside `data`); the first mismatch raises AssertionError.
    """
    for field in ('dev_id', 'timestamp'):
        assert d1[field] == d2[field]
    for data_key in _DATA_KEYS:
        left = d1['data'][data_key]
        right = d2['data'][data_key]
        assert left == right