MemorySendChannel: replace the last queued item #1744
For your case, would something like this work?

```python
def send_nowait_discarding_old_values(send_handle, receive_handle, obj):
    try:
        send_handle.send_nowait(obj)
    except trio.WouldBlock:
        receive_handle.receive_nowait()  # discard oldest item, then try again
        send_handle.send_nowait(obj)
```

I'm hesitant to add the
@njsmith thank you for the suggestion. Discarding the oldest value isn't suitable for this use case, because it allows an over-eager producer task to blow away the entire queue of values in a single scheduler cycle. With `replace_on_overflow`, the worst a task could do is affect the most recent value.

Another way to think about it, assuming the items queued in the channel are temporal: an overflow scenario should only affect items local in time. If, say, the channel size is 100 and it holds items spanning 1 minute, and in the last 5 seconds there is a surge of incoming items, that shouldn't affect items that are a minute old.

The intention is to use this in refactoring and enhancement of trio_util.

So we see that the memory channel API is flexible enough to implement "discard oldest value" (though I can't think of a case where one would want that). It should be general enough to implement "discard most recent value" too, which I know has use cases.
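The contrast between the two overflow policies can be sketched on a plain bounded `deque`. This is a toy model, not trio's channel API; the function names are illustrative only:

```python
from collections import deque


def discard_oldest(q, item):
    """Overflow policy: drop the oldest queued item to make room."""
    if len(q) == q.maxlen:
        q.popleft()  # an eager producer can purge the entire queue this way
    q.append(item)


def replace_newest(q, item):
    """Overflow policy: overwrite the most recent queued item."""
    if len(q) == q.maxlen:
        q.pop()  # only the newest value is affected; old items survive
    q.append(item)
```

With a full queue `[1, 2, 3]` and a burst of three sends, `discard_oldest` leaves `[10, 11, 12]` (the whole history is gone), while `replace_newest` leaves `[1, 2, 12]` (only the newest slot churns), which is the "local in time" property argued for above.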
I'm just trying to wrap my head around the problem that this is trying to solve. Another concern: it's unclear how this should interact with tasks that are blocked in
Doesn't happen in his use case, which is another reason why a memory channel is a bad fit. @belm0 if I had your job I'd grab the old
@njsmith Not in general. It was a poor attempt at an example, to avoid a dive into the AsyncValue API and design. But that's not going to fly with this audience 😇

AsyncValue and background:

```python
x = AsyncValue(MyEnum.NONE)

# task A
await sleep(5)
x.value = MyEnum.FOO

# task B
await x.wait_value(MyEnum.FOO)  # returns immediately if already satisfied
new_val, old_val = await x.wait_transition()  # next change in value
await x.wait_transition(MyEnum.FOO)  # next change to a specific value
new_val, _ = await x.wait_transition(lambda val, _: val in (MyEnum.FOO, MyEnum.BAR))
```

So that's all fine. It's vetted: we know it's very useful, and moreover approachable by casual programmers (the return values of

```python
while True:
    await x.wait_transition(MyEnum.FOO)
    await some_action()
```

Now, that will miss transitions which occur while the async work is happening. In many cases, that is what you want. And that happens to be the simplest semantics for casual programmers, because they don't need to think about queueing and all it entails: for example, what happens if their action takes 20 seconds and someone just made 100 transitions to the value?

However, in some cases we do want to avoid missing transitions. These suffice right now for "best effort":

```python
while True:
    await x.wait_transition(MyEnum.FOO)
    some_synchronous_action()
```

```python
async with trio.open_nursery() as nursery:
    while True:
        await x.wait_transition(MyEnum.FOO)
        nursery.start_soon(some_action)
```

By "best effort", I mean it's good enough to deal with writes to `value` every other scheduler frame or so.

So yes, that pattern is encroaching on memory channels, and "you should just use a channel". But I have reasons for not wanting to promote every developer using the memory channel API in our app: 1) it's a more complicated thing to learn than AsyncValue, with non-trivial options like channel size, and 2) the person creating the signal needs to think about all potential use cases and "do I use AsyncValue or this queueing thing?".

AsyncValue.transitions()

It started with a harmless enhancement: add a

In our codebase it would replace about 50 instances of these manual

So I'd like to re-implement
At that point I'd probably just use the
After having a go at an actual
After another read of the broadcast channels discussion (#987), I made something that again employs a `ParkingLot`:

```python
from collections import deque

from trio.lowlevel import ParkingLot


class _Channel:
    """Broadcast channel that drops the oldest value on overflow."""

    def __init__(self):
        self.event = ParkingLot()
        # queue size of 2 should suffice for up to one event per scheduler frame
        self.values = deque(maxlen=2)  # (generation, value), newest on the right
        self.generation = 0

    def broadcast(self, value):
        self.generation += 1
        self.values.append((self.generation, value))
        self.event.unpark_all()

    async def subscribe(self):
        last_generation = self.generation
        while True:
            await self.event.park()
            # assumes a short queue length
            for generation, value in self.values:
                if generation > last_generation:
                    yield value
                    last_generation = generation
```

So far the unit tests I've thrown at the
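The generation bookkeeping above can be exercised without trio. Here is a standalone model (the class name and methods are illustrative, not part of any library) showing how a subscriber that wakes up late replays only the values it has not yet seen:

```python
from collections import deque


class GenerationLog:
    """Short log of (generation, value) pairs; oldest dropped on overflow."""

    def __init__(self, maxlen=2):
        self.values = deque(maxlen=maxlen)  # (generation, value), newest on the right
        self.generation = 0

    def broadcast(self, value):
        self.generation += 1
        self.values.append((self.generation, value))

    def unseen(self, last_generation):
        # linear scan is fine because the queue is deliberately short
        return [v for g, v in self.values if g > last_generation]
```

A subscriber holding `last_generation = 1` after two broadcasts sees only the second value; once a third broadcast evicts the first from the maxlen-2 deque, a brand-new subscriber can recover at most the last two values, which is the "drops oldest on overflow" behavior.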
Closing, as there isn't adequate interest in this enhancement, and I've found I don't need it myself. Thank you for the discussion!
If the goal is to avoid losing wakeups, then I think you can do that 100% reliably without any queue or dropping anything. I would keep state in each subscriber generator tracking which events they've seen, and when events are fed in, make a note of which generators are supposed to wake up, so they either wake up if they're asleep, or else they'll return immediately the next time they're iterated. E.g.:

```python
import weakref

import attr
import trio


@attr.s
class Subscription:
    predicate = attr.ib()
    next_event = attr.ib(factory=trio.Event)


@attr.s
class AsyncValue:
    _value = attr.ib()
    _state_subscriptions = attr.ib(factory=weakref.WeakSet)

    async def subscribe_value(self, predicate):
        sub = Subscription(predicate)
        self._state_subscriptions.add(sub)
        while True:
            await sub.next_event.wait()
            # Re-arm the event *before* we yield, so if any events arrive
            # while we're yielded, the next loop will see them immediately.
            sub.next_event = trio.Event()
            yield

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        self._value = new_value  # write the backing field, avoiding setter recursion
        for sub in self._state_subscriptions:
            if sub.predicate(self._value):
                sub.next_event.set()
```
I see, that's an interesting pattern. Currently I'm using

However, in AsyncValue the waitable methods and generator return the exact value(s) that triggered the predicate, which is sometimes required. This may be why I need a queue. Otherwise, even though `next_event` may already be triggered, `wait()` has a schedule point during which `value` may be modified again.
Can't you just stash that value next to the Event, in the Subscription object? Or a list of values, I guess, since there may be multiple triggers between each wakeup.
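The "stash the value next to the Event" suggestion can be sketched synchronously: each subscription keeps a list of the exact values that matched its predicate, so the consumer never has to re-read a value that may have changed again by the time it wakes up. The class and function names here are illustrative, not trio_util's actual API:

```python
class Subscription:
    def __init__(self, predicate):
        self.predicate = predicate
        self.pending = []  # exact values that matched since the last drain


class ObservedValue:
    def __init__(self, value):
        self._value = value
        self._subs = []

    def subscribe(self, predicate):
        sub = Subscription(predicate)
        self._subs.append(sub)
        return sub

    def set(self, new_value):
        self._value = new_value
        for sub in self._subs:
            if sub.predicate(new_value):
                sub.pending.append(new_value)  # record the triggering value itself


def drain(sub):
    # in the async version, this would run right after the event fires
    matched, sub.pending = sub.pending, []
    return matched
```

Even if the value is written several times between wakeups, `drain()` hands back every matching value in order, rather than only whatever the attribute happens to hold at wakeup time.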
So for
Yes, but then I want to know if the subscriber is actually behind, so that I should jump to the most recent value, as opposed to being fast enough but merely out of phase (see the post I made to broadcast channel issue #987 (comment)). So then I'm putting a generation number in the subscription object too, and on top of the desire to share subscriptions, at that point I don't see a win over the revised broadcast object (FrontRunnerBroadcastValue) I posted to the other issue.
The idea is optional support for a best-effort synchronous send. I can pick whatever memory channel size is suitable (1, 10). In the worst case values might get dropped, but retaining the most recent value would be ensured.

**proposed API**

Add a `replace_on_overflow=False` keyword-only arg to `send_nowait()`.

If `replace_on_overflow` is True, rather than raising `WouldBlock`, the last queued item will be replaced with the given value. This is expected to be combined with `max_buffer_size > 0`, since otherwise there can be no queued item to replace.

**implementation**

In the existing `WouldBlock` case, if `replace_on_overflow` is True and `max_buffer_size > 0`, simply replace the last item in the deque. This is O(1).

**use case detail**

I'd like to explore using it for trio_util AsyncValue. While that API has overwrite semantics (you can keep writing synchronously to the `value` attribute and nothing gets queued, but the last value is always retained), I do also want to support subscribing to value transitions without any events getting dropped (assuming that the subscriber reschedules itself immediately after receive).

So I might use a memory channel of size 1 (which would support at most one write per scheduler frame without dropping events), or optionally larger sizes. But in all cases I need writes to be synchronous, and the last value to be retained on overflow.
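The proposed semantics can be modeled on a plain bounded `deque`. This is a toy sketch of the behavior described in the proposal, not trio's actual implementation:

```python
from collections import deque


class WouldBlock(Exception):
    """Stand-in for trio.WouldBlock in this toy model."""


def send_nowait(buffer, value, *, replace_on_overflow=False):
    if len(buffer) < buffer.maxlen:
        buffer.append(value)
    elif replace_on_overflow and buffer.maxlen > 0:
        buffer[-1] = value  # O(1): overwrite the last queued item
    else:
        raise WouldBlock
```

With `maxlen=1` this gives exactly the overwrite semantics described above: a burst of sends with `replace_on_overflow=True` never raises and always leaves the most recent value queued, while the default behavior still raises `WouldBlock` on overflow.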