Bidirectional streaming? #53
This is a really interesting idea. I'm not totally sure what to make of it, but it's very interesting. It seems like it would enable some kind of reactive-programming model across machines, which is a trippy thought. Like CycleJS for distributed systems. I'm not sure what the target application would be for such a system, because I'm remarkably uncreative.
The more I think about this the more I think we should be adopting the multi-task style considerations being discussed in the following.

The entire "channels" related issue list is also worth keeping an eye on. We already have this api to run an async func and have it stream, but currently there's no way to have the caller send values to the callee. In essence we want the same streaming behavior as if you had two actors call each other and receive stream results, but in a more compact single-context-channel style API. The thought I had was something like the following based on our context api:

```python
@tractor.stream
async def streamer(
    ctx: tractor.Context,
) -> None:
    """A simple web response streaming server.
    """
    tx, rx = ctx.attach_stream()
    while True:
        url = await rx.recv()
        val = await web_request(url)
        # this is the same as ``yield`` in the async gen case
        await tx.send(val)


async def caller(urls: List[str]):
    async with tractor.open_nursery() as n:
        portal = await n.run_in_actor(streamer)
        tx, rx = portal.attach_stream()
        for url in urls:
            await tx.send(url)
            resp = await rx.recv()
```

I guess this would give us a more mem chan style "select statement considered harmful" style api? We can then have per-side shutdown signalling just like … I dunno, what do the lurkers think.
Ok so I thought some more about this while designing some fairly involved persistent service system stuff, and I think we probably want a bit more stringency in this api.

second actor: the sub

```python
@tractor.context
async def streamer(
    ctx: tractor.Context,
    *args,
) -> None:
    """A simple web response streaming server.
    """
    # do setup stuff

    # signal to calling actor that we are synced and ready;
    # this value to `.started()` is returned from the calling side's ``.__aenter__()``
    await ctx.started(first_message_relayed_back_to_caller)

    # this will error if we haven't yet called ``.started()`` above;
    # this entry blocks until the other side also opens a stream
    async with ctx.open_stream() as (send, recv):
        while True:
            url = await recv()  # should this be clonable?
            val = await web_request(url)
            # this is the same as ``yield`` in the async gen case
            await send(val)
```

first actor: the parent / caller

```python
async def caller(urls: List[str]):
    async with tractor.open_nursery() as n:
        portal = await n.start_actor('streamer')

        # `.open_context()` here returns some kind of caller end `Context`
        ctx = await portal.open_context(streamer)

        async with ctx, ctx.open_stream() as (first_msg_from_started_call, (send, recv)):
            for url in urls:
                await send(url)
                resp = await recv()  # could be clonable?
```

The nice part of enforcing a context manager style "session" on the calling side is that we guarantee setup/teardown sync stages with the remote task. In particular, we can raise appropriate errors if for whatever reason either end has "terminated" their block, without having to wrangle it through channel / connection state and messages.

Obviously this api also allows for just calling a function that acts like a context manager but doesn't have to initialize any streaming. In that case I'm not sure if the caller should also be required to …
Yah so the main question is how many nested context managers we want:

```python
ctx = await portal.open_context(stream)

async with ctx, ctx.open_stream() as (send, recv):
    # start streaming
```

Might be the more explicit, correct api since it distinguishes the pre-started sync point with the callee versus it being implied. Alternatively the stream could be opened without using …
Interesting simple example from the `arq` project:

```python
import asyncio
from aiohttp import ClientSession
from arq import create_pool
from arq.connections import RedisSettings


async def download_content(ctx, url):
    session: ClientSession = ctx['session']
    async with session.get(url) as response:
        content = await response.text()
        print(f'{url}: {content:.80}...')
    return len(content)


async def startup(ctx):
    ctx['session'] = ClientSession()


async def shutdown(ctx):
    await ctx['session'].close()


async def main():
    redis = await create_pool(RedisSettings())
    for url in ('https://facebook.com', 'https://microsoft.com', 'https://github.com'):
        await redis.enqueue_job('download_content', url)


# WorkerSettings defines the settings to use when creating the worker,
# it's used by the arq cli
class WorkerSettings:
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
```

Basically what I've proposed, but with setup/teardown inside the target func. We should probably mock this up exactly using …
Yeah, I'm starting to think this might be the way to go and we should basically just build out a send-side equivalent of our existing …
The last thing I have questions about is whether we need to support individual send vs. recv stream cloning. Cloning a recv side stream makes obvious sense since it'll copy messages/packets to each consumer receiving from that stream (versus only one task getting the value and the others missing it), but with a send side I'm not so sure it's useful given the nature of an IPC oriented "channel". The following comes from some rubber ducking on the …

The many to many example in the …
This is, kinda true. It definitely requires more stringent task bookkeeping since you need one set of tasks (consumers) to be distinctly separate from another set (producers) in terms of closing each set's end of the channel. But, arguably, you lose some supervision control; if you're interested in knowing which task set was the source of a failure you can't really tell. For example, if you used the following code you don't really need cloning, but it requires 2 extra nurseries and distinctly concurrent task set allocations such that either set can terminate and signal closing of their side of the channel without interfering with the other or requiring any task ordering or channel bookkeeping whatsoever:

```python
async def sender(tx):
    async with tx:
        async with trio.open_nursery() as t_n:
            t_n.start_soon(send_stuff_with_tx, tx)


tx, rx = trio.open_memory_channel(0)

async with trio.open_nursery() as root_n:
    root_n.start_soon(sender, tx)

    async with rx:
        async with trio.open_nursery() as r_n:
            r_n.start_soon(recv_stuff_with_rx, rx)
```

Following this example in detail you'll see it's the same result as with using `.clone()`.

Thanks to … we can compress this a bit:

```python
async def sender(tx, task_status):
    async with tx, trio.open_nursery() as (_, t_n):
        t_n.start_soon(send_stuff_with_tx, tx)


tx, rx = trio.open_memory_channel(0)

async with trio.open_nursery() as root_n:
    root_n.start_soon(sender, tx)

    async with rx, trio.open_nursery() as (_, r_n):
        r_n.start_soon(recv_stuff_with_rx, rx)
```

which, to me, is definitely no more fragile, and I'm not convinced it's that much less simple than:

```python
tx, rx = trio.open_memory_channel(0)

async with trio.open_nursery() as n:
    n.start_soon(recv_stuff_with_rx, rx.clone())
    n.start_soon(send_stuff_with_tx, tx.clone())
```

but obviously now the …

Yes, you have fewer nurseries and a flat task hierarchy, but you also lose lifetime control / error handling on each set individually. For example, if one of the producer tasks fails for a reason that can be handled, in the flat, single nursery case both sets must be restarted, versus possibly recovering one or the other side with supervision code. So afaict from a LOC perspective it's not really different minus the …

Further, once we start thinking about multiple actors (processes) where the send vs. recv sides of the channel API are actually in completely separate memory spaces, I'm not sure the argument for fewer nurseries applies at all. If you have tasks in 2 separate actors they are already running under distinct actor scoped nurseries, and there should be no way to have a single-direction side (meaning the send or recv side attached in separate processes) be oriented in a way such that you can "close one side" without also closing some top level task or nursery. Let's take the one example from above:

```python
# code which will be mostly duplicate in both processes
async with ctx.open_stream() as (send, recv):

    # yes you could spawn a bunch of tasks which send on clones
    # but closing all these will teardown only one side of the 2-way stream
    async with recv, trio.open_nursery() as (_, n):

        # the main question is, when will you need it such that the above nursery
        # **does not also close** when all these tasks are complete?
        n.start_soon(send_stuff_with_tx, send.clone())

        # remember this is different because each task will receive **a copy of each message**
        # every time something new is sent from the other side
        n.start_soon(send_stuff_with_rx, recv.clone())
```

The only case I can think of to justify …
But, when or why would you need that?
Thanks to @nimaje for further examples that might provide further incentive for send side cloning. Quoting from …:

```python
async def run_pair(send_channel, …):
    async def partner(…):
        …
        try:
            while some_condition:
                …
                await send_channel.send(something)
                …
                if some_other_condition:
                    await send_channel.aclose()
        except trio.ClosedResourceError:
            # stop that stuff when the channel was closed
            pass
        …
        try:
            # maybe send something if possible
            await send_channel.send(something)
        except trio.ClosedResourceError:
            pass
    …
    async with trio.open_nursery() as nursery:
        nursery.start_soon(partner, …)
        nursery.start_soon(partner, …)


send_channel = …
async with trio.open_nursery() as nursery:
    # each of the pairs should independently stop using the channel
    # without closing them for the others
    nursery.start_soon(run_pair, send_channel.clone(), …)
    nursery.start_soon(run_pair, send_channel.clone(), …)
    nursery.start_soon(run_pair, send_channel.clone(), …)
```
A long running task allocated by the same nursery that creates tasks which will use and eventually close the channel when complete:

```python
# maybe every case of that is solvable with a second nursery
send_channel = …
async with send_channel, trio.open_nursery() as nursery:
    # scenario:
    # to signal the receiving side, send_channel should be closed as soon as possible,
    # but some_long_running_task will run longer than that in most cases
    # (you can't swap send_channel and nursery as that would close the send_channel directly after this body)
    nursery.start_soon(some_task, send_channel, …)
    nursery.start_soon(some_other_task, send_channel, …)
    nursery.start_soon(some_long_running_task, …)
```

versus, with clones:

```python
send_channel = …
async with trio.open_nursery() as nursery, send_channel:
    nursery.start_soon(some_task, send_channel.clone(), …)
    nursery.start_soon(some_other_task, send_channel.clone(), …)
    nursery.start_soon(some_long_running_task, …)
```

So I guess I'll try to summarize these 2 examples above: …
This mostly adds the api described in #53 (comment)

The first draft summary:

- formalize bidir streaming using the `trio.Channel` style interface which we derive as a `MsgStream` type.
- add `Portal.open_context()` which provides a `trio.Nursery.start()` remote task invocation style for setting up and tearing down tasks contexts in remote actors.
- add a distinct `'started'` message to the ipc protocol to facilitate `Context.start()` with a first return value.
- for our `ReceiveMsgStream` type, don't cancel the remote task in `.aclose()`; this is now done explicitly by the surrounding `Context` usage: `Context.cancel()`.
- streams in either direction still use a `'yield'` message, keeping the proto mostly symmetric without having to worry about which side is the caller / portal opener.
- subtlety: only allow sending a `'stop'` message during a 2-way streaming context from `ReceiveStream.aclose()`; a detailed comment with explanation is included.

Relates to #53
I've already left some notes in the code about how we could do two way streaming using the native

```python
received = yield sent
```

generator semantics, but it's probably worth looking at how other projects are approaching it and if it's even a good idea.

Some projects I found through different communities recently:

- faust (though I think this is moreso a demonstration of a need for a proper asyncitertools)

Another question is how to accomplish this with traditional messaging patterns like those found in nng. There were some suggestions in gitter about combining protocols/socket types.

More to come...
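For reference, the native two-way generator semantics mentioned above already work with plain (non-async) generators via `.send()`; `echo_upper` here is just a toy to show the shape:

```python
def echo_upper():
    # ``received`` gets whatever the caller passes to ``.send()``
    received = None
    while True:
        received = yield (received.upper() if received else None)


gen = echo_upper()
next(gen)  # prime the generator up to the first ``yield``
print(gen.send("hi"))     # HI
print(gen.send("there"))  # THERE
```

Async generators notably do *not* support `.asend()`-driven two-way flow across process boundaries for free, which is part of why an explicit stream api is being considered.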
Update
The api I'm currently most convinced of is at this comment. Afaict it would also satisfy the needs of #122 to some degree, since cancelling a "context" would effectively be like cancelling a cross-actor scope.