diff --git a/.gitignore b/.gitignore index e9fc737..a4a5cd3 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ sdist # Vim *.sw[op] *~ +.idea \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index e345425..a49608f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,8 +8,4 @@ before_install: - sudo apt-get install nodejs; node --version install: - pip install -U coverage requests six websocket-client nose - - npm install -G socket.io@1.7.2 -before_script: - - DEBUG=* node socketIO_client/tests/serve.js & - - sleep 1 -script: nosetests +script: bash travis_test.sh diff --git a/socketIO_client/__init__.py b/socketIO_client/__init__.py index fde3a91..f3daf3e 100644 --- a/socketIO_client/__init__.py +++ b/socketIO_client/__init__.py @@ -49,6 +49,7 @@ def __init__( self._wants_to_close = False atexit.register(self._close) self._transport_lock = Lock() + self.open_extra_packets = [] if Namespace: self.define(Namespace) @@ -61,7 +62,7 @@ def _transport(self): try: self._transport_lock.acquire() if not self._opened and not self._wants_to_close: - self._engineIO_session = self._get_engineIO_session() + self._engineIO_session, self.open_extra_packets = self._get_engineIO_session() self._negotiate_transport() self._connect_namespaces() self._opened = True @@ -73,12 +74,19 @@ def _transport(self): def _get_engineIO_session(self): warning_screen = self._yield_warning_screen() + session = None + remaining_packets = [] for elapsed_time in warning_screen: transport = XHR_PollingTransport( self._http_session, self._is_secure, self._url) try: - engineIO_packet_type, engineIO_packet_data = next( - transport.recv_packet()) + for engineIO_packet_type, engineIO_packet_data in transport.recv_packet(): + if session is None: + assert engineIO_packet_type == 0 # engineIO_packet_type == open + session = parse_engineIO_session(engineIO_packet_data) + continue + + remaining_packets.append((engineIO_packet_type, engineIO_packet_data)) break except (TimeoutError, ConnectionError) as e: if not self._wait_for_connection: @@ -86,8 +94,8 @@ def _get_engineIO_session(self): warning = Exception( '[engine.io waiting for connection] %s' % e) warning_screen.throw(warning) - assert engineIO_packet_type == 0 # engineIO_packet_type == open - return parse_engineIO_session(engineIO_packet_data) + + return session, remaining_packets def _negotiate_transport(self): self._transport_instance = self._get_transport('xhr-polling') @@ -381,6 +389,15 @@ def _connect_namespaces(self): if path: self.connect(path, with_transport_instance=True) + # Dirty way to handle changed socketio sequence where along with + # open packet can come other packets. In 1.x versions open was + # always a single packet. With 2.x it is possible that the first + # XHR response can contain other packets along with open. + # Proper solution would require comprehensive rewrite. + for packet in self.open_extra_packets: + self._process_packet(packet) + self.open_extra_packets = [] + def __exit__(self, *exception_pack): self.disconnect() super(SocketIO, self).__exit__(*exception_pack) diff --git a/socketIO_client/parsers.py b/socketIO_client/parsers.py index 7e043a4..4ab5867 100644 --- a/socketIO_client/parsers.py +++ b/socketIO_client/parsers.py @@ -38,17 +38,27 @@ def encode_engineIO_content(engineIO_packets): return content -def decode_engineIO_content(content): +def decode_engineIO_content(content, bw_comp=False): content_index = 0 content_length = len(content) while content_index < content_length: try: - content_index, packet_length = _read_packet_length( - content, content_index) + if bw_comp: + content_index, packet_length = _read_packet_length_bw_comp( + content, content_index) + else: + content_index, packet_length = _read_packet_length( + content, content_index) except IndexError: break - content_index, packet_text = _read_packet_text( - content, content_index, packet_length) + + if bw_comp: + content_index, packet_text = _read_packet_text_bw_comp( + content, content_index, packet_length) + else: + content_index, packet_text = _read_packet_text( + content, content_index, packet_length) + engineIO_packet_type, engineIO_packet_data = parse_packet_text( packet_text) yield engineIO_packet_type, engineIO_packet_data @@ -121,6 +131,15 @@ def _make_packet_prefix(packet): def _read_packet_length(content, content_index): + start = content_index + while content.decode('utf-8')[content_index] != ':': + content_index += 1 + packet_length_string = content.decode('utf-8')[start:content_index] + return content_index, int(packet_length_string) + + +# Backwards compatible version to support socketIO protocol version 1.x +def _read_packet_length_bw_comp(content, content_index): while get_byte(content, content_index) != 0: content_index += 1 content_index += 1 @@ -134,7 +153,16 @@ def _read_packet_length(content, content_index): def _read_packet_text(content, content_index, packet_length): + while content.decode('utf-8')[content_index] == ':': + content_index += 1 + packet_text = content.decode('utf-8')[content_index:content_index + packet_length] + return content_index + packet_length, packet_text.encode('utf-8') + + +# Backwards compatible version to support socketIO protocol version 1.x +def _read_packet_text_bw_comp(content, content_index, packet_length): while get_byte(content, content_index) == 255: content_index += 1 packet_text = content[content_index:content_index + packet_length] return content_index + packet_length, packet_text + diff --git a/socketIO_client/tests/__init__.py b/socketIO_client/tests/__init__.py index 41d68f4..dc3c2b8 100644 --- a/socketIO_client/tests/__init__.py +++ b/socketIO_client/tests/__init__.py @@ -62,6 +62,7 @@ def test_reconnect(self): def test_reconnect_with_namespace(self): 'Reconnect with namespace' + self.socketIO.disconnect() namespace = self.socketIO.define(Namespace) self.assertFalse('reconnect' in namespace.args_by_event) self.socketIO.connect() diff --git a/socketIO_client/transports.py b/socketIO_client/transports.py index 105591c..01145db 100644 --- a/socketIO_client/transports.py +++ b/socketIO_client/transports.py @@ -21,6 +21,7 @@ pip install -U websocket-client""") from .exceptions import ConnectionError, TimeoutError +from .logs import LoggingMixin from .parsers import ( encode_engineIO_content, decode_engineIO_content, format_packet_text, parse_packet_text) @@ -31,7 +32,7 @@ TRANSPORTS = 'xhr-polling', 'websocket' -class AbstractTransport(object): +class AbstractTransport(LoggingMixin): def __init__(self, http_session, is_secure, url, engineIO_session=None): self.http_session = http_session @@ -57,6 +58,7 @@ class XHR_PollingTransport(AbstractTransport): def __init__(self, http_session, is_secure, url, engineIO_session=None): super(XHR_PollingTransport, self).__init__( http_session, is_secure, url, engineIO_session) + self._log_name = "XHR_PollingTransport" self._params = { 'EIO': ENGINEIO_PROTOCOL, 'transport': 'polling'} if engineIO_session: @@ -75,6 +77,7 @@ def __init__(self, http_session, is_secure, url, engineIO_session=None): self._http_url = '%s://%s/' % (http_scheme, url) self._request_index_lock = threading.Lock() self._send_packet_lock = threading.Lock() + self._bw_comp = False def recv_packet(self): params = dict(self._params) @@ -84,9 +87,17 @@ def recv_packet(self): self._http_url, params=params, **self._kw_get) - for engineIO_packet in decode_engineIO_content(response.content): - engineIO_packet_type, engineIO_packet_data = engineIO_packet - yield engineIO_packet_type, engineIO_packet_data + try: + for engineIO_packet in decode_engineIO_content(response.content, self._bw_comp): + yield engineIO_packet + except UnicodeDecodeError: + if self._bw_comp: + raise + + self._info('Failed to decode packet, dropping to socketIO 1.x') + self._bw_comp = True + for engineIO_packet in decode_engineIO_content(response.content, self._bw_comp): + yield engineIO_packet def send_packet(self, engineIO_packet_type, engineIO_packet_data=''): with self._send_packet_lock: @@ -115,6 +126,7 @@ class WebsocketTransport(AbstractTransport): def __init__(self, http_session, is_secure, url, engineIO_session=None): super(WebsocketTransport, self).__init__( http_session, is_secure, url, engineIO_session) + self._log_name = "WebsocketTransport" params = dict(http_session.params, **{ 'EIO': ENGINEIO_PROTOCOL, 'transport': 'websocket'}) request = http_session.prepare_request(requests.Request('GET', url)) diff --git a/travis_test.sh b/travis_test.sh new file mode 100755 index 0000000..2aa15fa --- /dev/null +++ b/travis_test.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +set -e + +VERSION=1.7.4 +echo "Installing socket.io version $VERSION" +npm install -G socket.io@$VERSION +DEBUG=* node socketIO_client/tests/serve.js & +ID=$! +sleep 1 +nosetests + +echo "Stopping node server with PID $ID" +kill $ID + +VERSION=2.1.1 +echo "Installing socket.io version $VERSION" +npm install -G socket.io@$VERSION +DEBUG=* node socketIO_client/tests/serve.js & +ID=$! +sleep 1 +nosetests + +echo "Stopping node server with PID $ID" +kill $ID