diff --git a/examples/actor_spawning_and_causality.py b/examples/actor_spawning_and_causality.py index 0d577c0f4..ca793f117 100644 --- a/examples/actor_spawning_and_causality.py +++ b/examples/actor_spawning_and_causality.py @@ -1,7 +1,7 @@ import tractor -def cellar_door(): +async def cellar_door(): assert not tractor.is_root_process() return "Dang that's beautiful" diff --git a/examples/actor_spawning_and_causality_with_daemon.py b/examples/actor_spawning_and_causality_with_daemon.py index 1216af0b7..1ab0f88a3 100644 --- a/examples/actor_spawning_and_causality_with_daemon.py +++ b/examples/actor_spawning_and_causality_with_daemon.py @@ -1,7 +1,7 @@ import tractor -def movie_theatre_question(): +async def movie_theatre_question(): """A question asked in a dark theatre, in a tangent (errr, I mean different) process. """ diff --git a/examples/parallelism/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py index 3ff8dab60..a2aec90ca 100644 --- a/examples/parallelism/concurrent_actors_primes.py +++ b/examples/parallelism/concurrent_actors_primes.py @@ -29,7 +29,7 @@ ] -def is_prime(n): +async def is_prime(n): if n < 2: return False if n == 2: diff --git a/setup.py b/setup.py index c693a7950..cde066d1e 100755 --- a/setup.py +++ b/setup.py @@ -24,7 +24,7 @@ setup( name="tractor", - version='0.1.0a0', # first ever alpha + version='0.1.0a1', # first ever alpha description='structured concurrrent "actors"', long_description=readme, license='GPLv3', diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 458607d78..42aec354d 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -120,7 +120,7 @@ async def main(): assert exc.type == AssertionError -def do_nothing(): +async def do_nothing(): pass diff --git a/tests/test_spawning.py b/tests/test_spawning.py index a8da2c1d4..fb1e1bea5 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -51,7 +51,7 @@ def test_local_arbiter_subactor_global_state(arb_addr): assert result == 10 -def movie_theatre_question(): +async def movie_theatre_question(): """A question asked in a dark theatre, in a tangent (errr, I mean different) process. """ @@ -80,7 +80,7 @@ async def test_movie_theatre_convo(start_method): await portal.cancel_actor() -def cellar_door(): +async def cellar_door(): return "Dang that's beautiful" diff --git a/tractor/_actor.py b/tractor/_actor.py index f8f2bdd1c..6333091b0 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -20,7 +20,7 @@ from async_generator import aclosing from ._ipc import Channel -from ._streaming import Context, _context +from ._streaming import Context from .log import get_logger from ._exceptions import ( pack_error, @@ -58,77 +58,69 @@ async def _invoke( cs = None cancel_scope = trio.CancelScope() ctx = Context(chan, cid, cancel_scope) - _context.set(ctx) + if getattr(func, '_tractor_stream_function', False): # handle decorated ``@tractor.stream`` async functions kwargs['ctx'] = ctx treat_as_gen = True + + # errors raised inside this block are propgated back to caller try: - is_async_partial = False - is_async_gen_partial = False - if isinstance(func, partial): - is_async_partial = inspect.iscoroutinefunction(func.func) - is_async_gen_partial = inspect.isasyncgenfunction(func.func) - - if ( - not inspect.iscoroutinefunction(func) and - not inspect.isasyncgenfunction(func) and - not is_async_partial and - not is_async_gen_partial + if not ( + inspect.isasyncgenfunction(func) or + inspect.iscoroutinefunction(func) ): - await chan.send({'functype': 'function', 'cid': cid}) + raise TypeError(f'{func} must be an async function!') + + coro = func(**kwargs) + + if inspect.isasyncgen(coro): + await chan.send({'functype': 'asyncgen', 'cid': cid}) + # XXX: massive gotcha! If the containing scope + # is cancelled and we execute the below line, + # any ``ActorNursery.__aexit__()`` WON'T be + # triggered in the underlying async gen! So we + # have to properly handle the closing (aclosing) + # of the async gen in order to be sure the cancel + # is propagated! with cancel_scope as cs: task_status.started(cs) - await chan.send({'return': func(**kwargs), 'cid': cid}) + async with aclosing(coro) as agen: + async for item in agen: + # TODO: can we send values back in here? + # it's gonna require a `while True:` and + # some non-blocking way to retrieve new `asend()` + # values from the channel: + # to_send = await chan.recv_nowait() + # if to_send is not None: + # to_yield = await coro.asend(to_send) + await chan.send({'yield': item, 'cid': cid}) + + log.debug(f"Finished iterating {coro}") + # TODO: we should really support a proper + # `StopAsyncIteration` system here for returning a final + # value if desired + await chan.send({'stop': True, 'cid': cid}) else: - coro = func(**kwargs) - - if inspect.isasyncgen(coro): + if treat_as_gen: await chan.send({'functype': 'asyncgen', 'cid': cid}) - # XXX: massive gotcha! If the containing scope - # is cancelled and we execute the below line, - # any ``ActorNursery.__aexit__()`` WON'T be - # triggered in the underlying async gen! So we - # have to properly handle the closing (aclosing) - # of the async gen in order to be sure the cancel - # is propagated! + # XXX: the async-func may spawn further tasks which push + # back values like an async-generator would but must + # manualy construct the response dict-packet-responses as + # above with cancel_scope as cs: task_status.started(cs) - async with aclosing(coro) as agen: - async for item in agen: - # TODO: can we send values back in here? - # it's gonna require a `while True:` and - # some non-blocking way to retrieve new `asend()` - # values from the channel: - # to_send = await chan.recv_nowait() - # if to_send is not None: - # to_yield = await coro.asend(to_send) - await chan.send({'yield': item, 'cid': cid}) - - log.debug(f"Finished iterating {coro}") - # TODO: we should really support a proper - # `StopAsyncIteration` system here for returning a final - # value if desired - await chan.send({'stop': True, 'cid': cid}) + await coro + if not cs.cancelled_caught: + # task was not cancelled so we can instruct the + # far end async gen to tear down + await chan.send({'stop': True, 'cid': cid}) else: - if treat_as_gen: - await chan.send({'functype': 'asyncgen', 'cid': cid}) - # XXX: the async-func may spawn further tasks which push - # back values like an async-generator would but must - # manualy construct the response dict-packet-responses as - # above - with cancel_scope as cs: - task_status.started(cs) - await coro - if not cs.cancelled_caught: - # task was not cancelled so we can instruct the - # far end async gen to tear down - await chan.send({'stop': True, 'cid': cid}) - else: - await chan.send({'functype': 'asyncfunction', 'cid': cid}) - with cancel_scope as cs: - task_status.started(cs) - await chan.send({'return': await coro, 'cid': cid}) + # regular async function + await chan.send({'functype': 'asyncfunc', 'cid': cid}) + with cancel_scope as cs: + task_status.started(cs) + await chan.send({'return': await coro, 'cid': cid}) except (Exception, trio.MultiError) as err: @@ -151,9 +143,11 @@ async def _invoke( err_msg['cid'] = cid try: await chan.send(err_msg) + except trio.ClosedResourceError: log.warning( f"Failed to ship error to caller @ {chan.uid}") + if cs is None: # error is from above code not from rpc invocation task_status.started(err) @@ -178,6 +172,10 @@ def _get_mod_abspath(module): return os.path.abspath(module.__file__) +# process-global stack closed at end on actor runtime teardown +_lifetime_stack: ExitStack = ExitStack() + + class Actor: """The fundamental concurrency primitive. @@ -192,7 +190,6 @@ class Actor: _root_n: Optional[trio.Nursery] = None _service_n: Optional[trio.Nursery] = None _server_n: Optional[trio.Nursery] = None - _lifetime_stack: ExitStack = ExitStack() # Information about `__main__` from parent _parent_main_data: Dict[str, str] @@ -397,22 +394,26 @@ async def _stream_handler( async def _push_result( self, chan: Channel, + cid: str, msg: Dict[str, Any], ) -> None: """Push an RPC result to the local consumer's queue. """ actorid = chan.uid assert actorid, f"`actorid` can't be {actorid}" - cid = msg['cid'] send_chan, recv_chan = self._cids2qs[(actorid, cid)] assert send_chan.cid == cid # type: ignore + if 'stop' in msg: log.debug(f"{send_chan} was terminated at remote end") + # indicate to consumer that far end has stopped return await send_chan.aclose() + try: log.debug(f"Delivering {msg} from {actorid} to caller {cid}") # maintain backpressure await send_chan.send(msg) + except trio.BrokenResourceError: # XXX: local consumer has closed their side # so cancel the far end streaming task @@ -488,9 +489,11 @@ async def _process_messages( log.trace( # type: ignore f"Received msg {msg} from {chan.uid}") - if msg.get('cid'): + + cid = msg.get('cid') + if cid: # deliver response to local caller/waiter - await self._push_result(chan, msg) + await self._push_result(chan, cid, msg) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") continue @@ -545,8 +548,9 @@ async def _process_messages( # deadlock and other weird behaviour) if func != self.cancel: if isinstance(cs, Exception): - log.warning(f"Task for RPC func {func} failed with" - f"{cs}") + log.warning( + f"Task for RPC func {func} failed with" + f"{cs}") else: # mark that we have ongoing rpc tasks self._ongoing_rpc_tasks = trio.Event() @@ -784,7 +788,7 @@ async def _async_main( # tear down all lifetime contexts if not in guest mode # XXX: should this just be in the entrypoint? log.warning("Closing all actor lifetime contexts") - self._lifetime_stack.close() + _lifetime_stack.close() # Unregister actor from the arbiter if registered_with_arbiter and ( @@ -858,6 +862,15 @@ async def _serve_forever( # signal the server is down since nursery above terminated self._server_down.set() + def cancel_soon(self) -> None: + """Cancel this actor asap; can be called from a sync context. + + Schedules `.cancel()` to be run immediately just like when + cancelled by the parent. + """ + assert self._service_n + self._service_n.start_soon(self.cancel) + async def cancel(self) -> bool: """Cancel this actor. @@ -933,11 +946,13 @@ async def _cancel_task(self, cid, chan): return scope.cancel() + # wait for _invoke to mark the task complete log.debug( f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") await is_complete.wait() + log.debug( f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") @@ -1023,7 +1038,7 @@ def __init__(self, *args, **kwargs): self._waiters = {} super().__init__(*args, **kwargs) - def find_actor(self, name: str) -> Optional[Tuple[str, int]]: + async def find_actor(self, name: str) -> Optional[Tuple[str, int]]: for uid, sockaddr in self._registry.items(): if name in uid: return sockaddr @@ -1064,7 +1079,7 @@ async def wait_for_actor( return sockaddrs - def register_actor( + async def register_actor( self, uid: Tuple[str, str], sockaddr: Tuple[str, int] ) -> None: name, uuid = uid @@ -1077,5 +1092,5 @@ def register_actor( if isinstance(event, trio.Event): event.set() - def unregister_actor(self, uid: Tuple[str, str]) -> None: + async def unregister_actor(self, uid: Tuple[str, str]) -> None: self._registry.pop(uid, None) diff --git a/tractor/_portal.py b/tractor/_portal.py index 9ad4c2fd7..6d1c802c0 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -182,7 +182,7 @@ async def _submit( first_msg = await recv_chan.receive() functype = first_msg.get('functype') - if functype == 'function' or functype == 'asyncfunction': + if functype == 'asyncfunc': resp_type = 'return' elif functype == 'asyncgen': resp_type = 'yield'