
SendChannel.send() doesn't have a defined ordering #1949

Open
basak opened this issue Apr 5, 2021 · 16 comments

Comments


basak commented Apr 5, 2021

I hope I've understood the nuances of checkpoints correctly here. I thought I had spotted a race condition in my Trio-using code. I concluded that there is currently no way to avoid it in Trio's API as defined at the moment, but Trio's current implementation means that the race won't occur. I wonder if the API could be defined to be more deterministic, matching the code, to make it possible to reliably avoid my race?

In my code, the order in which values get sent into a memory channel is important. Trio guarantees that a call to send() includes a checkpoint. But I don't think it defines whether or not such a checkpoint will occur before values are "serialized" into the channel, thus defining their order.

What I mean by "serialized": if two tasks write to a channel "concurrently", then the final ordering may go either way depending on the order in which tasks are scheduled, but at some point that order becomes determined.

If send() were to do this serialization before any checkpoint, then the caller can, with some care, precisely control the resulting ordering by managing its own checkpoints. If send() were not to guarantee this, then I think this level of control becomes impossible. I might try to work around this by coalescing the values myself, but the obvious way to do so would be to use a memory channel, at which point I'd be back to where I started with the same problem.

Looking at the implementation, I think it does happen to be the case that if the call is not cancelled, no checkpoint will occur before serialization. However, if this changes it will break me. So could this be defined to be the case in the API, please?

For example, consider the following contrived code:

import trio

send_channel, receive_channel = trio.open_memory_channel(0)

last_value_sent = None
all_values_received = []

async def receive_values():
    for i in range(2):
        value = await receive_channel.receive()
        all_values_received.append(value)

async def send_value(value):
    global last_value_sent
    last_value_sent = value
    await send_channel.send(value)

async def test():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(receive_values)
        nursery.start_soon(send_value, 1)
        nursery.start_soon(send_value, 2)

    assert last_value_sent == all_values_received[-1]

trio.run(test)

The assertion passes in practice, I think due to Trio's current implementation, but I don't think that Trio currently guarantees that this won't change. If the send_channel.send() call were to checkpoint immediately when it is called, then it's possible that the values would come out of receive_channel in the opposite order from which I sent them, depending on the order in which the two tasks resumed following their respective checkpoints. Then last_value_sent would mismatch the order in which the values come out of the channel.

In my real world use case, I am sending values into a channel, but also processing those values immediately. I would like to ensure that the ordering of the values coming out of the channel is the same as the order in which I processed them. I can do this by carefully considering checkpoints - ensuring that there is no checkpoint between processing a value and sending it into the channel. However, this depends on Trio's currently undefined behaviour of the guaranteed checkpoint never occurring before the ordering is recorded.

My request is therefore:

  1. Document that values will come out of the channel in the same order that send() calls are awaited (which implies that the guaranteed checkpoint will occur only after the ordering is recorded).

  2. Consider it API breakage if this changes in the future.

Thank you for reading this far to consider this!


belm0 commented Apr 5, 2021

The test code is assuming that tasks are scheduled in start_soon() order, which isn't guaranteed.

Perhaps the test could be clarified by using testing.Sequencer.

You haven't mentioned whether your memory channel is bounded. If it is, certainly it can become full, at which point multiple tasks could become blocked on send(), and the order in which they'll be rescheduled is arbitrary.


basak commented Apr 6, 2021

The test code is assuming that tasks are scheduled in start_soon() order, which isn't guaranteed.

No, it doesn't assume this; I think you misunderstand. The test code assumes that there is no checkpoint between last_value_sent = value and the send() call fixing its ordering. It doesn't matter in which order the multiple send_value() invocations begin.

Perhaps the test could be clarified by using testing.Sequencer.

I can't do this without injecting Sequencer calls into Trio's send() implementation itself. It isn't my test code that could run in a different order that's the problem. It's that the memory channel send() call might checkpoint, and therefore insert items into the channel in a different order than my appends into all_values_received and my bindings into last_value_sent. I say "might" because it isn't defined that it won't, even though it doesn't in the current implementation.

You haven't mentioned whether your memory channel is bounded.

This doesn't matter. It's the order in which the send() calls are awaited that is important; the order in which they complete is not.

If it is, certainly it can become full, at which point multiple tasks could become blocked on send(), and the order in which they'll be rescheduled is arbitrary.

On the contrary, I think it is well-defined in the current implementation, just not the API. In https://github.com/python-trio/trio/blob/master/trio/_channel.py#L170, self._state.send_tasks is an OrderedDict (from https://github.com/python-trio/trio/blob/master/trio/_channel.py#L96). The value is placed there before a checkpoint occurs (except in the early cancellation case, which isn't relevant to successful channel insertion). Therefore, I conclude that in the current implementation, values will come out of the channel in the same order as send() is awaited, regardless of whether the task gets blocked or not.
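
A toy model (not Trio's actual code, just an illustration of the claim above) of why the blocked-sender path preserves order: each blocked sender is recorded in an OrderedDict before any checkpoint, and the receiving side pops the oldest entry, so values come out FIFO in the order send() was awaited, regardless of which blocked task the scheduler later wakes first:

```python
from collections import OrderedDict

# Toy model of the ordering claim: blocked senders are recorded in an
# OrderedDict at the moment send() is awaited.
send_tasks = OrderedDict()

# Three hypothetical tasks block on a full channel, in await order:
send_tasks["task_a"] = 1
send_tasks["task_b"] = 2
send_tasks["task_c"] = 3

# The receive side pops the oldest entry, mirroring
# OrderedDict.popitem(last=False) in trio/_channel.py:
received = []
while send_tasks:
    task, value = send_tasks.popitem(last=False)
    received.append(value)

print(received)  # [1, 2, 3]
```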


belm0 commented Apr 6, 2021

It doesn't matter which order the multiple send_value() invocations begin.

I see it now, sorry for the noise.

In my code the order that values get sent into a memory channel are important.

It would help to clarify your use case.

Since you can't control the order that sending tasks are scheduled in the first place, it seems odd to care about the channel ordering of the concurrent send() calls. Unless you're additionally storing your sequence of send() calls out-of-band somewhere-- so it would help to have the whole picture.


basak commented Apr 6, 2021

It would help to clarify your use case.

I have an async "value wrapper" that represents something in the real world that can change its value. It has an immediate "what's its value now" property which you can retrieve non-async. It also provides an async iterable you can use to follow value changes. Value changes are sent through a memory channel. It is expected that the "what's its value now" property will always match the most recent value sent through the async iterable (after outstanding tasks have settled). However, if I can't control the channel ordering of previous concurrent send() calls, then I can end up in a state where the "what's its value now" property doesn't match the final value sent through the async iterable, because the final two values sent were inverted in order.

Unless you're additionally storing your sequence of send() calls out-of-band somewhere...

If I understand what you mean here, that's exactly what I'm doing - I'm storing the last value I sent in a property, just as I'm storing into last_value_sent in my example above. The reason is that it's an abstraction for the purposes of an API layered on top of Trio. It's convenient to just get the most recent value, since this is a really common case. Other times, I want to follow a value as it changes. If they don't match up, I'm going to get weird edge case bugs.


belm0 commented Apr 6, 2021

I have an async "value wrapper" that represents something in the real world that can change its value. It has an immediate "what's its value now" property which you can retrieve non-async. It also provides an async iterable you can use to follow value changes

Have you looked at trio-util AsyncValue? (Disclosure: I wrote it.) It hasn't had a great story for iteration up until now, but in the next few days I'll be adding our eventual_values() iterator (see groove-x/trio-util#8 (comment)).

However, if I can't control channel ordering of previous concurrent send() calls, then I can end up in a state where the "what's its value now property" doesn't match the final value sent through the async iterable because the final two values sent were inverted in order.

Thank you for explaining. Some things to consider:

  • do you really want the iterable to relay all values? What happens if a consumer is slower than the producer over long time periods and is thus continuously behind?
  • is a memory channel really needed or suitable for this? Memory channels don't support broadcast.


basak commented Apr 6, 2021

Have you looked at trio-util AsyncValue? (Disclosure: I wrote it.)

I discovered it fairly recently! My model is quite a bit older, but I have yet to publish it. There's quite a bit that I'm doing that AsyncValue doesn't currently do.

do you really want the iterable to relay all values? What happens if a consumer is slower than producer over long time periods and thus continuously behind?

Yes, that's my intention. I'm currently experimenting with using 0-size memory buffers, so everything provides backpressure. So far, it works in practice for me. It does mean that consumers are required to keep up, or they slow the producer down. But in my case (home automation), my server is always faster than the house :)

is memory channel really needed or suitable for this? Memory channel doesn't support broadcast.

I have created a "pubsub" type abstraction that relies on memory channels to work. Hence my need for deterministic ordering in memory channel send() calls.

If you're interested I'm happy to share more about what I'm doing, seeing as you have similar ideas about abstractions. I don't have anything written up though as my experimentation continues, and it's at quite a slow pace (my code history goes back to 2016, originally in asyncio). Maybe a realtime chat would be better?

Anyway, I hope that explains this particular issue :)


belm0 commented Apr 6, 2021

So far, it works in practice for me. It does mean that consumers are required to keep up, or they slow the producer down. But in my case (home automation), my server is always faster than the house :)

I would think eventual consistency is more suitable for this domain most of the time. For example, you only care about the current state of the light switch, door latch, etc.-- not whether there was a transition 5 seconds ago.

I have created a "pubsub" type abstraction that relies on memory channels to work

(trio-util AsyncValue certainly supports pubsub, via predicates in all the wait functions and iterators.)

You may have thought it out well already to get here, but I'd just look again whether memory channel is the right thing for this. Have you seen this long thread on broadcast channels? #987


belm0 commented Apr 6, 2021

Have you seen this long thread on broadcast channels? #987

To summarize it lightly: all the big thinkers in Trio are pondering broadcast, and the one thing everyone agrees on is not to implement it with memory channels...


basak commented Apr 6, 2021

I would think eventual consistency is more suitable for this domain most of the time. For example, you only care about the current state of the light switch, door latch, etc.-- not whether there was a transition 5 seconds ago.

Not all of the time, though. For example, when the motion sensor switches to True and then later to False, a notification should be guaranteed to go out whether or not the system was busy through that transition. If, further down the line and in particular circumstances, we want to drop some messages when they can't be processed in time, or exceed some particular rate, that's easy to do. But since there are cases where skipping transitions is unacceptable, the underlying abstraction must support being lossless.

Have you seen this long thread on broadcast channels? #987

I hadn't, no. Thank you for the link. I'll read up.

Anyway, could we stick to this issue in this issue please? I'd be happy to chat more but I'd appreciate doing it somewhere else.


basak commented Apr 6, 2021

I'd be happy to chat more but I'd appreciate doing it somewhere else.

I moved to gitter.


belm0 commented Apr 6, 2021

Not all of the time, though. For example, when the motion sensor switches to True and then later to False, a notification should be guaranteed to go out whether or not the system was busy through that transition.

You could guarantee not to miss the transition if you have a task dedicated to watching it (by "it", I mean the transition event set synchronously by the async value's setter). Now if that task needs to do some blocking work in response, there is the question of whether you want to queue those particular transitions. Usually you don't-- but if you did, just have the receiver append events to a list or memory channel synchronously.

Anyway, could we stick to this issue in this issue please? I'd be happy to chat more but I'd appreciate doing it somewhere else.

"What is the use case" is relevant, and from experience Trio's author would ask the same. If something is unspecified in the API, it may be because depending on it is too low of a level and breaks Trio's intended abstraction.


basak commented Apr 6, 2021

"What is the use case" is relevant, and from experience Trio's author would ask the same.

Sure, but I think I've made my use case clear. I think we've crossed the point where we're now discussing design decisions in my code that uses Trio, in a way that's no longer really relevant to this issue. You said earlier "you're additionally storing your sequence of send() calls out-of-band somewhere" and I think that's exactly what I'm doing. This is the crux of the API design question for Trio: is this something that should be supported, or not? Whether or not a memory channel is appropriate for implementing a broadcast mechanism is, I think, beside the point.


basak commented Apr 6, 2021

the one thing everyone agrees on is not to implement it with memory channels...

I've read that issue carefully now, and I don't see this agreement anywhere. Where are you forming this impression from?

In any case, even if a broadcast channel were implemented some other way, I'd still need the ordering guarantee that I'm asking for here.


Trionista commented Feb 13, 2022

The current implementation guarantees that the order is always preserved.

When the buffer is not full, sending is essentially synchronous and relies on a deque.append squeezed between the two halves of a checkpoint.

When the buffer is full, send() adds the task to an OrderedDict via ...[task] = value (preserving order), and receive_nowait() gets the value directly from the (task, value) tuple returned by OrderedDict.popitem(last=False), which also keeps the order. These two execution paths bypass the scheduler completely: the second half of the checkpoint is never called in send() in this case (it's deferred to wait_task_rescheduled) and is called in receive() only after getting the value from receive_nowait().

It may look very fragile, but it actually must always behave this way to keep the cancellation semantics (as described here and here): every execution path must pass through exactly one schedule point.

Theoretically speaking, the second half of the checkpoint could, for example, be called before getting the value from the OrderedDict in receive(), but that would require an ugly rewrite of the code so that receive() wouldn't call receive_nowait() but would mess with the OrderedDict directly. Such breakage could not happen in send(), however, because wait_task_rescheduled() is itself a schedule point, so we can't insert the second half of the checkpoint before it (we would then have two schedule points in one execution path).


belm0 commented Feb 13, 2022

Thank you @Trionista. Remaining question: does it warrant stating in the docs and/or unit tests?


Trionista commented Feb 14, 2022

This could be difficult to capture in unit tests, since the only way it could break is via a race condition in the scheduler.

Actually, now this got me thinking: even though it's hard to break this behaviour via scheduling points without breaking the scheduling and cancellation semantics (as described in my last paragraph above), there is an easier way. Nobody said that the sending tasks must be stored in an OrderedDict; if this were a set instead, order would quickly be lost once a full buffer was encountered. I can even imagine implementing this with trio.Lock, and in that case we wouldn't have guaranteed order either (see: trio.StrictFIFOLock).
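
The OrderedDict-versus-set point is easy to demonstrate with a toy (not Trio's actual code):

```python
from collections import OrderedDict

# An OrderedDict of blocked senders preserves the order in which
# send() was awaited:
ordered = OrderedDict()
for task, value in [("t1", "a"), ("t2", "b"), ("t3", "c")]:
    ordered[task] = value

fifo = [ordered.popitem(last=False)[1] for _ in range(3)]
print(fifo)  # ['a', 'b', 'c'] -- insertion order preserved

# A set has no defined iteration order, so a "wake one blocked sender"
# step could pick any element, and FIFO order would be lost:
unordered = {("t1", "a"), ("t2", "b"), ("t3", "c")}
arbitrary = unordered.pop()  # an arbitrary (task, value) pair
print(arbitrary in [("t1", "a"), ("t2", "b"), ("t3", "c")])  # True
```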

So yes, it should be stated in the documentation after all (possibly with describing all the quirks related to scheduling/cancellation semantics as well).

Since this is a channel we're talking about, which I imagine as a (long) pipe of values, it would be weird not to have the order preserved, so I don't think this even needs to be reconsidered. Especially since memory channels implement the SendChannel/ReceiveChannel abstract base classes, which in the docs sit right next to Streams, which obviously must guarantee the order of the bytes inside. And since these are abstract classes that users are encouraged to subclass if necessary, that's even more reason to have all this clearly described in the docs, to keep the behaviour as part of the contract.
