We need broadcast channels, stat. #204
Interesting stuff from the
I kind of like this since it's very reminiscent of real-time DSP style overruns and would allow for detecting too-slow consumers via errors (that may allow restart logic to entail?).
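To make the overrun idea concrete, here's a minimal synchronous sketch, assuming a fixed-size ring shared by all subscribers. The names `BroadcastBuffer`, `Subscriber` and `Lagged` are hypothetical and not part of `trio` or `tractor`; a real implementation would also need async wakeups and closing semantics.

```python
from collections import deque


class Lagged(Exception):
    """Raised when a consumer fell behind and messages were overwritten."""

    def __init__(self, skipped):
        super().__init__(f"consumer lagged, {skipped} message(s) dropped")
        self.skipped = skipped


class BroadcastBuffer:
    """Fixed-size ring shared by all subscribers (hypothetical sketch)."""

    def __init__(self, maxlen):
        self._ring = deque(maxlen=maxlen)
        self._next_seq = 0  # sequence number of the next message to be sent

    def send(self, msg):
        # Appending to a full deque silently evicts the oldest entry.
        self._ring.append(msg)
        self._next_seq += 1

    def subscribe(self):
        # New subscribers only see messages sent after this point.
        return Subscriber(self, self._next_seq)


class Subscriber:
    def __init__(self, buf, start_seq):
        self._buf = buf
        self._seq = start_seq  # sequence number this consumer reads next

    def receive(self):
        buf = self._buf
        oldest = buf._next_seq - len(buf._ring)
        if self._seq < oldest:
            # Overrun: our next message was evicted, so skip ahead and report it.
            skipped = oldest - self._seq
            self._seq = oldest
            raise Lagged(skipped)
        if self._seq == buf._next_seq:
            raise LookupError("no new messages")
        msg = buf._ring[self._seq - oldest]
        self._seq += 1
        return msg
```

A slow consumer gets a `Lagged` error exactly once per overrun and then resumes from the oldest message still in the ring, which is the hook where restart logic could go.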
Add `ReceiveMsgStream.subscribe()` which allows allocating a broadcast receiver around the stream for use by multiple actor-local consumer tasks. Entering this context manager idempotently mutates the stream's receive machinery which for now can not be undone. Move `.clone()` to the receive stream type. Resolves #204
This ties in with #40 and #53 both of which heavily rely on the discussion in python-trio/trio#987.
The main gotcha is that `trio`'s mem chans don't have broadcasting built-in (frankly making them mostly an SC anti-pattern if you have followed the `.clone()` semantics discussion). This mis-assumption was what originally fueled #203.

The obvious main use case is for broadcasting stream data on the receive side of a stream such that multiple rx-side tasks each get their own copy of a new message. NB: We already have a prototype for producer side ala `tractor.msg.pub`, though I'm not sure it's the best solution either.

Depending on how we move forward with bidirectional-streaming in #53 it might be worth offering a `.clone()` or similar method which provides a broadcast reference token to consumer tasks.

Definitely needs a deeper dive.
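As a rough illustration of the "each rx-side task gets its own copy" semantics, here's a minimal fan-out sketch. It uses stdlib `asyncio` queues purely as a stand-in so it runs without third-party packages; it is not `trio`'s or `tractor`'s API, and `Broadcaster`, `consume` and `demo` are hypothetical names.

```python
import asyncio


class Broadcaster:
    """Copy every message into one bounded queue per consumer (hypothetical)."""

    def __init__(self, maxsize=1):
        self._maxsize = maxsize
        self._queues = []

    def subscribe(self):
        # Each subscriber gets a private queue, so no message is "stolen"
        # by a sibling task the way it would be with a shared queue.
        q = asyncio.Queue(self._maxsize)
        self._queues.append(q)
        return q

    async def send(self, msg):
        # Waits until every queue has room, so the slowest consumer
        # sets the pace and nothing is dropped.
        for q in self._queues:
            await q.put(msg)


_DONE = object()  # sentinel marking end of stream


async def consume(q, out):
    while (msg := await q.get()) is not _DONE:
        out.append(msg)


async def demo(n_msgs=5):
    bc = Broadcaster()
    results = [[], []]
    tasks = [asyncio.create_task(consume(bc.subscribe(), out)) for out in results]
    for i in range(n_msgs):
        await bc.send(i)
    await bc.send(_DONE)
    await asyncio.gather(*tasks)
    return results
```

Running `asyncio.run(demo())` gives both consumers the full sequence. This is the lossless, slowest-consumer-limited flavour; the trade-off is that one stalled task back-pressures the sender and every other consumer.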
For reference here's the current short list of broadcast chan impls:
- the `eventual_values` predicate matching iterator in `trio-util`
- the "lossless", slowest consumer limiting, event queue `aioevent`
- gist from @basak
- `slurry`'s `Pipeline.tap()`
- effectively does the, but, with non-zero buffer size recommendations:
Any more implementations are greatly welcome from the lurker pack 😉