Skip to content

Commit

Permalink
Add initial bi-directional streaming
Browse files Browse the repository at this point in the history
This mostly adds the api described in
#53 (comment)

The first draft summary:
- formalize bidir steaming using the `trio.Channel` style interface
  which we derive as a `MsgStream` type.
- add `Portal.open_context()` which provides a `trio.Nursery.start()`
  remote task invocation style for setting up and tearing down tasks
  contexts in remote actors.
- add a distinct `'started'` message to the ipc protocol to facilitate
  `Context.start()` with a first return value.
- for our `ReceiveMsgStream` type, don't cancel the remote task in
  `.aclose()`; this is now done explicitly by the surrounding `Context`
   usage: `Context.cancel()`.
- streams in either direction still use a `'yield'` message keeping the
  proto mostly symmetric without having to worry about which side is the
  caller / portal opener.
- subtlety: only allow sending a `'stop'` message during a 2-way
  streaming context from `ReceiveStream.aclose()`, detailed comment
  with explanation is included.

Relates to #53
  • Loading branch information
goodboy committed May 7, 2021
1 parent 73e123b commit 1cbb35f
Show file tree
Hide file tree
Showing 3 changed files with 361 additions and 158 deletions.
95 changes: 71 additions & 24 deletions tractor/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import sys
import os
from contextlib import ExitStack
import warnings

import trio # type: ignore
from trio_typing import TaskStatus
Expand Down Expand Up @@ -57,13 +58,37 @@ async def _invoke(
treat_as_gen = False
cs = None
cancel_scope = trio.CancelScope()
ctx = Context(chan, cid, cancel_scope)
ctx = Context(chan, cid, _cancel_scope=cancel_scope)
context = False

if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions
sig = inspect.signature(func)
params = sig.parameters

# compat with old api
kwargs['ctx'] = ctx

if 'ctx' in params:
warnings.warn(
"`@tractor.stream decorated funcs should now declare "
"a `stream` arg, `ctx` is now designated for use with "
"@tractor.context",
DeprecationWarning,
stacklevel=2,
)

elif 'stream' in params:
assert 'stream' in params
kwargs['stream'] = ctx

treat_as_gen = True

elif getattr(func, '_tractor_context_function', False):
# handle decorated ``@tractor.context`` async function
kwargs['ctx'] = ctx
context = True

# errors raised inside this block are propgated back to caller
try:
if not (
Expand Down Expand Up @@ -101,26 +126,41 @@ async def _invoke(
# `StopAsyncIteration` system here for returning a final
# value if desired
await chan.send({'stop': True, 'cid': cid})

# one way @stream func that gets treated like an async gen
elif treat_as_gen:
await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must
# manualy construct the response dict-packet-responses as
# above
with cancel_scope as cs:
task_status.started(cs)
await coro

if not cs.cancelled_caught:
# task was not cancelled so we can instruct the
# far end async gen to tear down
await chan.send({'stop': True, 'cid': cid})

elif context:
# context func with support for bi-dir streaming
await chan.send({'functype': 'context', 'cid': cid})

with cancel_scope as cs:
task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid})

# if cs.cancelled_caught:
# # task was cancelled so relay to the cancel to caller
# await chan.send({'return': await coro, 'cid': cid})

else:
if treat_as_gen:
await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must
# manualy construct the response dict-packet-responses as
# above
with cancel_scope as cs:
task_status.started(cs)
await coro
if not cs.cancelled_caught:
# task was not cancelled so we can instruct the
# far end async gen to tear down
await chan.send({'stop': True, 'cid': cid})
else:
# regular async function
await chan.send({'functype': 'asyncfunc', 'cid': cid})
with cancel_scope as cs:
task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid})
# regular async function
await chan.send({'functype': 'asyncfunc', 'cid': cid})
with cancel_scope as cs:
task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid})

except (Exception, trio.MultiError) as err:

Expand Down Expand Up @@ -404,17 +444,23 @@ async def _push_result(
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
assert send_chan.cid == cid # type: ignore

if 'stop' in msg:
log.debug(f"{send_chan} was terminated at remote end")
# indicate to consumer that far end has stopped
return await send_chan.aclose()
# if 'stop' in msg:
# log.debug(f"{send_chan} was terminated at remote end")
# # indicate to consumer that far end has stopped
# return await send_chan.aclose()

try:
log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
# maintain backpressure
await send_chan.send(msg)

except trio.BrokenResourceError:
# TODO: what is the right way to handle the case where the
# local task has already sent a 'stop' / StopAsyncInteration
# to the other side but and possibly has closed the local
# feeder mem chan? Do we wait for some kind of ack or just
# let this fail silently and bubble up (currently)?

# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{send_chan} consumer is already closed")
Expand Down Expand Up @@ -494,6 +540,7 @@ async def _process_messages(
if cid:
# deliver response to local caller/waiter
await self._push_result(chan, cid, msg)

log.debug(
f"Waiting on next msg for {chan} from {chan.uid}")
continue
Expand Down
70 changes: 59 additions & 11 deletions tractor/_portal.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,11 +312,20 @@ async def open_stream_from(

ctx = Context(self.channel, cid, _portal=self)
try:
async with ReceiveMsgStream(ctx, recv_chan, self) as rchan:
# deliver receive only stream
async with ReceiveMsgStream(ctx, recv_chan) as rchan:
self._streams.add(rchan)
yield rchan

finally:

# cancel the far end task on consumer close
# NOTE: this is a special case since we assume that if using
# this ``.open_fream_from()`` api, the stream is one a one
# time use and we couple the far end tasks's lifetime to
# the consumer's scope; we don't ever send a `'stop'`
# message right now since there shouldn't be a reason to
# stop and restart the stream, right?
try:
await ctx.cancel()
except trio.ClosedResourceError:
Expand All @@ -326,16 +335,55 @@ async def open_stream_from(

self._streams.remove(rchan)

# @asynccontextmanager
# async def open_context(
# self,
# func: Callable,
# **kwargs,
# ) -> Context:
# # TODO
# elif resptype == 'context': # context manager style setup/teardown
# # TODO likely not here though
# raise NotImplementedError
@asynccontextmanager
async def open_context(
self,
func: Callable,
**kwargs,
) -> Context:
"""Open an inter-actor task context.
This is a synchronous API which allows for deterministic
setup/teardown of a remote task. The yielded ``Context`` further
allows for opening bidirectional streams - see
``Context.open_stream()``.
"""
# conduct target func method structural checks
if not inspect.iscoroutinefunction(func) and (
getattr(func, '_tractor_contex_function', False)
):
raise TypeError(
f'{func} must be an async generator function!')

fn_mod_path, fn_name = func_deats(func)

cid, recv_chan, functype, first_msg = await self._submit(
fn_mod_path, fn_name, kwargs)

assert functype == 'context'

msg = await recv_chan.receive()
try:
# the "first" value here is delivered by the callee's
# ``Context.started()`` call.
first = msg['started']

except KeyError:
assert msg.get('cid'), ("Received internal error at context?")

if msg.get('error'):
# raise the error message
raise unpack_error(msg, self.channel)
else:
raise
try:
ctx = Context(self.channel, cid, _portal=self)
yield ctx, first

finally:
await recv_chan.aclose()
await ctx.cancel()


@dataclass
Expand Down
Loading

0 comments on commit 1cbb35f

Please sign in to comment.