Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Socketio 2.0 #6

Open
wants to merge 15 commits into
base: release
Choose a base branch
from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ sdist
# Vim
*.sw[op]
*~
.idea
6 changes: 1 addition & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,4 @@ before_install:
- sudo apt-get install nodejs; node --version
install:
- pip install -U coverage requests six websocket-client nose
- npm install -G [email protected]
before_script:
- DEBUG=* node socketIO_client/tests/serve.js &
- sleep 1
script: nosetests
script: bash travis_test.sh
27 changes: 22 additions & 5 deletions socketIO_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(
self._wants_to_close = False
atexit.register(self._close)
self._transport_lock = Lock()
self.open_extra_packets = []

if Namespace:
self.define(Namespace)
Expand All @@ -61,7 +62,7 @@ def _transport(self):
try:
self._transport_lock.acquire()
if not self._opened and not self._wants_to_close:
self._engineIO_session = self._get_engineIO_session()
self._engineIO_session, self.open_extra_packets = self._get_engineIO_session()
self._negotiate_transport()
self._connect_namespaces()
self._opened = True
Expand All @@ -73,21 +74,28 @@ def _transport(self):

def _get_engineIO_session(self):
warning_screen = self._yield_warning_screen()
session = None
remaining_packets = []
for elapsed_time in warning_screen:
transport = XHR_PollingTransport(
self._http_session, self._is_secure, self._url)
try:
engineIO_packet_type, engineIO_packet_data = next(
transport.recv_packet())
for engineIO_packet_type, engineIO_packet_data in transport.recv_packet():
if session is None:
assert engineIO_packet_type == 0 # engineIO_packet_type == open
jupe marked this conversation as resolved.
Show resolved Hide resolved
session = parse_engineIO_session(engineIO_packet_data)
continue

remaining_packets.append((engineIO_packet_type, engineIO_packet_data))
break
except (TimeoutError, ConnectionError) as e:
if not self._wait_for_connection:
raise
warning = Exception(
'[engine.io waiting for connection] %s' % e)
warning_screen.throw(warning)
assert engineIO_packet_type == 0 # engineIO_packet_type == open
return parse_engineIO_session(engineIO_packet_data)

return session, remaining_packets

def _negotiate_transport(self):
self._transport_instance = self._get_transport('xhr-polling')
Expand Down Expand Up @@ -381,6 +389,15 @@ def _connect_namespaces(self):
if path:
self.connect(path, with_transport_instance=True)

# Dirty way to handle changed socketio sequence where along with
# open packet can come other packets. In 1.x versions open was
# always a single packet. With 2.x it is possible that the first
# XHR response can contain other packets along with open.
# Proper solution would require comprehensive rewrite.
for packet in self.open_extra_packets:
self._process_packet(packet)
self.open_extra_packets = []
jupe marked this conversation as resolved.
Show resolved Hide resolved

def __exit__(self, *exception_pack):
self.disconnect()
super(SocketIO, self).__exit__(*exception_pack)
Expand Down
38 changes: 33 additions & 5 deletions socketIO_client/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,27 @@ def encode_engineIO_content(engineIO_packets):
return content


def decode_engineIO_content(content):
def decode_engineIO_content(content, bw_comp=False):
content_index = 0
content_length = len(content)
while content_index < content_length:
try:
content_index, packet_length = _read_packet_length(
content, content_index)
if bw_comp:
content_index, packet_length = _read_packet_length_bw_comp(
content, content_index)
else:
content_index, packet_length = _read_packet_length(
content, content_index)
except IndexError:
break
content_index, packet_text = _read_packet_text(
content, content_index, packet_length)

if bw_comp:
content_index, packet_text = _read_packet_text_bw_comp(
content, content_index, packet_length)
else:
content_index, packet_text = _read_packet_text(
content, content_index, packet_length)

engineIO_packet_type, engineIO_packet_data = parse_packet_text(
packet_text)
yield engineIO_packet_type, engineIO_packet_data
Expand Down Expand Up @@ -121,6 +131,15 @@ def _make_packet_prefix(packet):


def _read_packet_length(content, content_index):
start = content_index
while content.decode('utf-8')[content_index] != ':':
content_index += 1
packet_length_string = content.decode('utf-8')[start:content_index]
return content_index, int(packet_length_string)


# Backwards compatible version to support socketIO protocol version 1.x
def _read_packet_length_bw_comp(content, content_index):
while get_byte(content, content_index) != 0:
content_index += 1
content_index += 1
Expand All @@ -134,7 +153,16 @@ def _read_packet_length(content, content_index):


def _read_packet_text(content, content_index, packet_length):
while content.decode('utf-8')[content_index] == ':':
content_index += 1
packet_text = content.decode('utf-8')[content_index:content_index + packet_length]
return content_index + packet_length, packet_text.encode('utf-8')


# Backwards compatible version to support socketIO protocol version 1.x
def _read_packet_text_bw_comp(content, content_index, packet_length):
while get_byte(content, content_index) == 255:
content_index += 1
packet_text = content[content_index:content_index + packet_length]
return content_index + packet_length, packet_text

1 change: 1 addition & 0 deletions socketIO_client/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def test_reconnect(self):

def test_reconnect_with_namespace(self):
'Reconnect with namespace'
self.socketIO.disconnect()
namespace = self.socketIO.define(Namespace)
self.assertFalse('reconnect' in namespace.args_by_event)
self.socketIO.connect()
Expand Down
20 changes: 16 additions & 4 deletions socketIO_client/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
pip install -U websocket-client""")

from .exceptions import ConnectionError, TimeoutError
from .logs import LoggingMixin
from .parsers import (
encode_engineIO_content, decode_engineIO_content,
format_packet_text, parse_packet_text)
Expand All @@ -31,7 +32,7 @@
TRANSPORTS = 'xhr-polling', 'websocket'


class AbstractTransport(object):
class AbstractTransport(LoggingMixin):

def __init__(self, http_session, is_secure, url, engineIO_session=None):
self.http_session = http_session
Expand All @@ -57,6 +58,7 @@ class XHR_PollingTransport(AbstractTransport):
def __init__(self, http_session, is_secure, url, engineIO_session=None):
super(XHR_PollingTransport, self).__init__(
http_session, is_secure, url, engineIO_session)
self._log_name = "XHR_PollingTransport"
self._params = {
'EIO': ENGINEIO_PROTOCOL, 'transport': 'polling'}
if engineIO_session:
Expand All @@ -75,6 +77,7 @@ def __init__(self, http_session, is_secure, url, engineIO_session=None):
self._http_url = '%s://%s/' % (http_scheme, url)
self._request_index_lock = threading.Lock()
self._send_packet_lock = threading.Lock()
self._bw_comp = False

def recv_packet(self):
params = dict(self._params)
Expand All @@ -84,9 +87,17 @@ def recv_packet(self):
self._http_url,
params=params,
**self._kw_get)
for engineIO_packet in decode_engineIO_content(response.content):
engineIO_packet_type, engineIO_packet_data = engineIO_packet
yield engineIO_packet_type, engineIO_packet_data
try:
for engineIO_packet in decode_engineIO_content(response.content, self._bw_comp):
yield engineIO_packet
except UnicodeDecodeError:
if self._bw_comp:
raise

self._info('Failed to decode packet, dropping to socketIO 1.x')
self._bw_comp = True
for engineIO_packet in decode_engineIO_content(response.content, self._bw_comp):
yield engineIO_packet

def send_packet(self, engineIO_packet_type, engineIO_packet_data=''):
with self._send_packet_lock:
Expand Down Expand Up @@ -115,6 +126,7 @@ class WebsocketTransport(AbstractTransport):
def __init__(self, http_session, is_secure, url, engineIO_session=None):
super(WebsocketTransport, self).__init__(
http_session, is_secure, url, engineIO_session)
self._log_name = "WebsocketTransport"
params = dict(http_session.params, **{
'EIO': ENGINEIO_PROTOCOL, 'transport': 'websocket'})
request = http_session.prepare_request(requests.Request('GET', url))
Expand Down
25 changes: 25 additions & 0 deletions travis_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash

set -e

VERSION=1.7.4
echo "Installing socket.io version $VERSION"
npm install -G socket.io@$VERSION
DEBUG=* node socketIO_client/tests/serve.js &
ID=$!
sleep 1
nosetests

echo "Stopping node server with PID $ID"
kill $ID

VERSION=2.1.1
echo "Installing socket.io version $VERSION"
npm install -G socket.io@$VERSION
DEBUG=* node socketIO_client/tests/serve.js &
ID=$!
sleep 1
nosetests

echo "Stopping node server with PID $ID"
kill $ID