Broadcast channels #987
Comments
A channel that drops the oldest enqueued message when a new one arrives (instead of blocking) would be a good building block to have. The backpressure-on-the-fastest-receiver idea is interesting. But why would you need unbounded buffer space?
BroadcastValue is an excellent interface IMO. It could be built pretty easily on top of the "channel that drops the oldest enqueued message when a new one arrives" primitive described by @smurfix. For the reliable delivery cases, one possible issue with "just send in an arbitrary order" is that it means consumers later in the delivery order have to wait for consumers earlier in it. Theoretically it seems like it might be better to do all the sends in parallel? Either using a nursery, or if that's too high overhead, using some select()-ish thing. (Which I will get back to soon... dayjob work has been particularly high-intensity lately and it's not dovetailing as well with Trio work as it did a few months ago.)
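A minimal sketch of the parallel-send idea, assuming a hypothetical broadcast object that keeps one bounded memory channel per subscriber (the class and method names are illustrative, not an existing Trio API):

```python
import trio

class BroadcastSend:
    """Fan an item out to every subscriber's memory channel in parallel."""

    def __init__(self, buffer_size=1):
        self._buffer_size = buffer_size
        self._send_channels = []

    def open_receive_channel(self):
        send, receive = trio.open_memory_channel(self._buffer_size)
        self._send_channels.append(send)
        return receive

    async def send(self, item):
        # All sends run concurrently, so one slow subscriber delays only the
        # broadcaster, not the other subscribers' enqueue operations.
        async with trio.open_nursery() as nursery:
            for channel in self._send_channels:
                nursery.start_soon(channel.send, item)
```

The nursery still doesn't exit until every subscriber has accepted the item, so backpressure is ultimately driven by the slowest receiver; the difference is that the waiting happens in parallel rather than in subscription order.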
I think it's worth distinguishing between two use cases:
I don't think we should mix these up. In particular, I'm skeptical that it ever makes sense to use a drop-oldest policy + buffer size >1... If you can afford to drop old messages, you might as well do it all the time, and exercise that case properly.
If the slowest receiver is N messages behind the fastest receiver, then you need to buffer N messages. If you don't have any way for the slowest receiver to transmit backpressure, then N can grow without bound.
Yeah, I'm not sure it makes a huge difference – it's more-or-less equivalent to changing the buffer size by 1 – but doing the sends in parallel does feel more elegant.
Well, in that case, sure. I was thinking of the "we have more than one receiver and at least one needs to process the message while others can take it or leave it" case. I actually may need that for a distributed network test system: at least one receiver must run each test, while others may also do so if they have spare capacity.
The semantics of BroadcastValue fits several instances in my existing codebase (notably most of the uses of …). With the value optional, the question becomes what to name it. BroadcastSignal()? No one wants to overload "signal"-- but that's what it is.
@belm0 I guess in general we want to encourage people to pass things in-band, because it makes the state tracking easier to reason about ("share memory by communicating" does have some merit). But if you wanted to reuse the machinery without any real state, I guess you could just increment a counter or something?
Regarding …, roughly:

    class BroadcastValue(Generic[T]):
        def set(self, value: T) -> None:
            ...

        def subscribe(self, predicate=None) -> ReceiveChannel[T]:
            """Yield value changes when predicate(value, old_value) is True. Defaults to all changes."""
            ...

(In 90% of cases our predicates don't use old_value.)
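For illustration, a subscriber loop against that interface might look like the following (hypothetical usage; it assumes subscribe() returns something that can be iterated with async for):

```python
async def watch_connection(broadcast_value):
    # Only wake up on transitions into the "connected" state; other value
    # changes are filtered out by the predicate.
    async for state in broadcast_value.subscribe(
        lambda value, old_value: value == "connected" and old_value != "connected"
    ):
        print("connection established:", state)
```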
I use a version of …. The crux of these ideas is that they are not cancellation safe, i.e. when that task is cancelled or otherwise exits before setting the value, how does the caller get notified? Conversely, when the caller dies, somebody please notify the callee that its work is no longer required.
Something that just occurred to me, though maybe it was obvious to everyone else: if broadcasting is about handling arbitrary numbers of consumers, then an important special case is when there are zero consumers. For ….

And the other thing any broadcast strategy has to think about is: can the set of consumers change over time, and if so then how do you handle consumers arriving and consumers departing? For ….
I guess there might be one high-level case where this style could be useful, and that's logging. Maybe if your log consumer can't keep up, you want to just discard messages, but add a note saying that that's what you're doing. But I feel like this still has a lot of subtleties that I don't fully understand.

If you're logging over something like UDP, then this isn't an issue – the transport layer automatically handles message boundaries and discarding messages on overflow. So I guess this is mostly useful for when you're logging over a bytestream, like TCP or stdout. If you're logging over a bytestream, then you need a task to handle the bytestream and mediate access, because otherwise you'll get garbled messages if a task gets cancelled while it's in the middle of logging. So you want something like an in-memory channel, with multiple producers who probably only want to call synchronous methods, and a single consumer. And the channel needs a buffer, plus a flag to indicate overflow, and whenever we consume from the channel we need to do some delicate handling to make sure the "messages dropped" indication gets recorded at the right point in the message stream. That last part makes things slightly awkward, but I guess we can do it with an object that encapsulates a ….

And the consumer does: …

... But this has a bug. Consider this sequence: …

In this case the final logs look like:

    [message] [message] [new message] [overflow indication]

But they should look like:

    [message] [message] [overflow indication] [new message]

So this is interesting. I'm not sure how important this is and there are certainly lots of ways to solve it (e.g. it wouldn't be too hard to create a custom …). Some ideas for fixing this situation: …

... None of this is seeming super appealing. Maybe a custom LogChannel is the best option.
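A minimal sketch of the kind of object being described, assuming a plain collections.deque plus an overflow counter (the class and method names here are made up for illustration):

```python
from collections import deque

import trio

class LogChannel:
    """Bounded buffer of log messages: drops on overflow, but records the fact."""

    def __init__(self, max_buffer=1000):
        self._messages = deque(maxlen=max_buffer)
        self._dropped = 0          # messages dropped since the last drain
        self._wakeup = trio.Event()

    def send_nowait(self, message):
        # Producers are synchronous; on overflow, drop the new message and
        # count it instead of blocking.
        if len(self._messages) == self._messages.maxlen:
            self._dropped += 1
        else:
            self._messages.append(message)
        self._wakeup.set()

    async def drain(self):
        # Single consumer: wait for something to arrive, then take the buffered
        # messages *and* the drop count in one synchronous step, so the
        # "messages dropped" note lands at the right point in the output stream.
        while not self._messages and not self._dropped:
            await self._wakeup.wait()
            self._wakeup = trio.Event()
        messages = list(self._messages)
        self._messages.clear()
        dropped, self._dropped = self._dropped, 0
        return messages, dropped
```

Because the snapshot of the messages and the drop count happens without any await in between, no new message can sneak in between them, which avoids the ordering bug described above.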
Simple solution to the message flag problem: teach the sender to attach a sequence number to each message. I'd rather not over-engineer this. A broadcast on UDP is a lossy transport that doesn't have any loss indication and might even suffer from message reordering. If we add a broadcast channel that doesn't have these two faults it's a strict improvement. Beyond that we have some incompatible design requirements (e.g. does a lossy receiver still receive the messages before the overflow occurred, given that it needs to re-sync anyway?) which remind me of the multiple choices we have for doing a ….
OK yeah that's probably better than all that stuff I wrote up above :-)

Returning to …: I'm thinking the basic structure of …. So I'm not quite sure how we should organize all these ideas into, like, Python classes. One option would be to pack it all into one class, that you can use like:

    # Simple value broadcasting:
    broadcast_value.set(...)
    async for generation, value in broadcast_value:
        if generation != last_generation + 1:
            print("missed some updates")
        print(value)
        last_generation = generation

    # Representing an outside value, like a database:
    broadcast_value.notify()  # bumps generation counter without changing value
    async for _, _ in broadcast_value:
        print("database was updated")

This might be trying to jam too many ideas into a single class though, and end up making it more confusing than if we had multiple classes? And I'm not really sure the generation counter is useful. Really the main reason I put it in the sketch is that ….

I also wonder about whether we should expose a way to inspect the current value and/or generation counter directly, without subscribing. Is that helpful, or an invitation to subtle race conditions due to the values being out of sync with subscribers?
I had this race condition recently with code using trio_util.AsyncValue: …

I.e. missed updates (here just during scheduling time) can cause the await to block indefinitely even though the value is A or B.
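A hypothetical reconstruction of the kind of race being described, using trio_util.AsyncValue.wait_value() and a task started via a nursery (which is where a scheduling gap can appear):

```python
import trio
from trio_util import AsyncValue

state = AsyncValue('A')   # the value is 'A' right now

async def watcher():
    # Logically the value already satisfies the predicate, so we expect this
    # to return immediately.  But this body doesn't run until the scheduler
    # gets to it; if another task sets state.value = 'C' in the meantime,
    # wait_value() never observes 'A' or 'B' and blocks indefinitely.
    await state.wait_value(lambda v: v in ('A', 'B'))
    print("saw A or B")

async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(watcher)
        state.value = 'C'          # runs before watcher's body is entered
        await trio.sleep(1)
        nursery.cancel_scope.cancel()

trio.run(main)
```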
It occurred to me that this particular race is somewhat tied to Python's implementation of async/await, and can be worked around with overexertion. In Python, the body of an async function isn't entered until the coroutine is scheduled and run. So in my previous example, even though by logic we know that ….

My first attempt to work around this involved a wrapper function that returns the coroutine. The wrapper function can access the current value immediately and pass this to the async function. But it doesn't scale, because the wrapper will be ineffective when used in any API which defers async function calls (…). So then I'm left with passing the immediate value explicitly: …

(Of course ….)
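A hypothetical illustration of "passing the immediate value explicitly", with made-up helper names (not part of trio_util):

```python
async def wait_value_or_current(async_value, predicate, current_value):
    # `current_value` was read by the caller at the moment it created this
    # call, so a value that satisfied the predicate back then is not missed
    # even if it changes again before this body starts running.
    if predicate(current_value):
        return current_value
    return await async_value.wait_value(predicate)

# Caller side: capture the value synchronously, before any scheduling gap:
#   nursery.start_soon(wait_value_or_current, state, pred, state.value)
```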
I think Python and C# are actually the same in this regard: when you do await foo(), the body of foo starts running immediately, without first going through the scheduler.

I spent a few minutes looking at the …. This looks like it's moving in the same direction as the ….
@njsmith thank you for correcting me. So just to ingrain this: in the following code, no other task can get a time slice between the await foo() call and the print('hi'):

    async def foo():
        print('hi')
        await trio.sleep(1)
        ...

    await foo()

It seems that #987 (comment) shouldn't have a race then. I'll have to confirm more carefully one way or the other.
Correct.
I've found an important subtlety in how you may want ….

As long as a given subscriber is able to keep up with broadcasted values, queueing is desirable. For example, a queue size of 2 is sufficient to ensure that no values are dropped as long as 1) the subscriber body is synchronous, and 2) at most one value is broadcast per scheduler pass. However, if the subscriber is slow, the queueing is not desirable. For example, given queue size of 3, if ….

I addressed it as follows (note that …):

    from collections import deque

    from trio.lowlevel import ParkingLot

    class FrontRunnerBroadcastValue:
        """
        Broadcast channel that skips to the most recent value when a subscriber's
        queue has overflowed.

        Rationale: as long as a subscriber keeps up with new values, we want to use
        the full queue depth (currently 2, which is sufficient for up to one event
        per scheduler frame). In contrast, a slow subscriber should always get the
        latest value, so the queue is effectively bypassed.
        """

        def __init__(self):
            self.event = ParkingLot()
            self.values = deque(maxlen=2)  # (generation, value), newest on the right
            self.head_generation = 0

        def broadcast(self, value):
            self.head_generation += 1
            self.values.append((self.head_generation, value))
            self.event.unpark_all()

        async def subscribe(self):
            last_generation = self.head_generation
            while True:
                if last_generation == self.head_generation:
                    await self.event.park()
                # Emit the oldest value having a newer generation as long as no
                # values have been missed. Otherwise emit the latest value.
                # NOTE: this implementation is only efficient for short queue lengths.
                for generation, value in self.values:
                    if generation == last_generation + 1 or \
                            generation > last_generation and generation == self.head_generation:
                        yield value
                        last_generation = generation
                        # break, since self.values may have mutated during yield
                        break
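A hypothetical usage sketch for the class above (the producer/consumer structure and timings are illustrative):

```python
import trio

async def sensor_loop(broadcaster):
    # Producer: broadcast() is synchronous, so it never blocks on subscribers.
    for reading in range(10):
        broadcaster.broadcast(reading)
        await trio.sleep(0.1)

async def display_loop(broadcaster):
    # Slow consumer: once it falls behind, intermediate readings are skipped
    # and only the latest one is delivered.
    async for reading in broadcaster.subscribe():
        print("latest reading:", reading)
        await trio.sleep(0.5)

async def main():
    broadcaster = FrontRunnerBroadcastValue()
    with trio.move_on_after(2):   # stop the demo after a couple of seconds
        async with trio.open_nursery() as nursery:
            nursery.start_soon(sensor_loop, broadcaster)
            nursery.start_soon(display_loop, broadcaster)

trio.run(main)
```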
Further refinement of broadcast values, disambiguating these queue uses:

1. queueing that lets a subscriber absorb more than one broadcast arriving within a single scheduler pass
2. queueing of values while the subscriber body is blocked (i.e. a slow consumer)

I think they should be separated. (1) is set according to how many broadcasts you'd like to handle per scheduler pass (assuming your subscriber body is synchronous and can consume them all), while (2) is set according to the desired behavior when the consumer is slow. For (2), I've only found reasonable use cases for queueing 0 or 1 items. For >=2, it's time to use a memory channel with backpressure instead.

EDIT: don't use this-- there are bugs, and see subsequent comment by njsmith

    from collections import deque

    from trio.lowlevel import ParkingLot

    class FlexibleBroadcastValue:
        """
        In-memory channel with synchronous broadcast (no backpressure)

        There are two parameters that control queueing:

        * max_broadcasts_per_scheduler_pass (default: 1) - a constructor option
          that limits how many broadcasts can be handled within one scheduler
          pass without dropping values. This is only significant when the
          subscriber body is synchronous, and so can catch up atomically.
          (Detail: this queueing addresses when more than one item arrives
          before the generator itself awakes from blocking on the next value.)
        * blocked_queue_size (default: 0) - a per-subscriber option that limits
          how many values to queue while the subscriber body is blocked. Sizes
          greater than 1 are questionable-- consider using a Trio memory
          channel having proper backpressure instead.
        """

        def __init__(self, *, max_broadcasts_per_scheduler_pass=1):
            self.event = ParkingLot()
            # deque of (generation, value), newest on the right
            self.values = deque(maxlen=2 * max_broadcasts_per_scheduler_pass)
            self.head_generation = 0

        def broadcast(self, value):
            self.head_generation += 1
            self.values.append((self.head_generation, value))
            self.event.unpark_all()

        async def subscribe(self, *, blocked_queue_size=0):
            assert blocked_queue_size <= self.values.maxlen
            # The trick to this loop is its lack of any blocking call unless the
            # subscriber is caught up. If the subscriber body is synchronous as
            # well, it means the entire queue content can be relayed atomically.
            last_generation = self.head_generation
            while True:
                if last_generation == self.head_generation:
                    await self.event.park()
                for generation, value in self.values:
                    if generation > last_generation:
                        last_generation = generation
                        pre_yield_generation = self.head_generation
                        yield value
                        # If the head value changed, it implies that the subscriber
                        # blocked. Propagate a maximum of blocked_queue_size values
                        # during that time and break (since the collection we're
                        # iterating has mutated).
                        if self.head_generation > pre_yield_generation:
                            last_generation = max(last_generation,
                                                  self.head_generation - blocked_queue_size)
                            break
In general, I would discourage everyone from writing code that depends on assumptions about the details of individual ticks and how Trio's scheduler processes them. This is not part of Trio's stable ABI. |
@belm0 is this …? I currently have a need for it (or something similar) in ….
@goodboy I gave up on the quasi-queueing of FrontRunnerBroadcastValue. However, in our local version of trio-util, AsyncValue now has an eventual_values() method. I'm planning to commit that soon to the public repo.

update: now published (https://trio-util.readthedocs.io/en/v0.5.0/#trio_util.AsyncValue.eventual_values)
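For reference, usage of that API looks roughly like this (based on the linked trio-util docs; details here are approximate):

```python
from trio_util import AsyncValue

async def monitor(connection_state: AsyncValue):
    # Yields the current value immediately, then the latest value after each
    # change; intermediate values may be skipped if this body is slow.
    async for state in connection_state.eventual_values():
        print("state is now:", state)
```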
Thanks to @belm0 for pointing me to this issue. I've been doing broadcast messaging (slowest consumer backpressure, lossless) for a long time, originally with asyncio, then converted to Trio. I put my implementation in https://gist.github.com/basak/007da3fc2448300d037c9bb008cc5e80. It works well for me in practice. I filed #1949 because I noticed a theoretical problem today. However I believe Trio's implementation as it is today is safe from that issue.
tokio's API for broadcasting, for potential inspiration: https://tokio-rs.github.io/tokio/doc/tokio/sync/broadcast/index.html In particular: https://tokio-rs.github.io/tokio/doc/tokio/sync/broadcast/index.html#lagging
We have what appears to be a …. It's extremely nascent and in prototype testing atm; see the link above.
About to land goodboy/tractor#229 but more than happy to contribute all of the …. Pretty happy with the performance and ease of use around our IPC streaming api 😎 NB: it's fastest consumer style just like ….
Elixir's GenStage seems to backpressure on slowest consumer: https://hexdocs.pm/gen_stage/GenStage.BroadcastDispatcher.html It's not exactly a channel per se, but it's a thing where you can have separate "stages" -- each can either be a producer, a producer-consumer, or a consumer. A consumer will send demand up to its producer whenever it is done processing (I think). That's it, I think? I'm not sure how well this strategy works out, but GenStage is pretty much in the standard library for Elixir -- it's even developed on the Elixir org :P
There's a fourth policy you might want to use, adjacent to "instead of applying backpressure, drop messages" - but instead of dropping individual messages, you drop the whole connection to the consumer (the consumer may then decide to reconnect, or give up entirely). Consider a consumer that is synchronising some state from the producer, with each event being a "diff" against the prior state. If just a single event was dropped, any future events would be useless to the consumer. |
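A sketch of what that fourth policy could look like, reusing the one-memory-channel-per-subscriber structure from earlier in the thread (class and method names are illustrative): on overflow, the lagging subscriber's channel is closed instead of silently dropping one message.

```python
import trio

class DisconnectingBroadcaster:
    """Broadcast that disconnects any subscriber whose buffer is full."""

    def __init__(self, buffer_size=16):
        self._buffer_size = buffer_size
        self._subscribers = []

    def subscribe(self):
        send, receive = trio.open_memory_channel(self._buffer_size)
        self._subscribers.append(send)
        return receive

    def broadcast(self, event):
        for send in list(self._subscribers):
            try:
                send.send_nowait(event)
            except trio.WouldBlock:
                # The subscriber fell behind.  Since every later diff would be
                # useless to it anyway, drop the whole connection; the consumer
                # sees its channel end and can re-sync from scratch.
                self._subscribers.remove(send)
                send.close()
```

On the receiving side, the end of the channel (the `async for` loop finishing) is the signal to reconnect and re-synchronize, or to give up.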
[This is a speculative discussion. I don't know if we should do anything here. But I was thinking about it so I figured I'd write it down so it doesn't get lost.]
There are two kinds of multi-consumer channels: the kind where each item is distributed to one receiver (e.g., by round-robin), and the kind where each item is distributed to all receivers ("broadcast"). We already have good support for the former, via memory channels. Implementing broadcast is trickier, though, because you need to pick some policy for handling the case where consumers run at different speeds. I guess the basic options are:
1. Apply backpressure based on the slowest consumer: this is appropriate if all the receivers are working together in some sense, and there's no point in letting one consumer get too far ahead of the others. It's straightforward to implement by using a collection of memory channels with bounded buffers, something like the broadcast_send sketch after this list.

2. Apply backpressure based on the fastest consumer: this is intuitively tempting in cases where the receivers are unrelated, but it requires unbounded buffer space, so... not necessarily a good idea. It can be approximated by re-using our broadcast_send from the previous bullet point, but with infinite buffers. But, this disables backpressure entirely, rather than applying backpressure based on the fastest receiver. Doing this precisely would require some more work. I'm not sure if it's ever useful.

3. Instead of applying backpressure, drop messages: The intuition here is that one common case is where you have a value that updates occasionally, and subscribers want to keep track of the latest version. However, if multiple updates happen in quick succession while a subscriber isn't looking, then they only care about the latest value. One possible API: a BroadcastValue object with a set() method that hands out ReceiveChannels. The semantics of the ReceiveChannel would be: the first time you call receive (or __anext__), it immediately returns the current value, and stores some state in the channel object keeping track of what was the last value returned by this particular channel. After that, each call to receive waits until there's been at least one call to set (compared to the last time this channel returned a value), and then returns the latest value. So in practice, you'd write a subscriber loop like the one sketched after this list, and if there's a fast update rate, then the loop might skip over some intermediate values, but if the updates stop then the loop will always catch up with the latest value.
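A sketch of what those two code examples might have looked like. First, the slowest-consumer broadcast_send from the first bullet, built from bounded memory channels (the method names are guesses, not an existing Trio API):

```python
import trio

class BroadcastSendChannel:
    """Each sent item goes to every subscriber; send() waits for the slowest one."""

    def __init__(self, max_buffer_size=1):
        self._max_buffer_size = max_buffer_size
        self._subscribers = []

    def open_receive_channel(self):
        send, receive = trio.open_memory_channel(self._max_buffer_size)
        self._subscribers.append(send)
        return receive

    async def send(self, item):
        # With bounded buffers, this blocks until every subscriber has room,
        # i.e. backpressure is driven by the slowest consumer.
        for send in self._subscribers:
            await send.send(item)
```

And a subscriber loop for the drop-messages BroadcastValue described in the last bullet; a slow loop body simply never sees the skipped intermediate values:

```python
async def follow_latest(broadcast_value):
    # Hypothetical: open_receive_channel() returns the latest-value channel
    # whose semantics are described above.
    async for value in broadcast_value.open_receive_channel():
        print("current value:", value)   # if this body is slow, intermediate values are skipped
```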
Are there any other interesting policies that I'm missing?
There's some discussion of related ideas in #637