From a13b3fe0a5e70c0cc5712ef265f0574c846781a2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Apr 2021 11:35:16 -0400 Subject: [PATCH 1/7] Bump alpha version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index c693a7950..cde066d1e 100755 --- a/setup.py +++ b/setup.py @@ -24,7 +24,7 @@ setup( name="tractor", - version='0.1.0a0', # first ever alpha + version='0.1.0a1', # first ever alpha description='structured concurrrent "actors"', long_description=readme, license='GPLv3', From be22a2526a6d5f7e1074dc72862a8a33121eba6f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 15 Nov 2020 23:54:42 -0500 Subject: [PATCH 2/7] Add `Actor.cancel_soon()` for sync self destruct Add a sync method that can be used to cancel the current actor from a synchronous context. This is useful in debugging situations where sync debugger code may need to kill the process tree. Also, make the internal "lifetime stack" a global var; easier to manage from client code that may was to add callbacks prior to the actor runtime being fully setup. --- tractor/_actor.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index f8f2bdd1c..32ca9608e 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -178,6 +178,10 @@ def _get_mod_abspath(module): return os.path.abspath(module.__file__) +# process-global stack closed at end on actor runtime teardown +_lifetime_stack: ExitStack = ExitStack() + + class Actor: """The fundamental concurrency primitive. @@ -192,7 +196,6 @@ class Actor: _root_n: Optional[trio.Nursery] = None _service_n: Optional[trio.Nursery] = None _server_n: Optional[trio.Nursery] = None - _lifetime_stack: ExitStack = ExitStack() # Information about `__main__` from parent _parent_main_data: Dict[str, str] @@ -545,8 +548,9 @@ async def _process_messages( # deadlock and other weird behaviour) if func != self.cancel: if isinstance(cs, Exception): - log.warning(f"Task for RPC func {func} failed with" - f"{cs}") + log.warning( + f"Task for RPC func {func} failed with" + f"{cs}") else: # mark that we have ongoing rpc tasks self._ongoing_rpc_tasks = trio.Event() @@ -784,7 +788,7 @@ async def _async_main( # tear down all lifetime contexts if not in guest mode # XXX: should this just be in the entrypoint? log.warning("Closing all actor lifetime contexts") - self._lifetime_stack.close() + _lifetime_stack.close() # Unregister actor from the arbiter if registered_with_arbiter and ( @@ -858,6 +862,14 @@ async def _serve_forever( # signal the server is down since nursery above terminated self._server_down.set() + def cancel_soon(self) -> None: + """Cancel this actor asap; can be called from a sync context. + + Schedules `.cancel()` to be run immediately just like when + cancelled by the parent. + """ + self._service_n.start_soon(self.cancel) + async def cancel(self) -> bool: """Cancel this actor. From c2a1612bf58b2ec7b566ec75b6fe12f3a11c7255 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 26 Apr 2021 16:14:45 -0400 Subject: [PATCH 3/7] Drop sync function support You can always wrap a sync function in an async one and there seems to be no good reason to support invoking them directly especially since cancellation won't work without some thread hackery. If it's requested we'll point users to `trio-parallel`. Resolves #77 --- tractor/_actor.py | 115 +++++++++++++++++++++++----------------------- 1 file changed, 58 insertions(+), 57 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 32ca9608e..174457b2a 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -20,7 +20,7 @@ from async_generator import aclosing from ._ipc import Channel -from ._streaming import Context, _context +from ._streaming import Context from .log import get_logger from ._exceptions import ( pack_error, @@ -58,11 +58,12 @@ async def _invoke( cs = None cancel_scope = trio.CancelScope() ctx = Context(chan, cid, cancel_scope) - _context.set(ctx) + if getattr(func, '_tractor_stream_function', False): # handle decorated ``@tractor.stream`` async functions kwargs['ctx'] = ctx treat_as_gen = True + try: is_async_partial = False is_async_gen_partial = False @@ -70,65 +71,55 @@ async def _invoke( is_async_partial = inspect.iscoroutinefunction(func.func) is_async_gen_partial = inspect.isasyncgenfunction(func.func) - if ( - not inspect.iscoroutinefunction(func) and - not inspect.isasyncgenfunction(func) and - not is_async_partial and - not is_async_gen_partial - ): - await chan.send({'functype': 'function', 'cid': cid}) + coro = func(**kwargs) + + if inspect.isasyncgen(coro): + await chan.send({'functype': 'asyncgen', 'cid': cid}) + # XXX: massive gotcha! If the containing scope + # is cancelled and we execute the below line, + # any ``ActorNursery.__aexit__()`` WON'T be + # triggered in the underlying async gen! So we + # have to properly handle the closing (aclosing) + # of the async gen in order to be sure the cancel + # is propagated! with cancel_scope as cs: task_status.started(cs) - await chan.send({'return': func(**kwargs), 'cid': cid}) + async with aclosing(coro) as agen: + async for item in agen: + # TODO: can we send values back in here? + # it's gonna require a `while True:` and + # some non-blocking way to retrieve new `asend()` + # values from the channel: + # to_send = await chan.recv_nowait() + # if to_send is not None: + # to_yield = await coro.asend(to_send) + await chan.send({'yield': item, 'cid': cid}) + + log.debug(f"Finished iterating {coro}") + # TODO: we should really support a proper + # `StopAsyncIteration` system here for returning a final + # value if desired + await chan.send({'stop': True, 'cid': cid}) else: - coro = func(**kwargs) - - if inspect.isasyncgen(coro): + if treat_as_gen: await chan.send({'functype': 'asyncgen', 'cid': cid}) - # XXX: massive gotcha! If the containing scope - # is cancelled and we execute the below line, - # any ``ActorNursery.__aexit__()`` WON'T be - # triggered in the underlying async gen! So we - # have to properly handle the closing (aclosing) - # of the async gen in order to be sure the cancel - # is propagated! + # XXX: the async-func may spawn further tasks which push + # back values like an async-generator would but must + # manualy construct the response dict-packet-responses as + # above with cancel_scope as cs: task_status.started(cs) - async with aclosing(coro) as agen: - async for item in agen: - # TODO: can we send values back in here? - # it's gonna require a `while True:` and - # some non-blocking way to retrieve new `asend()` - # values from the channel: - # to_send = await chan.recv_nowait() - # if to_send is not None: - # to_yield = await coro.asend(to_send) - await chan.send({'yield': item, 'cid': cid}) - - log.debug(f"Finished iterating {coro}") - # TODO: we should really support a proper - # `StopAsyncIteration` system here for returning a final - # value if desired - await chan.send({'stop': True, 'cid': cid}) + await coro + if not cs.cancelled_caught: + # task was not cancelled so we can instruct the + # far end async gen to tear down + await chan.send({'stop': True, 'cid': cid}) else: - if treat_as_gen: - await chan.send({'functype': 'asyncgen', 'cid': cid}) - # XXX: the async-func may spawn further tasks which push - # back values like an async-generator would but must - # manualy construct the response dict-packet-responses as - # above - with cancel_scope as cs: - task_status.started(cs) - await coro - if not cs.cancelled_caught: - # task was not cancelled so we can instruct the - # far end async gen to tear down - await chan.send({'stop': True, 'cid': cid}) - else: - await chan.send({'functype': 'asyncfunction', 'cid': cid}) - with cancel_scope as cs: - task_status.started(cs) - await chan.send({'return': await coro, 'cid': cid}) + # regular async function + await chan.send({'functype': 'asyncfunc', 'cid': cid}) + with cancel_scope as cs: + task_status.started(cs) + await chan.send({'return': await coro, 'cid': cid}) except (Exception, trio.MultiError) as err: @@ -151,9 +142,11 @@ async def _invoke( err_msg['cid'] = cid try: await chan.send(err_msg) + except trio.ClosedResourceError: log.warning( f"Failed to ship error to caller @ {chan.uid}") + if cs is None: # error is from above code not from rpc invocation task_status.started(err) @@ -400,22 +393,26 @@ async def _stream_handler( async def _push_result( self, chan: Channel, + cid: str, msg: Dict[str, Any], ) -> None: """Push an RPC result to the local consumer's queue. """ actorid = chan.uid assert actorid, f"`actorid` can't be {actorid}" - cid = msg['cid'] send_chan, recv_chan = self._cids2qs[(actorid, cid)] assert send_chan.cid == cid # type: ignore + if 'stop' in msg: log.debug(f"{send_chan} was terminated at remote end") + # indicate to consumer that far end has stopped return await send_chan.aclose() + try: log.debug(f"Delivering {msg} from {actorid} to caller {cid}") # maintain backpressure await send_chan.send(msg) + except trio.BrokenResourceError: # XXX: local consumer has closed their side # so cancel the far end streaming task @@ -491,9 +488,11 @@ async def _process_messages( log.trace( # type: ignore f"Received msg {msg} from {chan.uid}") - if msg.get('cid'): + + cid = msg.get('cid') + if cid: # deliver response to local caller/waiter - await self._push_result(chan, msg) + await self._push_result(chan, cid, msg) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") continue @@ -945,11 +944,13 @@ async def _cancel_task(self, cid, chan): return scope.cancel() + # wait for _invoke to mark the task complete log.debug( f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") await is_complete.wait() + log.debug( f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") From 5798ef679676a5add27cb5ca763ddf5b1d17c225 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Apr 2021 08:46:54 -0400 Subject: [PATCH 4/7] Enforce async funcs on callee side, convert arbiter methods --- tractor/_actor.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 174457b2a..3823455a7 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -64,12 +64,13 @@ async def _invoke( kwargs['ctx'] = ctx treat_as_gen = True + # errors raised inside this block are propgated back to caller try: - is_async_partial = False - is_async_gen_partial = False - if isinstance(func, partial): - is_async_partial = inspect.iscoroutinefunction(func.func) - is_async_gen_partial = inspect.isasyncgenfunction(func.func) + if not ( + inspect.isasyncgenfunction(func) or + inspect.iscoroutinefunction(func) + ): + raise TypeError(f'{func} must be an async function!') coro = func(**kwargs) @@ -1036,7 +1037,7 @@ def __init__(self, *args, **kwargs): self._waiters = {} super().__init__(*args, **kwargs) - def find_actor(self, name: str) -> Optional[Tuple[str, int]]: + async def find_actor(self, name: str) -> Optional[Tuple[str, int]]: for uid, sockaddr in self._registry.items(): if name in uid: return sockaddr @@ -1077,7 +1078,7 @@ async def wait_for_actor( return sockaddrs - def register_actor( + async def register_actor( self, uid: Tuple[str, str], sockaddr: Tuple[str, int] ) -> None: name, uuid = uid @@ -1090,5 +1091,5 @@ def register_actor( if isinstance(event, trio.Event): event.set() - def unregister_actor(self, uid: Tuple[str, str]) -> None: + async def unregister_actor(self, uid: Tuple[str, str]) -> None: self._registry.pop(uid, None) From 1f1619c7305a8be7b3531584a47adce3a9ed2514 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Apr 2021 09:14:08 -0400 Subject: [PATCH 5/7] Convert all test suite sync funcs --- examples/actor_spawning_and_causality.py | 2 +- examples/actor_spawning_and_causality_with_daemon.py | 2 +- examples/parallelism/concurrent_actors_primes.py | 2 +- tests/test_cancellation.py | 2 +- tests/test_spawning.py | 4 ++-- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/actor_spawning_and_causality.py b/examples/actor_spawning_and_causality.py index 0d577c0f4..ca793f117 100644 --- a/examples/actor_spawning_and_causality.py +++ b/examples/actor_spawning_and_causality.py @@ -1,7 +1,7 @@ import tractor -def cellar_door(): +async def cellar_door(): assert not tractor.is_root_process() return "Dang that's beautiful" diff --git a/examples/actor_spawning_and_causality_with_daemon.py b/examples/actor_spawning_and_causality_with_daemon.py index 1216af0b7..1ab0f88a3 100644 --- a/examples/actor_spawning_and_causality_with_daemon.py +++ b/examples/actor_spawning_and_causality_with_daemon.py @@ -1,7 +1,7 @@ import tractor -def movie_theatre_question(): +async def movie_theatre_question(): """A question asked in a dark theatre, in a tangent (errr, I mean different) process. """ diff --git a/examples/parallelism/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py index 3ff8dab60..a2aec90ca 100644 --- a/examples/parallelism/concurrent_actors_primes.py +++ b/examples/parallelism/concurrent_actors_primes.py @@ -29,7 +29,7 @@ ] -def is_prime(n): +async def is_prime(n): if n < 2: return False if n == 2: diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 458607d78..42aec354d 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -120,7 +120,7 @@ async def main(): assert exc.type == AssertionError -def do_nothing(): +async def do_nothing(): pass diff --git a/tests/test_spawning.py b/tests/test_spawning.py index a8da2c1d4..fb1e1bea5 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -51,7 +51,7 @@ def test_local_arbiter_subactor_global_state(arb_addr): assert result == 10 -def movie_theatre_question(): +async def movie_theatre_question(): """A question asked in a dark theatre, in a tangent (errr, I mean different) process. """ @@ -80,7 +80,7 @@ async def test_movie_theatre_convo(start_method): await portal.cancel_actor() -def cellar_door(): +async def cellar_door(): return "Dang that's beautiful" From 89ce1a63e48293b94af8125c7e558fd164a9ee2e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Apr 2021 11:39:41 -0400 Subject: [PATCH 6/7] Only accept asyncfunc response type --- tractor/_portal.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 9ad4c2fd7..6d1c802c0 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -182,7 +182,7 @@ async def _submit( first_msg = await recv_chan.receive() functype = first_msg.get('functype') - if functype == 'function' or functype == 'asyncfunction': + if functype == 'asyncfunc': resp_type = 'return' elif functype == 'asyncgen': resp_type = 'yield' From d0eacc3fd6cd62264301b1a60adf0971bd723e45 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Apr 2021 12:07:16 -0400 Subject: [PATCH 7/7] Appease mypy --- tractor/_actor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tractor/_actor.py b/tractor/_actor.py index 3823455a7..6333091b0 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -868,6 +868,7 @@ def cancel_soon(self) -> None: Schedules `.cancel()` to be run immediately just like when cancelled by the parent. """ + assert self._service_n self._service_n.start_soon(self.cancel) async def cancel(self) -> bool: