Skip to content

Commit

Permalink
Merge pull request #215 from goodboy/transport_cleaning
Browse files Browse the repository at this point in the history
Transport cleaning: attempt to define our graceful channel close signal.
  • Loading branch information
goodboy authored Jul 6, 2021
2 parents 44d7988 + 55760b3 commit 4d530de
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 27 deletions.
11 changes: 9 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,20 @@
'tractor.testing',
],
install_requires=[

# trio related
'trio>0.8',
'msgpack',
'async_generator',
'trio_typing',

# tooling
'colorlog',
'wrapt',
'trio_typing',
'pdbpp',

# serialization
'msgpack',

],
tests_require=['pytest'],
python_requires=">=3.7",
Expand Down
11 changes: 9 additions & 2 deletions tests/test_cancellation.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,15 @@ async def main():

assert exc_info.type == tractor.MultiError
err = exc_info.value
assert len(err.exceptions) == num_subactors
for exc in err.exceptions:
exceptions = err.exceptions

if len(exceptions) == 2:
# sometimes oddly now there's an embedded BrokenResourceError ?
exceptions = exceptions[1].exceptions

assert len(exceptions) == num_subactors

for exc in exceptions:
assert isinstance(exc, tractor.RemoteActorError)
assert exc.type == AssertionError

Expand Down
23 changes: 20 additions & 3 deletions tests/test_debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,31 @@ def test_multi_daemon_subactors(spawn, loglevel):

child.expect(r"\(Pdb\+\+\)")

# there is a race for which subactor will acquire
# the root's tty lock first

before = str(child.before.decode())
assert "Attaching pdb to actor: ('bp_forever'" in before

bp_forever_msg = "Attaching pdb to actor: ('bp_forever'"
name_error_msg = "NameError"

if bp_forever_msg in before:
next_msg = name_error_msg

elif name_error_msg in before:
next_msg = None

else:
raise ValueError("Neither log msg was found !?")

child.sendline('c')

# first name_error failure
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "NameError" in before

if next_msg:
assert next_msg in before

child.sendline('c')

Expand All @@ -316,9 +332,10 @@ def test_multi_daemon_subactors(spawn, loglevel):
try:
child.sendline('c')
child.expect(pexpect.EOF)

except pexpect.exceptions.TIMEOUT:
# Failed to exit using continue..?

# Failed to exit using continue..?
child.sendline('q')
child.expect(pexpect.EOF)

