ipfs_peers.py
""""""
import json
import os
import time
from datetime import datetime, timedelta
from threading import Lock, Thread

import ipfs_api

# default values for various settings, can all be overridden
FORGET_AFTER_HOURS = 200
SUCCESSIVE_REGISTER_IGNORE_DUR_SEC = 60
CONNECTION_ATTEMPT_INTERVAL_SEC = 5
FILE_WRITE_INTERVAL_SEC = 1


class Peer:
    """Object for representing an IPFS peer and the contact information
    collected from it."""

    def __init__(self, peer_id="", serial=None):
        # instance attributes (defined here rather than on the class so that
        # they aren't shared between Peer instances)
        self.__peer_id = None
        self.__multiaddrs = []  # list((multiaddr, datetime))
        self.__last_seen = None  # datetime
        self.__multi_addrs_lock = Lock()
        self.__terminate = False
        if peer_id and not serial:
            self.__peer_id = peer_id
        elif serial:
            if isinstance(serial, str):
                data = json.loads(serial)
            else:
                data = serial
            if not isinstance(data, dict):
                raise TypeError(
                    f"Parameter serial must be of type dict or str, not {type(serial)}")
            self.__peer_id = data['peer_id']
            self.__last_seen = string_to_time(data['last_seen'])
            self.__multiaddrs = [(addr, string_to_time(t))
                                 for addr, t in data['multiaddrs']]
        else:
            raise ValueError(
                "You must specify exactly one parameter to this constructor: peer_id OR serial")

    def register_contact_event(self, successive_register_ignore_dur_sec=SUCCESSIVE_REGISTER_IGNORE_DUR_SEC):
        """Record that this peer has just been seen, updating its known
        multiaddresses.

        Returns:
            bool: whether or not the event was registered
        """
        # skip registering if the last registration was only a short while ago
if self.last_seen() and (datetime.utcnow() - self.last_seen()).total_seconds() < successive_register_ignore_dur_sec:
return False
with self.__multi_addrs_lock:
multiaddrs = ipfs_api.get_peer_multiaddrs(self.__peer_id)
if not ipfs_api.is_peer_connected(self.__peer_id):
return False
now = datetime.utcnow()
if multiaddrs:
self.__last_seen = now
else:
return False
# update last_seen dates of known multiaddrs, removing them from
# the local multiaddrs list
for i, (multiaddr, last_seen) in enumerate(self.__multiaddrs):
if multiaddr in multiaddrs:
self.__multiaddrs[i] = (multiaddr, now)
multiaddrs.remove(multiaddr)
# add new multiaddrs to known multiaddrs
for multiaddr in multiaddrs:
self.__multiaddrs.append((multiaddr, now))
return True

    def forget_old_entries(self, date):
        """Remove all multiaddresses that haven't been seen since `date`."""
        with self.__multi_addrs_lock:
            # keep only the entries that have been seen since `date`
            self.__multiaddrs = [(multiaddr, last_seen)
                                 for multiaddr, last_seen in self.__multiaddrs if last_seen > date]
def last_seen(self):
"""Returns the date at which this peer was last seen.
Returns:
datetime: the date at which this peer was last seen or None
"""
return self.__last_seen
def connect(self, successive_register_ignore_dur_sec=SUCCESSIVE_REGISTER_IGNORE_DUR_SEC):
"""Tries to connect to this peer.
Returns:
bool: whether or not we managed to connect to this peer
"""
for multiaddr, date in self.__multiaddrs:
if self.__terminate:
return False
success = ipfs_api.connect_to_peer(
f"{multiaddr}/p2p/{self.__peer_id}")
if success and ipfs_api.is_peer_connected(self.__peer_id):
self.register_contact_event(successive_register_ignore_dur_sec)
return True
if self.__terminate:
return False
# if none of the known multiaddresses worked, try a general findpeer
if ipfs_api.find_peer(self.__peer_id) and ipfs_api.is_peer_connected(self.__peer_id):
self.register_contact_event(successive_register_ignore_dur_sec)
return True
return False
def multiaddrs(self):
return self.__multiaddrs
def peer_id(self):
return self.__peer_id

    def serialise(self):
        """Return this peer's data as a JSON-serialisable dict."""
        return {
            'peer_id': self.__peer_id,
            'last_seen': time_to_string(self.__last_seen),
            'multiaddrs': [[addr, time_to_string(t)] for addr, t in self.__multiaddrs],
        }

    def terminate(self):
        """Tell any ongoing connection attempts to abort."""
        self.__terminate = True
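
# A minimal round-trip sketch (illustrative only; the peer ID and timestamps
# below are hypothetical) showing the dict structure that Peer.serialise()
# produces and that Peer(serial=...) accepts:
#
#     peer = Peer(serial={
#         'peer_id': 'QmHypotheticalPeerId',
#         'last_seen': '2024.01.01_12.00.00',
#         'multiaddrs': [['/ip4/192.0.2.1/tcp/4001', '2024.01.01_12.00.00']],
#     })
#     assert peer.serialise()['last_seen'] == '2024.01.01_12.00.00'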


class PeerMonitor:
    """A class for managing peer contact information for a certain app.

    Args:
        filepath (str): path of the configuration file in which this
            PeerMonitor's data is/should be stored
        forget_after_hrs (int): after how many hours of no communication
            a peer should be forgotten
        connection_attempt_interval_sec (int): in the loop that constantly
            tries to connect to known peers, how many seconds should be
            paused between consecutive connection attempts
        successive_register_ignore_dur_sec (int): minimum duration between
            successive registrations of the same peer
    """
    forget_after_hrs = FORGET_AFTER_HOURS
    connection_attempt_interval_sec = CONNECTION_ATTEMPT_INTERVAL_SEC
    successive_register_ignore_dur_sec = SUCCESSIVE_REGISTER_IGNORE_DUR_SEC
    # interval at which the updated data should be written to the file
    file_write_interval_sec = FILE_WRITE_INTERVAL_SEC

    def __init__(self,
                 filepath,
                 forget_after_hrs=FORGET_AFTER_HOURS,
                 connection_attempt_interval_sec=CONNECTION_ATTEMPT_INTERVAL_SEC,
                 successive_register_ignore_dur_sec=SUCCESSIVE_REGISTER_IGNORE_DUR_SEC):
        # instance attributes (defined here rather than on the class so that
        # they aren't shared between PeerMonitor instances)
        self.__peers = []  # list(Peer)
        self.__terminate = False
        self.__save = False
        self.__save_lock = Lock()
        self.__peers_lock = Lock()  # for adding & removing peers
        self.__filepath = filepath
        self.forget_after_hrs = forget_after_hrs
        self.connection_attempt_interval_sec = connection_attempt_interval_sec
        self.successive_register_ignore_dur_sec = successive_register_ignore_dur_sec
if os.path.exists(filepath):
with open(filepath, 'r') as file:
data = file.read()
if data.strip("\n"): # if file isn't empty
data = json.loads(data)
peers = data['peers']
for peer_data in peers:
if self.get_peer_by_id(peer_data['peer_id']):
# TODO how to warn user about duplicate entries?
# Function to merge peers?
# Ever necessary?
continue
self.__peers.append(Peer(serial=peer_data))
self.__peer_finder_thread = Thread(target=self.__connect_to_peers, args=(),
name="PeerMonitor.__connect_to_peers")
self.__peer_finder_thread.start()
self.__file_manager_thread = Thread(
target=self.__file_manager, args=(), name="PeerMonitor.__file_manager")
self.__file_manager_thread.start()

    def register_contact_event(self, peer_id):
        """Record that the given peer has just been seen, creating a Peer
        object for it if it isn't already known."""
        # get peer, create if new
with self.__peers_lock:
peer = self.get_peer_by_id(peer_id, already_locked=True)
if not peer:
peer = Peer(peer_id)
self.__peers.append(peer)
# try register, and if data is recorded, save to file
if peer.register_contact_event(successive_register_ignore_dur_sec=self.successive_register_ignore_dur_sec):
self.save()
def get_peer_by_id(self, peer_id, already_locked=False):
if not already_locked:
self.__peers_lock.acquire()
found_peer = None
for peer in self.__peers:
if peer.peer_id() == peer_id:
found_peer = peer
break
if not already_locked:
self.__peers_lock.release()
return found_peer
def peers(self):
return self.__peers

    def __file_manager(self):
        """Periodically write the peer data to file whenever a save request
        has been made via save()."""
        while True:
            if self.__terminate:
                return
            if self.__save:
                self._save()
                self.__save = False
            time.sleep(self.file_write_interval_sec)

    def save(self):
        """Ask the file-manager thread to write the peer data to file."""
        self.__save = True

    def _save(self):
        """Write this PeerMonitor's peer data to its file immediately."""
        with self.__save_lock:
            try:
                with open(self.__filepath, 'w+') as file:
                    data = {
                        'peers': [peer.serialise() for peer in self.__peers]
                    }
                    file.write(json.dumps(data))
            except OSError as e:
                if "Too many open files" in str(e):
                    print(e)
                else:
                    raise
            self.__save = False

    def __connect_to_peers(self):
        """Continually try to connect to all known peers, forgetting peers
        and multiaddresses that haven't been seen for too long."""
        while not self.__terminate:
            # try to connect to all known peers
            for peer in self.__peers:
                peer.connect(self.successive_register_ignore_dur_sec)
            # wait a bit to save processing power
            # and to reduce congestion of the IPFS HTTP client
            for _ in range(self.connection_attempt_interval_sec):
                if self.__terminate:
                    self.save()
                    return
                time.sleep(1)
            # make peers forget old multiaddresses
            threshold_time = datetime.utcnow() - timedelta(hours=self.forget_after_hrs)
            for peer in self.__peers:
                peer.forget_old_entries(threshold_time)
            # forget peers that have no multiaddresses left
            with self.__peers_lock:
                self.__peers = [
                    peer for peer in self.__peers if peer.multiaddrs()]
            time.sleep(1)
            self.save()
def find_all_peers(self):
"""Try to connect to all peers now.
Blocks until all connection attempts have been finished."""
threads = []
for peer in self.__peers:
thread = Thread(target=peer.connect, args=(self.successive_register_ignore_dur_sec,))
threads.append(thread)
thread.start()
# wait for all threads to finish
for thread in threads:
thread.join()

    def terminate(self, wait=False):
        """Stop this PeerMonitor's activities.

        Args:
            wait (bool): whether or not this function should block until all
                activity has been stopped and resources have been cleaned up
        """
        self.__terminate = True
        for peer in self.__peers:
            peer.terminate()
        if wait:
            self.__peer_finder_thread.join()
            self.__file_manager_thread.join()
TIME_FORMAT = '%Y.%m.%d_%H.%M.%S'
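# Illustration of the format with assumed example values:
#     time_to_string(datetime(2024, 1, 1, 12, 0, 0)) -> '2024.01.01_12.00.00'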
def time_to_string(_time: datetime):
if not _time:
return None
return _time.strftime(TIME_FORMAT)
def string_to_time(string):
if not string:
return None
return datetime.strptime(string, TIME_FORMAT)
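

# A minimal usage sketch (an assumption, not part of the original module): it
# presumes a running IPFS daemon reachable through ipfs_api, and the peer ID
# below is hypothetical.
if __name__ == "__main__":
    monitor = PeerMonitor("peers.json")
    # record that we've just had contact with a peer
    monitor.register_contact_event("QmHypotheticalPeerId")
    # try to reconnect to every known peer, blocking until all attempts finish
    monitor.find_all_peers()
    # stop the background threads, waiting for them to clean up
    monitor.terminate(wait=True)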