forked from nicotine-plus/nicotine-plus
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathslskproto.py
2265 lines (1675 loc) · 83.7 KB
/
slskproto.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# COPYRIGHT (C) 2020-2022 Nicotine+ Contributors
# COPYRIGHT (C) 2008-2012 quinox <[email protected]>
# COPYRIGHT (C) 2007-2009 daelstorm <[email protected]>
# COPYRIGHT (C) 2003-2004 Hyriand <[email protected]>
# COPYRIGHT (C) 2001-2003 Alexander Kanavin
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
This module implements Soulseek networking protocol.
"""
import errno
import selectors
import socket
import struct
import sys
import time
from threading import Thread
from pynicotine.events import events
from pynicotine.logfacility import log
from pynicotine.slskmessages import DISTRIBUTED_MESSAGE_CLASSES
from pynicotine.slskmessages import DISTRIBUTED_MESSAGE_CODES
from pynicotine.slskmessages import NETWORK_MESSAGE_EVENTS
from pynicotine.slskmessages import PEER_MESSAGE_CLASSES
from pynicotine.slskmessages import PEER_MESSAGE_CODES
from pynicotine.slskmessages import PEER_INIT_MESSAGE_CLASSES
from pynicotine.slskmessages import PEER_INIT_MESSAGE_CODES
from pynicotine.slskmessages import SERVER_MESSAGE_CLASSES
from pynicotine.slskmessages import SERVER_MESSAGE_CODES
from pynicotine.slskmessages import AcceptChildren
from pynicotine.slskmessages import BranchLevel
from pynicotine.slskmessages import BranchRoot
from pynicotine.slskmessages import CheckPrivileges
from pynicotine.slskmessages import CloseConnection
from pynicotine.slskmessages import CloseConnectionIP
from pynicotine.slskmessages import ConnectionType
from pynicotine.slskmessages import ConnectToPeer
from pynicotine.slskmessages import DistribBranchLevel
from pynicotine.slskmessages import DistribBranchRoot
from pynicotine.slskmessages import DistribEmbeddedMessage
from pynicotine.slskmessages import DistribSearch
from pynicotine.slskmessages import DownloadFile
from pynicotine.slskmessages import EmbeddedMessage
from pynicotine.slskmessages import FileOffset
from pynicotine.slskmessages import FileDownloadInit
from pynicotine.slskmessages import FileUploadInit
from pynicotine.slskmessages import FileSearchResponse
from pynicotine.slskmessages import GetPeerAddress
from pynicotine.slskmessages import GetUserStats
from pynicotine.slskmessages import GetUserStatus
from pynicotine.slskmessages import HaveNoParent
from pynicotine.slskmessages import InitPeerConnection
from pynicotine.slskmessages import Login
from pynicotine.slskmessages import MessageType
from pynicotine.slskmessages import PossibleParents
from pynicotine.slskmessages import ParentMinSpeed
from pynicotine.slskmessages import ParentSpeedRatio
from pynicotine.slskmessages import PeerInit
from pynicotine.slskmessages import PierceFireWall
from pynicotine.slskmessages import Relogged
from pynicotine.slskmessages import ResetDistributed
from pynicotine.slskmessages import RoomList
from pynicotine.slskmessages import SendNetworkMessage
from pynicotine.slskmessages import ServerConnect
from pynicotine.slskmessages import ServerDisconnect
from pynicotine.slskmessages import SetDownloadLimit
from pynicotine.slskmessages import SetUploadLimit
from pynicotine.slskmessages import SetWaitPort
from pynicotine.slskmessages import SharedFileListResponse
from pynicotine.slskmessages import UploadFile
from pynicotine.slskmessages import UserInfoResponse
from pynicotine.slskmessages import UserStatus
from pynicotine.slskmessages import increment_token
from pynicotine.upnp import UPnP
# Work out how many sockets we are allowed to keep open at the same time.
# MAXSOCKETS has to stay below the OS limit on open files; otherwise our open
# sockets combined with other file activity can exceed the file limit and
# effectively halt the program.

if sys.platform != "win32":
    import resource  # pylint: disable=import-error

    if sys.platform == "darwin":
        # Maximum number of files a process can open is 10240 on macOS.
        # macOS reports INFINITE as hard limit, so we need this special case.
        MAXFILELIMIT = 10240

    else:
        _SOFTLIMIT, MAXFILELIMIT = resource.getrlimit(resource.RLIMIT_NOFILE)  # pylint: disable=no-member

    # Raise the soft limit up to the hard limit (best effort, failure is only logged)
    try:
        resource.setrlimit(resource.RLIMIT_NOFILE, (MAXFILELIMIT, MAXFILELIMIT))  # pylint: disable=no-member

    except Exception as rlimit_error:
        log.add("Failed to set RLIMIT_NOFILE: %s", rlimit_error)

    # Use 75% of the file limit for sockets, clamped to the range 50..3072.
    # Going any higher than that just wastes resources.
    MAXSOCKETS = min(3072, max(50, int(MAXFILELIMIT * 0.75)))

else:
    # For Windows, FD_SETSIZE is set to 512 in the Python source.
    # This limit is hardcoded, so we'll have to live with it for now.
    MAXSOCKETS = 512

# ioctl request number used to look up the IP address of a network interface
if sys.platform == "linux":
    SIOCGIFADDR = 0x8915
else:
    SIOCGIFADDR = 0xc0206921  # *BSD, macOS

# Pre-compiled unpackers for one and two little-endian unsigned 32-bit integers
_UINT_STRUCT = struct.Struct("<I")
_DOUBLE_UINT_STRUCT = struct.Struct("<II")
UINT_UNPACK = _UINT_STRUCT.unpack
DOUBLE_UINT_UNPACK = _DOUBLE_UINT_STRUCT.unpack
class Connection:
    """ Base record describing a single network connection.

    sock is the socket object, addr is an (ip, port) pair, selector_events is
    the selector event value supplied by the creator, and ibuf/obuf are the
    input and output byte buffers. lastactive is initialised to the creation
    time and lastreadlength starts out at 100 KiB. """

    __slots__ = ("sock", "addr", "selector_events", "ibuf", "obuf", "lastactive", "lastreadlength")

    def __init__(self, sock=None, addr=None, selector_events=None):

        self.sock, self.addr, self.selector_events = sock, addr, selector_events

        # Every connection gets its own, initially empty, buffers
        self.ibuf, self.obuf = bytearray(), bytearray()

        self.lastactive = time.time()
        self.lastreadlength = 100 * 1024
class ServerConnection(Connection):
    """ Connection to the server. In addition to the base fields, it stores
    the login value handed over by the creator. """

    __slots__ = ("login",)

    def __init__(self, sock=None, addr=None, selector_events=None, login=None):

        super().__init__(sock=sock, addr=addr, selector_events=selector_events)
        self.login = login
class PeerConnection(Connection):
    """ Connection to a peer. init is the init message object associated with
    the connection; the file transfer fields start out unset. """

    __slots__ = ("init", "fileinit", "filedown", "fileupl", "has_post_init_activity", "lastcallback")

    def __init__(self, sock=None, addr=None, selector_events=None, init=None):

        super().__init__(sock=sock, addr=addr, selector_events=selector_events)

        self.init = init

        # No file transfer is attached to a fresh connection
        self.fileinit = self.filedown = self.fileupl = None

        self.has_post_init_activity = False
        self.lastcallback = time.time()
class SoulseekNetworkThread(Thread):
""" This is a networking thread that actually does all the communication.
It sends data to the core via a callback function and receives data via a deque object. """
""" The server and peers send each other small binary messages that start
with length and message code followed by the actual message data. """
IN_PROGRESS_STALE_AFTER = 2
CONNECTION_MAX_IDLE = 60
CONNECTION_MAX_IDLE_GHOST = 10
CONNECTION_BACKLOG_LENGTH = 4096
SOCKET_READ_BUFFER_SIZE = 1048576
SOCKET_WRITE_BUFFER_SIZE = 1048576
def __init__(self, queue, user_addresses):
    """ Set up the initial state of the networking thread and subscribe to
    the application events it reacts to.

    queue is the deque object that holds network messages from Core,
    user_addresses is the mapping of usernames to addresses. """

    super().__init__(name="SoulseekNetworkThread")

    # Public state
    self.listenport = None
    self.upnp = None

    # Message queue and pending connection bookkeeping
    self._queue = queue
    self._user_addresses = user_addresses
    self._pending_init_msgs = {}
    self._token_init_msgs = {}
    self._username_init_msgs = {}
    self._should_process_queue = False
    self._want_abort = False

    # Selector and listening socket
    self._selector = None
    self._listen_socket = None
    self._listen_port_range = None
    self._bound_ip = None
    self._interface = None

    # Server connection
    self._server_socket = None
    self._server_address = None
    self._server_username = None
    self._server_timer = None
    self._server_timeout_value = -1
    self._manual_server_disconnect = False
    self._server_relogged = False

    # Distributed network
    self._parent_socket = None
    self._potential_parents = {}
    self._distrib_parent_min_speed = 0
    self._distrib_parent_speed_ratio = 1
    self._max_distrib_children = 10

    # Connection tracking
    self._numsockets = 1
    self._last_conn_stat_time = 0
    self._conns = {}
    self._connsinprogress = {}
    self._out_indirect_conn_request_times = {}
    self._conn_timeouts_timer_id = None
    self._token = 0

    # Transfer speed limits
    self._calc_upload_limit_function = self._calc_upload_limit_none
    self._upload_limit = 0
    self._download_limit = 0
    self._upload_limit_split = 0
    self._download_limit_split = 0
    self._ulimits = {}
    self._dlimits = {}
    self._total_uploads = 0
    self._total_downloads = 0
    self._total_download_bandwidth = 0
    self._total_upload_bandwidth = 0

    # Loop rate measurement
    self._last_cycle_time = 0
    self._current_cycle_loop_count = 0
    self._last_cycle_loop_count = 0
    self._loops_per_second = 0

    # Event subscriptions
    events.connect("enable-message-queue", self._enable_message_queue)
    events.connect("quit", self._quit)
    events.connect("start", self.start)
def _enable_message_queue(self):
self._queue.clear()
self._should_process_queue = True
def _quit(self):
self._want_abort = True
""" General """
def _create_listen_socket(self):
    """ Create the non-blocking TCP listening socket, bind it to a port from the
    configured range and register it with the selector for read events.

    Returns True on success. If no port could be bound, the socket is closed
    again, self._listen_socket is reset to None and False is returned. """

    listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self._listen_socket = listen_socket

    listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.SOCKET_READ_BUFFER_SIZE)
    listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.SOCKET_WRITE_BUFFER_SIZE)
    listen_socket.setblocking(False)

    if not self._bind_listen_port():
        # Don't leak the unbound socket. Also forget about it, since it was never
        # registered with the selector and _close_listen_socket() would otherwise
        # attempt to unregister it.
        self._close_socket(listen_socket, shutdown=False)
        self._listen_socket = None
        return False

    self._selector.register(listen_socket, selectors.EVENT_READ)
    return True
def _close_listen_socket(self):
    """ Unregister and close the listening socket, if there is one, and forget
    the port we were listening on. """

    listen_socket = self._listen_socket

    if listen_socket is not None:
        self._selector.unregister(listen_socket)

        # A listening socket has no established connection to shut down
        self._close_socket(listen_socket, shutdown=False)

        self._listen_socket = None
        self.listenport = None
def _bind_listen_port(self):
    """ Bind the listening socket to the first usable port in the configured
    port range and start listening on it.

    Returns True once a port is bound, False if the requested network interface
    is unavailable or every port in the range failed. """

    interface = self._interface

    if interface and not self._bound_ip:
        try:
            self._bind_to_network_interface(self._listen_socket, interface)

        except OSError:
            log.add(_("Specified network interface '%s' is not available"), self._interface,
                    title=_("Unknown Network Interface"))
            return False

    # Binding to an interface may have filled in self._bound_ip, so read it afterwards
    ip_address = self._bound_ip or "0.0.0.0"
    first_port = int(self._listen_port_range[0])
    last_port = int(self._listen_port_range[1])

    for candidate_port in range(first_port, last_port + 1):
        try:
            self._listen_socket.bind((ip_address, candidate_port))
            self._listen_socket.listen(self.CONNECTION_BACKLOG_LENGTH)
            self.listenport = candidate_port

            log.add(_("Listening on port: %i"), candidate_port)
            log.add_debug("Maximum number of concurrent connections (sockets): %i", MAXSOCKETS)
            return True

        except OSError as error:
            # Port unusable, move on to the next one in the range
            log.add_debug("Cannot listen on port %(port)s: %(error)s", {"port": candidate_port, "error": error})

    log.add(_("No listening port is available in the specified port range %s–%s"), self._listen_port_range,
            title=_("Listening Port Unavailable"))
    return False
@staticmethod
def _get_interface_ip_address(if_name):
    """ Look up the IPv4 address assigned to the network interface if_name.

    Returns the address as a dotted string, or None on platforms where the
    fcntl module is not available. An OSError raised by the ioctl call is
    propagated to the caller. """

    try:
        import fcntl

    except ImportError:
        return None

    # The socket is only needed as a handle for the ioctl request, so make sure
    # it is closed again afterwards instead of leaking a file descriptor
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        ip_if = fcntl.ioctl(sock.fileno(),
                            SIOCGIFADDR,
                            struct.pack("256s", if_name.encode()[:15]))

    # Bytes 20-23 of the returned structure hold the packed IPv4 address
    return socket.inet_ntoa(ip_if[20:24])
def _bind_to_network_interface(self, sock, if_name):
    """ Tie sock to the network interface if_name.

    On Linux and macOS a socket option is used and self._bound_ip is reset to
    None. On other platforms, or if setting the option is not permitted, the
    IP address of the interface is stored in self._bound_ip instead, so the
    caller can bind to that address. """

    try:
        # NOTE(review): option number 25 is presumably SO_BINDTODEVICE on Linux
        # and IP_BOUND_IF on macOS - confirm against the platform headers
        if sys.platform == "linux":
            option_level, option_value = socket.SOL_SOCKET, if_name.encode()

        elif sys.platform == "darwin":
            option_level, option_value = socket.IPPROTO_IP, socket.if_nametoindex(if_name)

        else:
            option_level = option_value = None

        if option_level is not None:
            sock.setsockopt(option_level, 25, option_value)
            self._bound_ip = None
            return

    except PermissionError:
        pass

    # System does not support changing the network interface
    # Retrieve the IP address of the interface, and bind to it instead
    self._bound_ip = self._get_interface_ip_address(if_name)
def _find_local_ip_address(self):
    """ Determine the local IP address used for outgoing traffic, honouring the
    selected bound IP or network interface. Falls back to 127.0.0.1. """

    # A UDP socket is enough here
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe_socket:

        # Use the interface we have selected
        if self._bound_ip:
            probe_socket.bind((self._bound_ip, 0))

        elif self._interface:
            self._bind_to_network_interface(probe_socket, self._interface)

        try:
            # Send a broadcast packet on a local address (doesn't need to be reachable,
            # but MacOS requires port to be non-zero)
            probe_socket.connect(("10.255.255.255", 1))

            # This returns the "primary" IP on the local box, even if that IP is a NAT/private/internal IP
            local_address = probe_socket.getsockname()
            ip_address = local_address[0]

        except OSError:
            # Fall back to localhost
            ip_address = "127.0.0.1"

    return ip_address
def _server_connect(self, msg_obj):
    """ We're connecting to the server. Takes the interface, bound IP and port
    range from msg_obj, opens the listening socket and starts the server
    connection. Does nothing if a server socket already exists. """

    if self._server_socket:
        return

    # TODO: support custom network interface on Windows
    self._interface = None if sys.platform == "win32" else msg_obj.interface
    self._bound_ip = msg_obj.bound_ip
    self._listen_port_range = msg_obj.listen_port_range

    if not self._create_listen_socket():
        # Without a listening socket we don't attempt to connect
        self._should_process_queue = False
        return

    self._manual_server_disconnect = False
    events.cancel_scheduled(self._server_timer)

    host, port = msg_obj.addr
    log.add(_("Connecting to %(host)s:%(port)s"), {"host": host, "port": port})

    self._init_server_conn(msg_obj)
def _server_disconnect(self):
    """ We're disconnecting from the server, clean up. Closes the listening
    socket and all connections, empties the queue and pending connection
    state, and notifies the main thread unless we are aborting. """

    self._should_process_queue = False

    self._bound_ip = None
    self._interface = None
    self._listen_port_range = None
    self._server_socket = None

    self._close_listen_socket()
    self.upnp.cancel_timer()

    # Iterate over snapshots, since closing a connection removes it from its dictionary
    for connection_list in (self._conns, self._connsinprogress):
        for sock in list(connection_list):
            self._close_connection(connection_list, sock, callback=False)

    self._queue.clear()

    for init_msgs in (self._pending_init_msgs, self._token_init_msgs, self._username_init_msgs):
        init_msgs.clear()

    events.cancel_scheduled(self._conn_timeouts_timer_id)
    self._out_indirect_conn_request_times.clear()

    if self._want_abort:
        return

    # Reset connection stats
    events.emit_main_thread("set-connection-stats")

    if not self._server_address:
        # We didn't successfully establish a connection to the server
        return

    host, port = self._server_address
    log.add(_("Disconnected from server %(host)s:%(port)s"), {"host": host, "port": port})

    if self._server_relogged:
        log.add(_("Someone logged in to your Soulseek account elsewhere"))
        self._server_relogged = False

    manual_disconnect = self._manual_server_disconnect

    if not manual_disconnect:
        # Schedule a reconnect attempt
        self._set_server_timer()

    self._server_address = None
    self._server_username = None

    events.emit_main_thread("server-disconnect", manual_disconnect)
def _server_timeout(self):
    """ Timer callback: tell the main thread that the server timeout elapsed. """

    events.emit_main_thread("server-timeout")
def _set_server_timer(self):
    """ Schedule the next server timeout callback. The delay starts at 15
    seconds and is doubled on each call for as long as it is below 600. """

    delay = self._server_timeout_value

    if delay == -1:
        # First attempt since the value was reset
        delay = 15

    elif 0 < delay < 600:
        delay = delay * 2

    self._server_timeout_value = delay
    self._server_timer = events.schedule(delay=delay, callback=self._server_timeout)

    log.add(_("The server seems to be down or not responding, retrying in %i seconds"),
            self._server_timeout_value)
""" File Transfers """
@staticmethod
def _is_upload(conn_obj):
    """ Return True if conn_obj is a PeerConnection with an upload attached. """

    if conn_obj.__class__ is not PeerConnection:
        return False

    return conn_obj.fileupl is not None
@staticmethod
def _is_download(conn_obj):
    """ Return True if conn_obj is a PeerConnection with a download attached. """

    if conn_obj.__class__ is not PeerConnection:
        return False

    return conn_obj.filedown is not None
def _calc_upload_limit(self, limit_disabled=False, limit_per_transfer=False):
limit = self._upload_limit
loop_limit = 1024 # 1 KB/s is the minimum upload speed per transfer
if limit_disabled or limit < loop_limit:
self._upload_limit_split = 0
return
if not limit_per_transfer and self._total_uploads > 1:
limit = limit // self._total_uploads
self._upload_limit_split = int(limit)
def _calc_upload_limit_by_transfer(self):
return self._calc_upload_limit(limit_per_transfer=True)
def _calc_upload_limit_none(self):
return self._calc_upload_limit(limit_disabled=True)
def _calc_download_limit(self):
limit = self._download_limit
loop_limit = 1024 # 1 KB/s is the minimum download speed per transfer
if limit < loop_limit:
# Download limit disabled
self._download_limit_split = 0
return
if self._total_downloads > 1:
limit = limit // self._total_downloads
self._download_limit_split = int(limit)
def _calc_loops_per_second(self, current_time):
""" Calculate number of loops per second. This value is used to split the
per-second transfer speed limit evenly for each loop. """
if current_time - self._last_cycle_time >= 1:
self._loops_per_second = (self._last_cycle_loop_count + self._current_cycle_loop_count) // 2
self._last_cycle_loop_count = self._current_cycle_loop_count
self._last_cycle_time = current_time
self._current_cycle_loop_count = 0
else:
self._current_cycle_loop_count = self._current_cycle_loop_count + 1
def _set_conn_speed_limit(self, sock, limit, limits):
limit = limit // (self._loops_per_second or 1)
if limit > 0:
limits[sock] = limit
""" Connections """
def _check_indirect_connection_timeouts(self):
    """ Give up on outgoing indirect connection requests that have been waiting
    for 20 seconds or longer. For each expired request, the main thread is
    notified with the messages that could not be delivered, and the request's
    token and queued messages are discarded. """

    curtime = time.time()

    # Iterate over a snapshot, since expired requests are removed while looping
    for init, request_time in list(self._out_indirect_conn_request_times.items()):
        if (curtime - request_time) < 20:
            continue

        if not self._out_indirect_conn_request_times.pop(init, None):
            continue

        username = init.target_user

        log.add_conn(("Indirect connect request of type %(type)s to user %(user)s with "
                      "token %(token)s expired, giving up"), {
            "type": init.conn_type,
            "user": username,
            "token": init.token
        })

        # Hand over a copy of the message list. The original list is cleared right
        # below, which would otherwise leave the event handler with an empty list
        # if the event is delivered after this method continues.
        events.emit_main_thread("peer-connection-error", username, list(init.outgoing_msgs))

        self._token_init_msgs.pop(init.token, None)
        init.outgoing_msgs.clear()
@staticmethod
def _connection_still_active(conn_obj):
init = conn_obj.init
if init is not None and init.conn_type != "P":
# Distributed and file connections are critical, always assume they are active
return True
return len(conn_obj.obuf) > 0 or len(conn_obj.ibuf) > 0
def _has_existing_user_socket(self, user, conn_type):
prev_init = self._username_init_msgs.get(user + conn_type)
if prev_init is not None and prev_init.sock is not None:
return True
return False
def _add_init_message(self, init):
    """ Remember init as the current init message for its target user and
    connection type. File connections are never stored. """

    conn_type = init.conn_type

    if conn_type != ConnectionType.FILE:
        self._username_init_msgs[init.target_user + conn_type] = init

    # File transfer connections are not unique or reused later, so they are skipped
@staticmethod
def _pack_network_message(msg_obj):
try:
return msg_obj.make_network_message()
except Exception:
from traceback import format_exc
log.add("Unable to pack message type %(msg_type)s. %(error)s",
{"msg_type": msg_obj.__class__, "error": format_exc()})
return None
@staticmethod
def _unpack_network_message(msg_class, msg_buffer, msg_size, conn_type, conn=None):
try:
if conn is not None:
msg = msg_class(conn)
else:
msg = msg_class()
msg.parse_network_message(msg_buffer)
return msg
except Exception as error:
log.add_debug(("Unable to parse %(conn_type)s message type %(msg_type)s size %(size)i "
"contents %(msg_buffer)s: %(error)s"), {
"conn_type": conn_type,
"msg_type": msg_class,
"size": msg_size,
"msg_buffer": msg_buffer,
"error": error
})
return None
@staticmethod
def _unpack_embedded_message(msg):
    """ This message embeds a distributed message. We unpack the distributed
    message and return it, or return None if its code is unknown. """

    try:
        distrib_class = DISTRIBUTED_MESSAGE_CLASSES[msg.distrib_code]

    except KeyError:
        # No message class registered for this code
        return None

    unpacked_msg = distrib_class()
    unpacked_msg.parse_network_message(msg.distrib_message)

    return unpacked_msg
def emit_network_message_event(self, msg):
if msg is None:
return
log.add_msg_contents(msg)
event_name = NETWORK_MESSAGE_EVENTS.get(msg.__class__)
if event_name:
events.emit_main_thread(event_name, msg)
def _modify_connection_events(self, conn_obj, selector_events):
if conn_obj.selector_events != selector_events:
log.add_conn("Modifying selector events for connection to %(addr)s: %(events)s", {
"addr": conn_obj.addr,
"events": selector_events
})
self._selector.modify(conn_obj.sock, selector_events)
conn_obj.selector_events = selector_events
def _process_conn_messages(self, init):
""" A connection is established with the peer, time to queue up our peer
messages for delivery """
msgs = init.outgoing_msgs
for j in msgs:
j.init = init
self._queue.append(j)
msgs.clear()
@staticmethod
def _verify_peer_connection_type(conn_type):
    """ Return True if conn_type is one of the known peer connection types
    (peer, file, distributed). Unknown types are logged and yield False. """

    known_types = (ConnectionType.PEER, ConnectionType.FILE, ConnectionType.DISTRIBUTED)

    if conn_type in known_types:
        return True

    log.add_conn("Unknown connection type %s", str(conn_type))
    return False
def _send_message_to_peer(self, user, message):
    """ Send message to user. An existing or pending connection of the matching
    type is reused; otherwise a new connection is initiated and the message is
    delivered once it is established. """

    conn_type = message.msgtype

    if not self._verify_peer_connection_type(conn_type):
        return

    # Check if there's already a connection for the specified username
    init = self._username_init_msgs.get(user + conn_type)

    if init is None and conn_type != ConnectionType.FILE:
        # Check if we have a pending PeerInit message (currently requesting user IP address)
        init = next(
            (pending_init for pending_init in self._pending_init_msgs.get(user, [])
             if pending_init.conn_type == conn_type),
            None
        )

    log.add_conn("Sending message of type %(type)s to user %(user)s", {
        "type": message.__class__,
        "user": user
    })

    if init is None:
        # This is a new peer, initiate a connection
        self._initiate_connection_to_peer(user, conn_type, message)
        return

    log.add_conn("Found existing connection of type %(type)s for user %(user)s, using it.", {
        "type": conn_type,
        "user": user
    })

    init.outgoing_msgs.append(message)

    if init.sock is not None:
        # We have initiated a connection previously, and it's ready
        self._process_conn_messages(init)
def _initiate_connection_to_peer(self, user, conn_type, message=None, in_address=None):
    """ Prepare to initiate a connection with a peer. If the address of the
    user is not known, it is requested from the server first and the init
    message is kept as pending. in_address overrides the stored address. """

    init = PeerInit(init_user=self._server_username, target_user=user, conn_type=conn_type)
    known_address = self._user_addresses.get(user)

    if in_address is not None:
        known_address = in_address

    elif known_address is not None and known_address[1] == 0:
        # Port 0 means the user is likely bugged, ask the server for a new address
        known_address = None

    if message is not None:
        init.outgoing_msgs.append(message)

    if known_address is not None:
        init.addr = known_address
        self._connect_to_peer(user, known_address, init)
        return

    # Address unknown: park the init message until the server tells us the address
    self._pending_init_msgs.setdefault(user, []).append(init)
    self._queue.append(GetPeerAddress(user))

    log.add_conn("Requesting address for user %(user)s", {
        "user": user
    })
def _connect_to_peer(self, user, addr, init):
    """ Initiate a connection with a peer. Skipped if the connection type is
    unknown or a socket of that type already exists for the user. """

    conn_type = init.conn_type
    log_args = {
        "type": conn_type,
        "user": user,
        "addr": addr
    }

    if not self._verify_peer_connection_type(conn_type):
        return

    if self._has_existing_user_socket(user, conn_type):
        log.add_conn(("Direct connection of type %(type)s to user %(user)s %(addr)s requested, "
                      "but existing connection already exists"), log_args)
        return

    if not init.indirect:
        # Also request indirect connection in case the user's port is closed
        self._connect_to_peer_indirect(init)

    self._add_init_message(init)
    self._queue.append(InitPeerConnection(addr, init))

    log.add_conn("Attempting direct connection of type %(type)s to user %(user)s %(addr)s", log_args)
def _connect_error(self, error, conn_obj):
    """ Log a failed connection attempt. For the server connection, a retry
    timer is set as well. """

    if conn_obj.sock is self._server_socket:
        host, port = conn_obj.addr

        log.add(
            _("Cannot connect to server %(host)s:%(port)s: %(error)s"), {
                "host": host,
                "port": port,
                "error": error
            }
        )
        self._set_server_timer()
        return

    # Only read the init message after the server case has been handled
    init = conn_obj.init

    if not init.indirect:
        log.add_conn("Direct connection of type %(type)s to user %(user)s failed. Error: %(error)s", {
            "type": init.conn_type,
            "user": init.target_user,
            "error": error
        })
        return

    if init not in self._out_indirect_conn_request_times:
        log.add_conn(
            "Cannot respond to indirect connection request from user %(user)s. Error: %(error)s", {
                "user": init.target_user,
                "error": error
            })
def _connect_to_peer_indirect(self, init):
    """ Send a message to the server to ask the peer to connect to us (indirect
    connection). A fresh token is assigned to init, and the request time is
    recorded. """

    token = increment_token(self._token)
    self._token = token

    init.token = token
    self._token_init_msgs[token] = init
    self._out_indirect_conn_request_times[init] = time.time()

    username = init.target_user
    self._queue.append(ConnectToPeer(token, username, init.conn_type))

    log.add_conn("Attempting indirect connection to user %(user)s with token %(token)s", {
        "user": username,
        "token": token
    })
def _establish_outgoing_peer_connection(self, conn_obj):
    """ An outgoing peer connection is ready. Store it among the established
    connections, queue the handshake message (PierceFireWall for indirect
    connections, the PeerInit message otherwise), and queue the messages that
    were waiting for the connection. """

    init = conn_obj.init
    sock = conn_obj.sock

    self._conns[sock] = conn_obj
    init.sock = sock

    user, conn_type, token = init.target_user, init.conn_type, init.token

    log.add_conn(("Established outgoing connection of type %(type)s with user %(user)s. List of "
                  "outgoing messages: %(messages)s"), {
        "type": conn_type,
        "user": user,
        "messages": init.outgoing_msgs
    })

    if not init.indirect:
        # Direct connection established
        log.add_conn("Sending PeerInit message of type %(type)s to user %(user)s", {
            "type": conn_type,
            "user": user
        })
        self._queue.append(init)

        # Direct and indirect connections are attempted at the same time, clean up
        self._token_init_msgs.pop(token, None)
        indirect_request_time = self._out_indirect_conn_request_times.pop(init, None)

        if indirect_request_time:
            log.add_conn(("Stopping indirect connection attempt of type %(type)s to user "
                          "%(user)s"), {
                "type": conn_type,
                "user": user
            })

    else:
        log.add_conn(("Responding to indirect connection request of type %(type)s from "
                      "user %(user)s, token %(token)s"), {
            "type": conn_type,
            "user": user,
            "token": token
        })
        self._queue.append(PierceFireWall(sock, token))

    self._process_conn_messages(init)
def _establish_outgoing_server_connection(self, conn_obj):
    """ The connection to the server is ready. Store it, remember our own
    address and server details, and queue the Login and SetWaitPort messages. """

    self._conns[self._server_socket] = conn_obj

    server_address = conn_obj.addr
    host, port = server_address[0], server_address[1]

    log.add(
        _("Connected to server %(host)s:%(port)s, logging in…"), {
            "host": host,
            "port": port
        }
    )

    username, password = conn_obj.login

    # Our own address, as seen locally
    self._user_addresses[username] = (self._find_local_ip_address(), self.listenport)

    # The credentials are replaced with a plain flag from here on
    conn_obj.login = True

    self._server_address = server_address
    self._server_username = username
    self._server_timeout_value = -1

    # Soulseek client version
    # NS and SoulseekQt use 157
    # We use a custom version number for Nicotine+
    client_version = 160

    # Soulseek client minor version
    # 17 stands for 157 ns 13c, 19 for 157 ns 13e
    # SoulseekQt seems to go higher than this
    # We use a custom minor version for Nicotine+
    client_minor_version = 1

    self._queue.append(Login(username, password, client_version, client_minor_version))
    self._queue.append(SetWaitPort(self.listenport))
def _replace_existing_connection(self, init):
    """ If a connection of the same type already exists for the target user of
    init, close it and take over its pending outgoing messages. Connections to
    our own username are left alone. """

    user = init.target_user
    conn_type = init.conn_type

    if user == self._server_username:
        return

    if not self._has_existing_user_socket(user, conn_type):
        return

    log.add_conn("Discarding existing connection of type %(type)s to user %(user)s", {
        "type": conn_type,
        "user": user
    })

    old_init = self._username_init_msgs[user + conn_type]

    # Move the undelivered messages over to the new connection
    init.outgoing_msgs, old_init.outgoing_msgs = old_init.outgoing_msgs, []

    self._close_connection(self._conns, old_init.sock, callback=False)
@staticmethod
def _close_socket(sock, shutdown=True):
    """ Close sock, optionally shutting it down in both directions first.
    Errors raised by either step are logged and otherwise ignored. """

    # In certain cases, a shutdown isn't possible, e.g. if a connection wasn't established
    if shutdown:
        try:
            log.add_conn("Shutting down socket %s", sock)
            sock.shutdown(socket.SHUT_RDWR)

        except OSError as shutdown_error:
            log.add_conn("Failed to shut down socket %(sock)s: %(error)s", {
                "sock": sock,
                "error": shutdown_error
            })

    # The socket is closed regardless of whether the shutdown succeeded
    try:
        log.add_conn("Closing socket %s", sock)
        sock.close()

    except OSError as close_error:
        log.add_conn("Failed to close socket %(sock)s: %(error)s", {
            "sock": sock,
            "error": close_error
        })
def _close_connection(self, connection_list, sock, callback=True):
conn_obj = connection_list.pop(sock, None)
if conn_obj is None:
# Already removed
return
self._selector.unregister(sock)
self._close_socket(sock, shutdown=(connection_list != self._connsinprogress))
self._numsockets -= 1
if conn_obj.__class__ is ServerConnection:
# Disconnected from server, clean up connections and queue
self._server_disconnect()
return
init = conn_obj.init
if sock is self._parent_socket and self._should_process_queue:
self._send_have_no_parent()
elif self._is_download(conn_obj):
self._total_downloads -= 1
if not self._total_downloads:
self._total_download_bandwidth = 0
if callback: