From 80e100f8189645ea20f0b4775cfab8db5763e87f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Jun 2021 18:49:51 -0400 Subject: [PATCH 01/11] Add our own "transport closed" signal This changes some super old (and bad) code from the project's very early days. For some redic reason I must have thought masking `trio`'s internal stream / transport errors and a TCP EOF as `StopAsyncIteration` was somehow a good idea. The reality is you probably want to know the difference between an unexpected transport error and a simple EOF lol. This begins to resolve that by adding our own special `TransportClosed` error to signal the "graceful" termination of a channel's underlying transport. Oh, and this builds on the `msgspec` integration which helped shed light on the core issues here B) --- tractor/_exceptions.py | 7 ++++++ tractor/_ipc.py | 55 ++++++++++++++++++++++++++++++------------ 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 61375904f..f6d9f4707 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -38,6 +38,10 @@ class InternalActorError(RemoteActorError): """ +class TransportClosed(trio.ClosedResourceError): + "Underlying channel transport was closed prior to use" + + class NoResult(RuntimeError): "No final result is expected for this actor" @@ -63,12 +67,15 @@ def pack_error(exc: BaseException) -> Dict[str, Any]: def unpack_error( + msg: Dict[str, Any], chan=None, err_type=RemoteActorError + ) -> Exception: """Unpack an 'error' message from the wire into a local ``RemoteActorError``. 
+ """ tb_str = msg['error'].get('tb_str', '') return err_type( diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 9d34b3af0..5dc1a2ab0 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -10,7 +10,8 @@ from async_generator import asynccontextmanager from .log import get_logger -log = get_logger('ipc') +from ._exceptions import TransportClosed +log = get_logger(__name__) # :eyeroll: try: @@ -21,10 +22,17 @@ Unpacker = partial(msgpack.Unpacker, strict_map_key=False) -class MsgpackStream: - """A ``trio.SocketStream`` delivering ``msgpack`` formatted data. - """ - def __init__(self, stream: trio.SocketStream) -> None: +class MsgpackTCPStream: + '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data + using ``msgpack-python``. + + ''' + def __init__( + self, + stream: trio.SocketStream, + + ) -> None: + self.stream = stream assert self.stream.socket # should both be IP sockets @@ -35,7 +43,10 @@ def __init__(self, stream: trio.SocketStream) -> None: assert isinstance(rsockname, tuple) self._raddr = rsockname[:2] + # start and seed first entry to read loop self._agen = self._iter_packets() + # self._agen.asend(None) is None + self._send_lock = trio.StrictFIFOLock() async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: @@ -46,16 +57,13 @@ async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: use_list=False, ) while True: - try: - data = await self.stream.receive_some(2**10) - log.trace(f"received {data}") # type: ignore - except trio.BrokenResourceError: - log.warning(f"Stream connection {self.raddr} broke") - return + data = await self.stream.receive_some(2**10) + log.trace(f"received {data}") # type: ignore if data == b'': - log.debug(f"Stream connection {self.raddr} was closed") - return + raise TransportClosed( + f'transport {self} was already closed prior ro read' + ) unpacker.feed(data) for packet in unpacker: @@ -96,10 +104,11 @@ def __init__( on_reconnect: typing.Callable[..., typing.Awaitable] = None, auto_reconnect: bool 
= False, stream: trio.SocketStream = None, # expected to be active + ) -> None: self._recon_seq = on_reconnect self._autorecon = auto_reconnect - self.msgstream: Optional[MsgpackStream] = MsgpackStream( + self.msgstream: Optional[MsgpackTCPStream] = MsgpackTCPStream( stream) if stream else None if self.msgstream and destaddr: raise ValueError( @@ -112,6 +121,8 @@ def __init__( self._exc: Optional[Exception] = None self._agen = self._aiter_recv() + self._closed: bool = False + def __repr__(self) -> str: if self.msgstream: return repr( @@ -128,35 +139,47 @@ def raddr(self) -> Optional[Tuple[Any, ...]]: return self.msgstream.raddr if self.msgstream else None async def connect( - self, destaddr: Tuple[Any, ...] = None, + self, + destaddr: Tuple[Any, ...] = None, **kwargs + ) -> trio.SocketStream: + if self.connected(): raise RuntimeError("channel is already connected?") + destaddr = destaddr or self._destaddr assert isinstance(destaddr, tuple) stream = await trio.open_tcp_stream(*destaddr, **kwargs) - self.msgstream = MsgpackStream(stream) + self.msgstream = MsgpackTCPStream(stream) return stream async def send(self, item: Any) -> None: + log.trace(f"send `{item}`") # type: ignore assert self.msgstream + await self.msgstream.send(item) async def recv(self) -> Any: assert self.msgstream + try: return await self.msgstream.recv() + except trio.BrokenResourceError: if self._autorecon: await self._reconnect() return await self.recv() + raise + async def aclose(self) -> None: log.debug(f"Closing {self}") assert self.msgstream await self.msgstream.stream.aclose() + self._closed = True + log.error(f'CLOSING CHAN {self}') async def __aenter__(self): await self.connect() From 32b4ae06034d34d29bb2405781a339c3ef800ad7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 1 Jul 2021 07:44:03 -0400 Subject: [PATCH 02/11] Accept transport closed error during handshake and msg loop --- tractor/_actor.py | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 
deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 8601e8373..15fc78135 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -27,6 +27,7 @@ unpack_error, ModuleNotExposed, is_multi_cancelled, + TransportClosed, ) from . import _debug from ._discovery import get_arbiter @@ -202,7 +203,7 @@ def __init__( enable_modules: List[str] = [], uid: str = None, loglevel: str = None, - arbiter_addr: Optional[Tuple[str, int]] = None, + arbiter_addr: Optional[Tuple[str, int]] = (None, None), spawn_method: Optional[str] = None ) -> None: """This constructor is called in the parent actor **before** the spawning @@ -338,7 +339,18 @@ async def _stream_handler( # send/receive initial handshake response try: uid = await self._do_handshake(chan) - except StopAsyncIteration: + + except ( + trio.BrokenResourceError, + trio.ClosedResourceError, + TransportClosed, + ): + # XXX: This may propagate up from ``Channel._aiter_recv()`` + # and ``MsgpackStream._inter_packets()`` on a read from the + # stream particularly when the runtime is first starting up + # inside ``open_root_actor()`` where there is a check for + # a bound listener on the "arbiter" addr. the reset will be + # because the handshake was never meant took place. log.warning(f"Channel {chan} failed to handshake") return @@ -578,22 +590,39 @@ async def _process_messages( ) await self.cancel_rpc_tasks(chan) + except ( + TransportClosed, + trio.BrokenResourceError, + trio.ClosedResourceError + ): + # channels "breaking" is ok since we don't have a teardown + # handshake for them (yet) and instead we simply bail out + # of the message loop and expect the teardown sequence + # to clean up. + log.error(f"{chan} form {chan.uid} closed abruptly") + # raise + except trio.ClosedResourceError: log.error(f"{chan} form {chan.uid} broke") + except (Exception, trio.MultiError) as err: # ship any "internal" exception (i.e. 
one from internal machinery # not from an rpc task) to parent log.exception("Actor errored:") if self._parent_chan: await self._parent_chan.send(pack_error(err)) - raise + # if this is the `MainProcess` we expect the error broadcasting # above to trigger an error at consuming portal "checkpoints" + raise + except trio.Cancelled: # debugging only log.debug(f"Msg loop was cancelled for {chan}") raise + finally: + # msg debugging for when he machinery is brokey log.debug( f"Exiting msg loop for {chan} from {chan.uid} " f"with last msg:\n{msg}") From b372f4c92b912ba0f63e0586afa3d9eb5b065fa3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 2 Jul 2021 11:55:16 -0400 Subject: [PATCH 03/11] Handle top level multierror that presents now? --- tests/test_cancellation.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index da181c604..5da87ce54 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -123,8 +123,15 @@ async def main(): assert exc_info.type == tractor.MultiError err = exc_info.value - assert len(err.exceptions) == num_subactors - for exc in err.exceptions: + exceptions = err.exceptions + + if len(exceptions) == 2: + # sometimes oddly now there's an embedded BrokenResourceError ? 
+ exceptions = exceptions[1].exceptions + + assert len(exceptions) == num_subactors + + for exc in exceptions: assert isinstance(exc, tractor.RemoteActorError) assert exc.type == AssertionError From 85246d2df3fb60d61e27d1638372d024fdc0c7f9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 2 Jul 2021 11:56:14 -0400 Subject: [PATCH 04/11] Benign deps reorg --- setup.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index cde066d1e..df0e8af9c 100755 --- a/setup.py +++ b/setup.py @@ -38,13 +38,20 @@ 'tractor.testing', ], install_requires=[ + + # trio related 'trio>0.8', - 'msgpack', 'async_generator', + 'trio_typing', + + # tooling 'colorlog', 'wrapt', - 'trio_typing', 'pdbpp', + + # serialization + 'msgpack', + ], tests_require=['pytest'], python_requires=">=3.7", From a2d400583f7d25cd2eda0d04f6bd1389a6c61e20 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 2 Jul 2021 18:10:06 -0400 Subject: [PATCH 05/11] Fix tuple type --- tractor/_actor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 15fc78135..1eafa43ee 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -203,7 +203,7 @@ def __init__( enable_modules: List[str] = [], uid: str = None, loglevel: str = None, - arbiter_addr: Optional[Tuple[str, int]] = (None, None), + arbiter_addr: Optional[Tuple[str, int]] = None, spawn_method: Optional[str] = None ) -> None: """This constructor is called in the parent actor **before** the spawning From 1edf5c2f06f77432bc0d4198e177b4f420cd4be4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 3 Jul 2021 18:57:54 -0400 Subject: [PATCH 06/11] Specially remap TCP 104-connection-reset to `TransportClosed` Since we currently have no real "discovery protocol" between process trees, the current naive approach is to check via a connect and drop to see if a TCP server is bound to a particular address during root actor startup. 
This was a historical decision and had no real grounding beyond taking a simple approach to get something working when the project was first started. This is obviously problematic from an error handling perspective since we need to be able to avoid such quick connect-and-drops from cancelling an "arbiter"'s (registry actor's) channel-msg loop machinery (which would propagate and cancel the actor). For now we map this particular TCP error, which gets remapped by `trio` as a `trio.BrokenResourceError` to our own internal `TransportClosed` which is swallowed by channel message loop processing and indicates a graceful teardown of the far end actor. --- tractor/_actor.py | 24 ++++++++++-------------- tractor/_ipc.py | 27 ++++++++++++++++++++++++--- tractor/_root.py | 12 ++++++++++-- 3 files changed, 44 insertions(+), 19 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 1eafa43ee..95e85920a 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -263,7 +263,7 @@ def __init__( self._parent_chan: Optional[Channel] = None self._forkserver_info: Optional[ Tuple[Any, Any, Any, Any, Any]] = None - self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore + self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore # noqa async def wait_for_peer( self, uid: Tuple[str, str] @@ -341,8 +341,8 @@ async def _stream_handler( uid = await self._do_handshake(chan) except ( - trio.BrokenResourceError, - trio.ClosedResourceError, + # trio.BrokenResourceError, + # trio.ClosedResourceError, TransportClosed, ): # XXX: This may propagate up from ``Channel._aiter_recv()`` @@ -592,20 +592,16 @@ async def _process_messages( except ( TransportClosed, - trio.BrokenResourceError, - trio.ClosedResourceError ): - # channels "breaking" is ok since we don't have a teardown - # handshake for them (yet) and instead we simply bail out - # of the message loop and expect the teardown sequence - # to clean up. 
- log.error(f"{chan} form {chan.uid} closed abruptly") - # raise - - except trio.ClosedResourceError: - log.error(f"{chan} form {chan.uid} broke") + # channels "breaking" (for TCP streams by EOF or 104 + # connection-reset) is ok since we don't have a teardown + # handshake for them (yet) and instead we simply bail out of + # the message loop and expect the teardown sequence to clean + # up. + log.debug(f'channel from {chan.uid} closed abruptly:\n{chan}') except (Exception, trio.MultiError) as err: + # ship any "internal" exception (i.e. one from internal machinery # not from an rpc task) to parent log.exception("Actor errored:") diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 5dc1a2ab0..ec8981f92 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -57,7 +57,26 @@ async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: use_list=False, ) while True: - data = await self.stream.receive_some(2**10) + + try: + data = await self.stream.receive_some(2**10) + + except trio.BrokenResourceError as err: + msg = err.args[0] + + # XXX: handle connection-reset-by-peer the same as a EOF. 
+ # we're currently remapping this since we allow + # a quick connect then drop for root actors when + # checking to see if there exists an "arbiter" + # on the chosen sockaddr (``_root.py:108`` or thereabouts) + if '[Errno 104]' in msg: + raise TransportClosed( + f'{self} was broken with {msg}' + ) + + else: + raise + log.trace(f"received {data}") # type: ignore if data == b'': @@ -175,11 +194,13 @@ async def recv(self) -> Any: raise async def aclose(self) -> None: - log.debug(f"Closing {self}") + log.debug( + f'Closing channel to {self.uid} ' + f'{self.laddr} -> {self.raddr}' + ) assert self.msgstream await self.msgstream.stream.aclose() self._closed = True - log.error(f'CLOSING CHAN {self}') async def __aenter__(self): await self.connect() diff --git a/tractor/_root.py b/tractor/_root.py index 8f4eb9ab3..63152b06e 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -105,6 +105,11 @@ async def open_root_actor( arbiter_found = False try: + # TODO: this connect-and-bail forces us to have to carefully + # rewrap TCP 104-connection-reset errors as EOF so as to avoid + # propagating cancel-causing errors to the channel-msg loop + # machinery. Likely it would be better to eventually have + # a "discovery" protocol with basic handshake instead. 
async with _connect_chan(host, port): arbiter_found = True @@ -174,8 +179,11 @@ async def open_root_actor( finally: logger.info("Shutting down root actor") - with trio.CancelScope(shield=True): - await actor.cancel() + try: + with trio.CancelScope(shield=True): + await actor.cancel() + except Exception as err: + log.warning('Root was already cancelled') finally: _state._current_actor = None logger.info("Root actor terminated") From 3f75732b0224e6ad57f36a46628372610cc92802 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 4 Jul 2021 10:25:19 -0400 Subject: [PATCH 07/11] Remap windows specific connection reset error --- tractor/_ipc.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index ec8981f92..d6afc3371 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -69,7 +69,13 @@ async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: # a quick connect then drop for root actors when # checking to see if there exists an "arbiter" # on the chosen sockaddr (``_root.py:108`` or thereabouts) - if '[Errno 104]' in msg: + if ( + # nix + '[Errno 104]' in msg or + + # windows + '[WinError 10054]' in msg + ): raise TransportClosed( f'{self} was broken with {msg}' ) From 9c9309faf86757af7deb08b6cf5205b4652a52e5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 4 Jul 2021 10:25:41 -0400 Subject: [PATCH 08/11] Handle race for tty by child actors --- tests/test_debugger.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 8f850dfa4..f7d00979b 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -297,15 +297,30 @@ def test_multi_daemon_subactors(spawn, loglevel): child.expect(r"\(Pdb\+\+\)") + # there is a race for which subactor will acquire + # the root's tty lock first + before = str(child.before.decode()) - assert "Attaching pdb to actor: ('bp_forever'" in before + + bp_forever_msg = "Attaching 
pdb to actor: ('bp_forever'" + name_error_msg = "NameError" + + if bp_forever_msg in before: + next_msg = name_error_msg + + elif name_error_msg in before: + next_msg = bp_forever_msg + + else: + raise ValueError("Neither log msg was found !?") child.sendline('c') # first name_error failure child.expect(r"\(Pdb\+\+\)") before = str(child.before.decode()) - assert "NameError" in before + + assert next_msg in before child.sendline('c') From caa70245e0a489c9e3ddc643d68812c3d15b9b0d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 4 Jul 2021 10:47:15 -0400 Subject: [PATCH 09/11] Try remapping all broken errs wholesale on windows --- tractor/_ipc.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index d6afc3371..efe388e7c 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -1,6 +1,7 @@ """ Inter-process comms abstractions """ +import platform import typing from typing import Any, Tuple, Optional from functools import partial @@ -13,6 +14,9 @@ from ._exceptions import TransportClosed log = get_logger(__name__) + +_is_windows = platform.system() == 'Windows' + # :eyeroll: try: import msgpack_numpy @@ -73,8 +77,9 @@ async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: # nix '[Errno 104]' in msg or - # windows - '[WinError 10054]' in msg + # on windows it seems there are a variety of errors + # to handle.. 
+ _is_windows ): raise TransportClosed( f'{self} was broken with {msg}' From 6aab16f877464d7adab54e182145ee6ab056c40a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 4 Jul 2021 11:00:08 -0400 Subject: [PATCH 10/11] Drop added logging around root cancel --- tractor/_root.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tractor/_root.py b/tractor/_root.py index 63152b06e..f5bd778ce 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -179,11 +179,8 @@ async def open_root_actor( finally: logger.info("Shutting down root actor") - try: - with trio.CancelScope(shield=True): - await actor.cancel() - except Exception as err: - log.warning('Root was already cancelled') + with trio.CancelScope(shield=True): + await actor.cancel() finally: _state._current_actor = None logger.info("Root actor terminated") From 55760b3fe0f90f9365d228532c95de0da7e04f45 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 4 Jul 2021 12:55:36 -0400 Subject: [PATCH 11/11] Only expect further message in non-name-error first case --- tests/test_debugger.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/test_debugger.py b/tests/test_debugger.py index f7d00979b..910e37ade 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -309,7 +309,7 @@ def test_multi_daemon_subactors(spawn, loglevel): next_msg = name_error_msg elif name_error_msg in before: - next_msg = bp_forever_msg + next_msg = None else: raise ValueError("Neither log msg was found !?") @@ -320,7 +320,8 @@ def test_multi_daemon_subactors(spawn, loglevel): child.expect(r"\(Pdb\+\+\)") before = str(child.before.decode()) - assert next_msg in before + if next_msg: + assert next_msg in before child.sendline('c') @@ -331,9 +332,10 @@ def test_multi_daemon_subactors(spawn, loglevel): try: child.sendline('c') child.expect(pexpect.EOF) + except pexpect.exceptions.TIMEOUT: - # Failed to exit using continue..? + # Failed to exit using continue..? 
child.sendline('q') child.expect(pexpect.EOF)