From 5ac03fe318d424749bd0fa32c1c9b8578a86517f Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 27 Sep 2014 20:57:43 +0300 Subject: [PATCH] Fix loopless trasport implementation --- CHANGES.txt | 7 ++++++- aiozmq/__init__.py | 2 +- aiozmq/core.py | 13 ++++++++++++- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 1235d3d..a3bac66 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,7 +1,12 @@ CHANGES ------- -0.5.0 (XXXX-XX-XX) +0.5.1 (2014-09-27) +^^^^^^^^^^^^^^^^^^ + +* Fix loopless transport implementation. + +0.5.0 (2014-08-23) ^^^^^^^^^^^^^^^^^^ * Support zmq devices in aiozmq.rpc.serve_rpc() diff --git a/aiozmq/__init__.py b/aiozmq/__init__.py index 718998b..e4554be 100644 --- a/aiozmq/__init__.py +++ b/aiozmq/__init__.py @@ -13,7 +13,7 @@ 'create_zmq_connection', 'version_info', 'version') -__version__ = '0.5.0' +__version__ = '0.5.1' version = __version__ + ' , Python ' + sys.version diff --git a/aiozmq/core.py b/aiozmq/core.py index 12ac63f..b68e462 100644 --- a/aiozmq/core.py +++ b/aiozmq/core.py @@ -567,8 +567,10 @@ def __init__(self, loop, zmq_type, zmq_sock, protocol, waiter): self._loop.call_soon(self._protocol.connection_made, self) self._loop.call_soon(waiter.set_result, None) + self._soon_call = None def _read_ready(self): + self._soon_call = None if self._zmq_sock is None: return events = self._zmq_sock.getsockopt(zmq.EVENTS) @@ -589,7 +591,7 @@ def _read_ready(self): else: schedule = False if schedule: - self._loop.call_soon(self._read_ready) + self._soon_call = self._loop.call_soon(self._read_ready) def _do_read(self): try: @@ -628,6 +630,9 @@ def _do_write(self): if not self._buffer and self._closing: self._loop.remove_reader(self._fd) self._call_connection_lost(None) + else: + if self._soon_call is None: + self._soon_call = self._loop.call_soon(self._read_ready) def _do_send(self, data): try: @@ -666,6 +671,12 @@ def _do_pause_reading(self): def _do_resume_reading(self): self._read_ready() + def _call_connection_lost(self, exc): + try: + super()._call_connection_lost(exc) + finally: + self._soon_call = None + class ZmqEventLoopPolicy(asyncio.AbstractEventLoopPolicy): """ZeroMQ policy implementation for accessing the event loop.