-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stream contexts #206
Stream contexts #206
Conversation
Move receive stream into streaming modules and rebrand as a "message stream". Factor out cancellation mechanics in `.aclose()` into the `Context` type which will soon provide the api for for cancelling portal invocations. Comment-stage a few methods on both types in anticipation of a new bi-directional streaming api. Add a `MsgStream` bidirectional channel type which will be the eventual type yielded from `Context.open_stream()`. Adjust the response/dialog types to be the set `{'asyncfun', 'asyncgen', 'context'}`. OH, and add async func checking in `Portal.run()` to catch and error on sync funcs early.
NB: this is a breaking change removing support for `Portal.run()` being able to invoke remote streaming functions and instead replacing the method call with an async context manager api `Portal.open_stream_from()` This style explicitly defines stream teardown at the call site instead of expecting the user to handle tricky things correctly themselves: eg. `async_geneartor.aclosing()`. Going forward `Portal.run()` can be used only for invoking async functions.
fdd7dd5
to
2498a49
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New API looking sexy! 🎉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds like a great motivation. I'm not good for a deep review, but nice!
if not expect: | ||
print("all values streamed, BREAKING") | ||
break | ||
# TODO: this is justification for a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh so one question is if we want to move forward with a ActorNursery.run_in_actor()
equivalent for streams. Pretty sure we go over this in #172.
raise TypeError( | ||
"The first argument to the stream function " | ||
f"{func.__name__} must be `ctx: tractor.Context`" | ||
) | ||
return func | ||
|
||
|
||
class ReceiveMsgStream(trio.abc.ReceiveChannel): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stole this name from @njsmith 🤫
tractor/_streaming.py
Outdated
self._ctx = ctx | ||
self._rx_chan = rx_chan | ||
self._portal = portal | ||
# self._chan = portal.channel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clean this?
tractor/_streaming.py
Outdated
try: | ||
msg = await self._rx_chan.receive() | ||
return msg['yield'] | ||
# return msg['yield'] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and this.
Allows for invoking remote routines and receiving results through an | ||
underlying ``tractor.Channel`` as though the remote (async) | ||
function / generator was invoked locally. | ||
A portal is "opened" (and eventually closed) by one side of an |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hopefully this explanation is more acceptable 🏄🏼
) | ||
|
||
@asynccontextmanager | ||
async def open_stream_from( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the meat of the new API.
tests/test_discovery.py
Outdated
portal2 = await tn.start_actor( | ||
'consumer2', enable_modules=[__name__]) | ||
|
||
async with ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
heh, yeah so turns out this style (tuple after the with) is not allowed pre-3.9 😢; though oddly doesn't seem documented anywhere?
It's def failing in ci where you can see there's no syntax error in 3.9.
So we'll have to delay this I guess until we do a 3.9+ pin.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made a wish list: #207
4cde872
to
8e4965e
Compare
8e4965e
to
ba7b36d
Compare
Turns out can't use the nicer syntax before python 3.9 (even though it doesn't seem documented anywhere?). Relates to #207
ba7b36d
to
b1f657e
Compare
Oh right, docs heh. I guess that's a thing if there are any not already in the examples? |
This is a breaking change to remove support for streaming using
Portal.run()
calls and instead replace it with a new@asynccontextmanager
apiPortal.open_stream_from() as stream:
.The main motivation for this is that it avoids leaving users to do there own manual stream shutdown management (normally ala
async_generator.aslosing()
or the newaclosing()
coming in the stdlib) and promotes more stringent SC style in streaming clients. That latter part is important as we move shortly to supporting an api for #53 which will require explicit cross-actor-context management.The basic gist should be clear from the changed tests.
Once this is merged it will break old code.