Expand Down
35 changes: 30 additions & 5 deletions tractor/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
unpack_error,
ModuleNotExposed,
is_multi_cancelled,
TransportClosed,
)
from . import _debug
from ._discovery import get_arbiter
Expand Down Expand Up @@ -262,7 +263,7 @@ def __init__(
self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[
Tuple[Any, Any, Any, Any, Any]] = None
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore # noqa

async def wait_for_peer(
self, uid: Tuple[str, str]
Expand Down Expand Up @@ -338,7 +339,18 @@ async def _stream_handler(
# send/receive initial handshake response
try:
uid = await self._do_handshake(chan)
except StopAsyncIteration:

except (
# trio.BrokenResourceError,
# trio.ClosedResourceError,
TransportClosed,
):
# XXX: This may propagate up from ``Channel._aiter_recv()``
# and ``MsgpackStream._inter_packets()`` on a read from the
# stream particularly when the runtime is first starting up
# inside ``open_root_actor()`` where there is a check for
# a bound listener on the "arbiter" addr. the reset will be
# because the handshake was never meant took place.
log.warning(f"Channel {chan} failed to handshake")
return

Expand Down Expand Up @@ -578,22 +590,35 @@ async def _process_messages(
)
await self.cancel_rpc_tasks(chan)

except trio.ClosedResourceError:
log.error(f"{chan} form {chan.uid} broke")
except (
TransportClosed,
):
# channels "breaking" (for TCP streams by EOF or 104
# connection-reset) is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean
# up.
log.debug(f'channel from {chan.uid} closed abruptly:\n{chan}')

except (Exception, trio.MultiError) as err:

# ship any "internal" exception (i.e. one from internal machinery
# not from an rpc task) to parent
log.exception("Actor errored:")
if self._parent_chan:
await self._parent_chan.send(pack_error(err))
raise

# if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints"
raise

except trio.Cancelled:
# debugging only
log.debug(f"Msg loop was cancelled for {chan}")
raise

finally:
# msg debugging for when he machinery is brokey
log.debug(
f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}")
Expand Down
7 changes: 7 additions & 0 deletions tractor/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class InternalActorError(RemoteActorError):
"""


class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use"


class NoResult(RuntimeError):
"No final result is expected for this actor"

Expand All @@ -63,12 +67,15 @@ def pack_error(exc: BaseException) -> Dict[str, Any]:


def unpack_error(

msg: Dict[str, Any],
chan=None,
err_type=RemoteActorError

) -> Exception:
"""Unpack an 'error' message from the wire
into a local ``RemoteActorError``.
"""
tb_str = msg['error'].get('tb_str', '')
return err_type(
Expand Down
85 changes: 70 additions & 15 deletions tractor/_ipc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Inter-process comms abstractions
"""
import platform
import typing
from typing import Any, Tuple, Optional
from functools import partial
Expand All @@ -10,7 +11,11 @@
from async_generator import asynccontextmanager

from .log import get_logger
log = get_logger('ipc')
from ._exceptions import TransportClosed
log = get_logger(__name__)


_is_windows = platform.system() == 'Windows'

# :eyeroll:
try:
Expand All @@ -21,10 +26,17 @@
Unpacker = partial(msgpack.Unpacker, strict_map_key=False)


class MsgpackStream:
"""A ``trio.SocketStream`` delivering ``msgpack`` formatted data.
"""
def __init__(self, stream: trio.SocketStream) -> None:
class MsgpackTCPStream:
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using ``msgpack-python``.
'''
def __init__(
self,
stream: trio.SocketStream,

) -> None:

self.stream = stream
assert self.stream.socket
# should both be IP sockets
Expand All @@ -35,7 +47,10 @@ def __init__(self, stream: trio.SocketStream) -> None:
assert isinstance(rsockname, tuple)
self._raddr = rsockname[:2]

# start and seed first entry to read loop
self._agen = self._iter_packets()
# self._agen.asend(None) is None

self._send_lock = trio.StrictFIFOLock()

async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
Expand All @@ -46,16 +61,39 @@ async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
use_list=False,
)
while True:

try:
data = await self.stream.receive_some(2**10)
log.trace(f"received {data}") # type: ignore
except trio.BrokenResourceError:
log.warning(f"Stream connection {self.raddr} broke")
return

except trio.BrokenResourceError as err:
msg = err.args[0]

# XXX: handle connection-reset-by-peer the same as a EOF.
# we're currently remapping this since we allow
# a quick connect then drop for root actors when
# checking to see if there exists an "arbiter"
# on the chosen sockaddr (``_root.py:108`` or thereabouts)
if (
# nix
'[Errno 104]' in msg or

# on windows it seems there are a variety of errors
# to handle..
_is_windows
):
raise TransportClosed(
f'{self} was broken with {msg}'
)

else:
raise

log.trace(f"received {data}") # type: ignore

if data == b'':
log.debug(f"Stream connection {self.raddr} was closed")
return
raise TransportClosed(
f'transport {self} was already closed prior ro read'
)

unpacker.feed(data)
for packet in unpacker:
Expand Down Expand Up @@ -96,10 +134,11 @@ def __init__(
on_reconnect: typing.Callable[..., typing.Awaitable] = None,
auto_reconnect: bool = False,
stream: trio.SocketStream = None, # expected to be active

) -> None:
self._recon_seq = on_reconnect
self._autorecon = auto_reconnect
self.msgstream: Optional[MsgpackStream] = MsgpackStream(
self.msgstream: Optional[MsgpackTCPStream] = MsgpackTCPStream(
stream) if stream else None
if self.msgstream and destaddr:
raise ValueError(
Expand All @@ -112,6 +151,8 @@ def __init__(
self._exc: Optional[Exception] = None
self._agen = self._aiter_recv()

self._closed: bool = False

def __repr__(self) -> str:
if self.msgstream:
return repr(
Expand All @@ -128,35 +169,49 @@ def raddr(self) -> Optional[Tuple[Any, ...]]:
return self.msgstream.raddr if self.msgstream else None

async def connect(
self, destaddr: Tuple[Any, ...] = None,
self,
destaddr: Tuple[Any, ...] = None,
**kwargs

) -> trio.SocketStream:

if self.connected():
raise RuntimeError("channel is already connected?")

destaddr = destaddr or self._destaddr
assert isinstance(destaddr, tuple)
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
self.msgstream = MsgpackStream(stream)
self.msgstream = MsgpackTCPStream(stream)
return stream

async def send(self, item: Any) -> None:

log.trace(f"send `{item}`") # type: ignore
assert self.msgstream

await self.msgstream.send(item)

async def recv(self) -> Any:
assert self.msgstream

try:
return await self.msgstream.recv()

except trio.BrokenResourceError:
if self._autorecon:
await self._reconnect()
return await self.recv()

raise

async def aclose(self) -> None:
log.debug(f"Closing {self}")
log.debug(
f'Closing channel to {self.uid} '
f'{self.laddr} -> {self.raddr}'
)
assert self.msgstream
await self.msgstream.stream.aclose()
self._closed = True

async def __aenter__(self):
await self.connect()
Expand Down
5 changes: 5 additions & 0 deletions tractor/_root.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ async def open_root_actor(
arbiter_found = False

try:
# TODO: this connect-and-bail forces us to have to carefully
# rewrap TCP 104-connection-reset errors as EOF so as to avoid
# propagating cancel-causing errors to the channel-msg loop
# machinery. Likely it would be better to eventually have
# a "discovery" protocol with basic handshake instead.
async with _connect_chan(host, port):
arbiter_found = True

Expand Down

0 comments on commit 4d530de

Please sign in to comment.