MemorySendChannel: replace the last queued item #1744

Closed
belm0 opened this issue Oct 1, 2020 · 14 comments

@belm0
Member

belm0 commented Oct 1, 2020

The idea is optional support for best-effort synchronous send. I can pick whatever memory channel size is suitable (e.g. 1 or 10). In the worst case values might get dropped, but the most recent value would always be retained.

proposed API

add replace_on_overflow=False keyword-only arg to send_nowait()

If replace_on_overflow is True, rather than raising WouldBlock, the last queued item will be replaced with the given value. This is expected to be combined with max_buffer_size > 0, since otherwise there can be no queued item to replace.
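
For illustration, usage would look something like this (a sketch of the proposed call, not existing trio API; the values are arbitrary):

send, receive = trio.open_memory_channel(1)
send.send_nowait("a")                             # buffer now full
send.send_nowait("b", replace_on_overflow=True)   # replaces "a" instead of raising WouldBlock
assert receive.receive_nowait() == "b"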

implementation

In existing WouldBlock case, if replace_on_overflow is True and max_buffer_size > 0, simply replace the last item in the deque. This is O(1).
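
To make the semantics concrete, here is a toy model of just the buffer handling (ToyBuffer is illustrative only, not trio's implementation):

import trio
from collections import deque

class ToyBuffer:
    """Toy model of the proposed overflow behavior (not trio code)."""

    def __init__(self, max_buffer_size):
        self.max_buffer_size = max_buffer_size
        self.data = deque()  # newest item on the right, as in a memory channel

    def send_nowait(self, value, *, replace_on_overflow=False):
        if len(self.data) < self.max_buffer_size:
            self.data.append(value)
        elif replace_on_overflow and self.max_buffer_size > 0:
            self.data[-1] = value  # O(1): overwrite the newest queued item
        else:
            raise trio.WouldBlock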

use case detail

I'd like to explore using it for trio_util AsyncValue. While that API has overwrite semantics (you can keep writing synchronously to the value attribute and nothing gets queued, but the last value is always retained), I do want to support subscribing to value transitions without any events getting dropped (assuming that the subscriber reschedules itself immediately after each receive).

So I might use a memory channel of size 1 (which would support at most 1 write per scheduler frame without dropping events), or optionally larger sizes. But in all cases I need writes to be synchronous, and the last value to be retained on overflow.

@njsmith
Member

njsmith commented Oct 3, 2020

For your case, would something like this work?

def send_nowait_discarding_old_values(send_handle, receive_handle, obj):
    try:
        send_handle.send_nowait(obj)
    except trio.WouldBlock:
        receive_handle.receive_nowait()  # discard oldest item, then try again
        send_handle.send_nowait(obj)
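
For example, with a buffer of size 1 (the values here are just illustrative):

send, receive = trio.open_memory_channel(1)
send_nowait_discarding_old_values(send, receive, "a")
send_nowait_discarding_old_values(send, receive, "b")  # discards "a"
assert receive.receive_nowait() == "b"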

I'm hesitant to add the replace_on_overflow argument because it feels pretty arbitrary and specific to your particular project...

@belm0
Member Author

belm0 commented Oct 4, 2020

@njsmith thank you for the suggestion. Discarding the oldest value isn't suitable for this use case, because it allows an over-eager producer task to blow away the entire queue of values in a single scheduler cycle. With replace_on_overflow, the worst a task could do is affect the most recent value.

Another way to think about it, assuming the items queued in the channel are temporal: an overflow scenario should only affect items that are local in time. If, say, the channel size is 100 and it holds items spanning 1 minute, and in the last 5 seconds there is a surge of incoming items, that shouldn't affect items that are a minute old.

it feels pretty arbitrary and specific to your particular project...

The intention is to use this in refactoring and enhancing trio_util AsyncValue, which is somewhat general and used by at least one other member of the Trio community (mozesa). It's an API with overwrite semantics, which I don't think is that unusual, and it has served us well in 2 years of active use. (The AsyncValue.value setter corresponds roughly to "accept this new value and trigger an event".)

So we see that the memory channel API is flexible enough to implement "discard oldest value" (though I can't think of a case where one would want that). It should be general enough to implement "discard most recent value" too, which I know has use cases.

@mozesa

mozesa commented Oct 4, 2020

I am actively following this feature request / enhancement proposal since, as @belm0 mentioned, I have used trio_util for a year with great pleasure, and thus I am interested in anything that could make trio_util even better.

At the same time I do understand @njsmith's concern as well.

@njsmith
Member

njsmith commented Oct 4, 2020

I'm just trying to wrap my head around the problem that this is trying to solve. AsyncValue is great for sure, but it's all about a single value, so I don't know how buffers come into anything. It sounds like you want to use a memory channel as a kind of regularly-sampled history of some value? I'm not sure memory channels are a great fit for that...

Another concern: it's unclear how this should interact with tasks that are blocked in send.

@smurfix
Contributor

smurfix commented Oct 4, 2020

how this should interact with tasks that are blocked in send

Doesn't happen in his use case, which is another reason why a memory channel is a bad fit.

@belm0 if I had your job I'd grab the old Queue implementation off the archive (it's simpler than the current memory channel implementation) and mangle it to implement the exact semantics you need.

@belm0
Member Author

belm0 commented Oct 5, 2020

It sounds like you want to use a memory channel as a kind of regularly-sampled history

@njsmith Not in general. It was a poor attempt at an example, to avoid a dive into AsyncValue API and design. But that's not going to fly with this audience 😇

AsyncValue and background

AsyncValue has a synchronous setter/getter. It also has async wait on a value or transition.

x = AsyncValue(MyEnum.NONE)

# task A
await sleep(5)
x.value = MyEnum.FOO

# task B
await x.wait_value(MyEnum.FOO)  # returns immediately if already satisfied
new_val, old_val = await x.wait_transition()  # next change in value
await x.wait_transition(MyEnum.FOO)  # next change to a specific value
new_val, _ = await x.wait_transition(lambda val, _: val in (MyEnum.FOO, MyEnum.BAR))

So that's all fine. It's vetted: we know it's very useful, and moreover approachable by casual programmers (the return values of wait_value() and wait_transition() are rarely used). Current implementation uses ParkingLot. A typical use might be to do some async work whenever the state matches a specific value:

while True:
    await x.wait_transition(MyEnum.FOO)
    await some_action()

Now, that will miss transitions which occur while the async work is happening. In many cases, that is what you want. And that happens to be the simplest semantics for casual programmers, because they don't need to think about queueing and all it entails (for example, what happens if their action takes 20 seconds and someone just made 100 transitions to the value).

However, in some cases we do want to avoid missing transitions. These suffice right now for "best effort":

while True:
    await x.wait_transition(MyEnum.FOO)
    some_synchronous_action()

async with trio.open_nursery() as nursery:
    while True:
        await x.wait_transition(MyEnum.FOO)
        nursery.start_soon(some_action)

By "best effort", I mean it's good enough to deal with writes to value every other scheduler frame or so. So yes, that pattern is encroaching on memory channels, and "you should just use a channel". But I have reasons for not wanting to promote every developer using the memory channel API in our app, like 1) it's a more complicated thing to learn than AsyncValue, with non-trivial options like channel size, and 2) the person creating the signal needs to think about all potential use cases and "do I use AsyncValue or this queueing thing?".

AsyncValue.transitions()

It started with a harmless enhancement: add a transitions() async generator to the AsyncValue API that accepts the same args as the single-use wait_transition(). As a first pass it would just encapsulate the while True loop.
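
For instance, roughly (a sketch; the parameter name is a placeholder for whatever wait_transition() actually accepts):

async def transitions(self, value_or_predicate=None):
    # async-generator form of the manual `while True: await x.wait_transition(...)` loop
    while True:
        yield await self.wait_transition(value_or_predicate)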

In our codebase it would replace about 50 instances of these manual while loops and make the code more clear. But trio_util is a library for general use, and I'm not satisfied with publishing a generator API that is only "good enough to deal with writes to value every other scheduler frame or so".

So I'd like to re-implement AsyncValue on top of memory channels with replace_on_overflow=True. With a channel size of 1, transitions() could handle a write to value every scheduler frame without dropping events. When that limit is exceeded, the behavior would be the same as it is now. And by adding an option for larger channel sizes, it could also prevent dropping events in scenarios with multiple writes per scheduler frame (e.g. multiple writing tasks).
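
The shape I have in mind is roughly this (a sketch only, using the proposed flag; the subscriber registry and the (new, old) tuple payload are assumptions about how AsyncValue would be refactored):

async def transitions(self, predicate):
    send, receive = trio.open_memory_channel(1)   # or a larger, configurable size
    self._transition_subscribers.add(send)        # hypothetical registry consulted by the value setter
    try:
        async for new_val, old_val in receive:
            if predicate(new_val, old_val):
                yield new_val, old_val
    finally:
        self._transition_subscribers.discard(send)

# and the value setter would do, for each registered send channel:
#     send.send_nowait((new_value, old_value), replace_on_overflow=True)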

@belm0
Member Author

belm0 commented Oct 5, 2020

grab the old Queue implementation off the archive (it's simpler than the current memory channel implementation) and mangle it to implement the exact semantics you need

At that point I'd probably just use the wait_task_rescheduled API directly. But that is a good reference, thank you.

@belm0
Member Author

belm0 commented Oct 5, 2020

After having a go at an actual AsyncValue.transitions() implementation, I've realized:

  • most of AsyncValue is fine as it is (using ParkingLot); I just need to add some queueing mechanism for transitions()
  • broadcast semantics are preferable (for efficiency, since there can be multiple subscribers to the same predicate), which rules out using a memory channel
  • for a very small queue size (like 2), which end gets dropped on overflow may not be significant

After another read of the broadcast channels discussion (#987), I made something that again employs ParkingLot. For now, it drops oldest values on overflow, since that seemed more efficient to implement.

from collections import deque

from trio.lowlevel import ParkingLot


class _Channel:
    """Broadcast channel that drops the oldest value on overflow"""

    def __init__(self):
        self.event = ParkingLot()
        # queue size of 2 should suffice for up to one event per scheduler frame
        self.values = deque(maxlen=2)  # (generation, value), newest on the right
        self.generation = 0

    def broadcast(self, value):
        self.generation += 1
        self.values.append((self.generation, value))
        self.event.unpark_all()

    async def subscribe(self):
        last_generation = self.generation
        while True:
            await self.event.park()
            # iterate over a snapshot: broadcast() may append to the deque
            # while this generator is suspended at the yield below
            # (the queue is assumed to be short, so copying is cheap)
            for generation, value in list(self.values):
                if generation > last_generation:
                    yield value
                    last_generation = generation
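
For context, a minimal usage sketch (the demo scaffolding here is illustrative, not part of the actual transitions() implementation):

import trio

async def demo():
    channel = _Channel()

    async def consumer():
        seen = 0
        async for value in channel.subscribe():
            print("saw", value)
            seen += 1
            if seen == 2:
                break

    async with trio.open_nursery() as nursery:
        nursery.start_soon(consumer)
        await trio.sleep(0)   # let the consumer start and park
        channel.broadcast(1)
        channel.broadcast(2)  # maxlen=2 keeps both values until the consumer wakes

trio.run(demo)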

So far the unit tests I've thrown at the transitions() implementation seem to be holding up.

@belm0
Member Author

belm0 commented Oct 6, 2020

closing as there isn't adequate interest in this enhancement, and I've found I don't need it myself

thank you for the discussion!

belm0 closed this as completed Oct 6, 2020
@njsmith
Member

njsmith commented Oct 6, 2020

If the goal is to avoid losing wakeups, then I think you can do that 100% reliably without any queue or dropping anything? I would keep state in each subscriber generator tracking which events they've seen, and when events are fed in, make a note of which generators are supposed to wake up, so they either wake up if they're asleep, or else they'll return immediately the next time they're iterated. E.g.:

import trio
import weakref
import attr

@attr.s
class Subscription:
    predicate = attr.ib()
    next_event = attr.ib(factory=trio.Event)


@attr.s
class AsyncValue:
    _value = attr.ib()
    _state_subscriptions = attr.ib(factory=weakref.WeakSet)

    async def subscribe_value(self, predicate):
        sub = Subscription(predicate)
        self._state_subscriptions.add(sub)
        while True:
            await sub.next_event.wait()
            # Re-arm the event *before* we yield, so if any events arrive
            # while we're yielded, the next loop will see them immediately.
            sub.next_event = trio.Event()
            yield

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        self._value = new_value  # assign the backing field, not the property
        for sub in self._state_subscriptions:
            if sub.predicate(self._value):
                sub.next_event.set()
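
Illustrative use of that pattern (names are from the sketch above; the predicate is arbitrary):

async def watch_for_foo(x):
    async for _ in x.subscribe_value(lambda v: v == "foo"):
        print("value became foo; current value:", x.value)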

@belm0
Member Author

belm0 commented Oct 6, 2020

I see -- that's an interesting pattern. Currently I'm using ParkingLot, so doing something like that wasn't obvious.

However in AsyncValue, the waitable methods and generator return the exact value(s) that triggered the predicate, which is sometimes required. This may be why I need a queue, but I'm not sure. I'll have to see if I can apply this pattern.

@njsmith
Member

njsmith commented Oct 6, 2020 via email

@belm0
Member Author

belm0 commented Oct 6, 2020

So for AsyncValue I think there are two reasons that pattern isn't suitable (or in hindsight only one, because the 2nd has a workaround):

  1. it doesn't meet the desired goal of handling one event per scheduler pass when the subscriber is synchronous, because inevitably the subscriber and the task assigning to value will run in the same scheduler pass, pipelined (the subscriber task consuming the value from the last pass, and the writer task writing the next value). When that happens the execution order isn't guaranteed. If the writer executes first, the values of both the current and the previous match need to be stored somewhere (hence a queue size of 2).
  2. AsyncValue tries to be efficient about multiple subscribers to the same predicate. If the predicate is hashable (e.g. matching "all transitions" or a hashable constant, especially True or False), then corresponding matches can be shared by subscribers, hence the need for broadcast. The pattern with generations of trio.Event only works for a single subscriber, though I suppose there could be a map from predicate to subscribers, and the setter would have to iterate on the subscribers of a matching predicate (a rough sketch follows below).
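
A rough sketch of that sharing-by-predicate idea (all names here are illustrative, not actual AsyncValue internals; the events are per-subscriber trio.Event instances as in the pattern above):

from collections import defaultdict

class SharedSubscriptions:
    def __init__(self):
        # hashable predicate -> set of per-subscriber events
        self._events_by_predicate = defaultdict(set)

    def register(self, predicate, event):
        self._events_by_predicate[predicate].add(event)

    def notify(self, value):
        # called from the value setter: each predicate is evaluated once,
        # then every subscriber sharing it is woken
        for predicate, events in self._events_by_predicate.items():
            if predicate(value):
                for event in events:
                    event.set()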

@belm0
Member Author

belm0 commented Oct 6, 2020

Can't you just stash that value next to the Event, in the Subscription
object? Or a list of values, I guess, since there may be multiple triggers
between each wakeup.

Yes, but then I want to know whether the subscriber is actually behind and I should jump to the most recent value, as opposed to it being fast enough but merely out of phase (see the post I made to the broadcast channel issue, #987). So then I'm putting a generation number in the subscription object too, and given the desire to share subscriptions, at that point I don't see a win over the revised broadcast object (FrontRunnerBroadcastValue) I posted to the other issue.
