diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py
index 414485953..e3daff5dd 100644
--- a/tests/test_cancellation.py
+++ b/tests/test_cancellation.py
@@ -326,16 +326,19 @@ async def spawn_and_error(breadth, depth) -> None:
             )
             kwargs = {
                 'name': f'{name}_errorer_{i}',
+                # 'delay': 1,
             }
             await nursery.run_in_actor(*args, **kwargs)

 @tractor_test
 async def test_nested_multierrors(loglevel, start_method):
-    """Test that failed actor sets are wrapped in `trio.MultiError`s.
+    '''
+    Test that failed actor sets are wrapped in `trio.MultiError`s.

     This test goes only 2 nurseries deep but we should eventually have tests
     for arbitrary n-depth actor trees.
-    """
+
+    '''
     if start_method == 'trio':
         depth = 3
         subactor_breadth = 2
@@ -359,6 +362,7 @@ async def test_nested_multierrors(loglevel, start_method):
                         breadth=subactor_breadth,
                         depth=depth,
                     )
+
         except trio.MultiError as err:
             assert len(err.exceptions) == subactor_breadth
             for subexc in err.exceptions:
@@ -394,16 +398,13 @@
                     assert isinstance(subexc, tractor.RemoteActorError)

                 if depth > 0 and subactor_breadth > 1:
-                    # XXX not sure what's up with this..
-                    # on windows sometimes spawning is just too slow and
-                    # we get back the (sent) cancel signal instead
-                    if platform.system() == 'Windows':
-                        if isinstance(subexc, tractor.RemoteActorError):
-                            assert subexc.type in (trio.MultiError, tractor.RemoteActorError)
-                        else:
-                            assert isinstance(subexc, trio.MultiError)
+                    # XXX it's a race whether or not a parent containing
+                    # a nursery *may* get multiple child failures before
+                    # it cancels and tears down.
+                    if isinstance(subexc, tractor.RemoteActorError):
+                        assert subexc.type in (trio.MultiError, tractor.RemoteActorError)
                     else:
-                        assert subexc.type is trio.MultiError
+                        assert isinstance(subexc, trio.MultiError)
                 else:
                     assert subexc.type in (tractor.RemoteActorError, trio.Cancelled)

@@ -486,9 +487,11 @@ def test_cancel_while_childs_child_in_sync_sleep(
     loglevel,
     start_method,
     spawn_backend,
 ):
-    """Verify that a child cancelled while executing sync code is torn
-    down even when that cancellation is triggered by the parent
+    """
+    Verify that a child cancelled while executing sync code is torn down
+    even when that cancellation is triggered by the parent
     2 nurseries "up".
+
     """
     if start_method == 'forkserver':
         pytest.skip("Forksever sux hard at resuming from sync sleep...")
@@ -500,7 +503,7 @@ async def main():
                 spawn,
                 name='spawn',
             )
-            await trio.sleep(1)
+            await trio.sleep(0.5)
             assert 0

     with pytest.raises(AssertionError):
diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index ead91df09..fa7137f57 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -57,7 +57,7 @@
 from ._portal import Portal
 from ._actor import Actor
 from ._entry import _mp_main
-from ._exceptions import ActorFailure
+from ._exceptions import ActorFailure, NoResult


 log = get_logger('tractor')
@@ -136,47 +136,74 @@ async def exhaust_portal(
         # always be established and shutdown using a context manager api
         final = await portal.result()

-    except (Exception, trio.MultiError) as err:
+    except (
+        Exception,
+        trio.MultiError
+    ) as err:
         # we reraise in the parent task via a ``trio.MultiError``
         return err
+
     except trio.Cancelled as err:
         # lol, of course we need this too ;P
         # TODO: merge with above?
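+        # (``trio.Cancelled`` derives from ``BaseException``, not
+        # ``Exception``, so the handler above doesn't catch it)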
         log.warning(f"Cancelled result waiter for {portal.actor.uid}")
         return err
+
     else:
         log.debug(f"Returning final result: {final}")
         return final


+async def pack_and_report_errors(
+    portal: Portal,
+    subactor: Actor,
+    errors: Dict[Tuple[str, str], Exception],
+
+) -> Any:
+
+    # if this call errors we store the exception for later
+    # in ``errors`` which will be reraised inside
+    # a MultiError and we still send out a cancel request
+    result = await exhaust_portal(portal, subactor)
+
+    uid = portal.channel.uid
+    if (
+        isinstance(result, Exception)
+        # or isinstance(result, trio.MultiError)
+    ):
+        errors[subactor.uid] = result
+        log.warning(f"{uid} received remote error:\n{result}")
+        raise result
+
+    elif isinstance(result, trio.Cancelled):
+        errors[subactor.uid] = result
+        log.runtime(f"{uid} was cancelled before result")
+
+    else:
+        log.runtime(f"{uid} received final result:\n{result}")
+
+    return result
+
+
 async def cancel_on_completion(
     portal: Portal,
-    actor: Actor,
+    subactor: Actor,
     errors: Dict[Tuple[str, str], Exception],

 ) -> None:
     '''
-    Cancel actor gracefully once it's "main" portal's
+    Cancel subactor gracefully once its "main" portal's
     result arrives.

     Should only be called for actors spawned with `run_in_actor()`.

     '''
-    # if this call errors we store the exception for later
-    # in ``errors`` which will be reraised inside
-    # a MultiError and we still send out a cancel request
-    result = await exhaust_portal(portal, actor)
-    if isinstance(result, Exception):
-        errors[actor.uid] = result
-        log.warning(
-            f"Cancelling {portal.channel.uid} after error {result}"
-        )
-
-    else:
-        log.runtime(
-            f"Cancelling {portal.channel.uid} gracefully "
-            f"after result {result}")
+    await pack_and_report_errors(
+        portal,
+        subactor,
+        errors,
+    )

     # cancel the process now that we have a final result
     await portal.cancel_actor()
@@ -344,8 +371,9 @@ async def new_proc(
             with trio.CancelScope(shield=True):
                 await actor_nursery._join_procs.wait()

+            cancel_on_complete = portal in actor_nursery._cancel_after_result_on_exit
             async with trio.open_nursery() as nursery:
-                if portal in actor_nursery._cancel_after_result_on_exit:
+                if cancel_on_complete:
                     nursery.start_soon(
                         cancel_on_completion,
                         portal,
@@ -369,6 +397,11 @@ async def new_proc(
                     f"{subactor.uid}")
                 nursery.cancel_scope.cancel()

+        # if errors:
+        #     log.warning(
+        #         f'Remote errors retrieved from child: {subactor.uid}')
+        #     actor_nursery._ria_nursery.cancel_scope.cancel()
+
         finally:
             # The "hard" reap since no actor zombies are allowed!
             # XXX: do this **after** cancellation/tearfown to avoid
@@ -398,11 +431,30 @@ async def new_proc(
             else:
                 log.warning('Nursery cancelled before sub-proc started')

+            uid = subactor.uid
             if not cancelled_during_spawn:
-                # pop child entry to indicate we no longer managing this
-                # subactor
-                actor_nursery._children.pop(subactor.uid)
-
+                subactor, _, portal = actor_nursery._children.pop(uid)
+
+                # check for a late delivery of an error from
+                # the target remote task and overwrite any cancel
+                # that was captured as part of teardown.
+                if cancel_on_complete:
+                    error = errors.get(uid)
+                    if type(error) is trio.Cancelled:
+                        # actor was cancelled before its final result was
+                        # retrieved so check now for any result and pack as
+                        # an error to be raised in the surrounding
+                        # nursery's multierror handling.
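+                        # (the short shielded wait below gives the channel
+                        # one last chance to deliver that result)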
+                        errors.pop(uid)
+                        with trio.move_on_after(0.001) as cs:
+                            cs.shield = True
+                            err = await pack_and_report_errors(
+                                portal,
+                                subactor,
+                                errors,
+                            )
+                        if type(err) is trio.Cancelled:
+                            errors.pop(uid)

     else:
         # `multiprocessing`
         # async with trio.open_nursery() as nursery: