
Broadcast channels #987

Open
njsmith opened this issue Mar 26, 2019 · 29 comments


@njsmith
Member

njsmith commented Mar 26, 2019

[This is a speculative discussion. I don't know if we should do anything here. But I was thinking about it so I figured I'd write it down so it doesn't get lost.]

There are two kinds of multi-consumer channels: the kind where each item is distributed to one receiver (e.g., by round-robin), and the kind where each item is distributed to all receivers ("broadcast"). We already have good support for the former, via memory channels. Implementing broadcast is trickier, though, because you need to pick some policy for handling the case where consumers run at different speeds. I guess the basic options are:

  • Apply backpressure based on the slowest consumer: this is appropriate if all the receivers are working together in some sense, and there's no point in letting one consumer get too far ahead of the others. It's straightforward to implement by using a collection of memory channels with bounded buffers, like:

    async def broadcast_send(self, value):
        for channel in self._channels:
            await channel.send(value)
  • Apply backpressure based on the fastest consumer: this is intuitively tempting in cases where the receivers are unrelated, but it requires unbounded buffer space, so... not necessarily a good idea. It can be approximated by re-using our broadcast_send from the previous bullet point, but with infinite buffers. But that disables backpressure entirely, rather than applying backpressure based on the fastest receiver. Doing this precisely would require some more work. I'm not sure if it's ever useful.

  • Instead of applying backpressure, drop messages: The intuition here is that one common case is where you have a value that updates occasionally, and subscribers want to keep track of the latest version. However, if multiple updates happen in quick succession while a subscriber isn't looking, then they only care about the latest value. One possible API:

    class BroadcastValue(Generic[T]):
        def set(self, value: T) -> None:
            ...
    
        def subscribe(self) -> ReceiveChannel[T]:
            ...

    The semantics of the ReceiveChannel would be: the first time you call receive (or __anext__), it immediately returns the current value, and stores some state in the channel object keeping track of what was the last value returned by this particular channel. After that, each call to receive waits until there's been at least one call to set (compared to the last time this channel returned a value), and then returns the latest value. So in practice, you'd write:

    async for current_value in broadcast_value.subscribe():
        print("Latest update:", current_value)

    and if there's a fast update rate, then the loop might skip over some intermediate values, but if the updates stop then the loop will always catch up with the latest value.
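The drop-to-latest semantics can be modeled synchronously as a sketch (all names here are hypothetical; `poll()` returning `None` stands in for where a real async `receive()` would block):

```python
_UNSET = object()  # sentinel: no value has been set yet


class BroadcastValueModel:
    """Synchronous model of the drop-to-latest semantics sketched above."""

    def __init__(self):
        self._value = _UNSET
        self._generation = 0  # bumped once per set()

    def set(self, value):
        self._value = value
        self._generation += 1

    def subscribe(self):
        return _Subscription(self)


class _Subscription:
    def __init__(self, broadcast):
        self._broadcast = broadcast
        # 0 = "never received", so the first poll returns the current
        # value immediately, matching the semantics described above
        self._last_seen = 0

    def poll(self):
        b = self._broadcast
        if b._generation == self._last_seen or b._value is _UNSET:
            return None  # caught up: a real receive() would park here
        self._last_seen = b._generation
        return b._value
```

A burst of set() calls between two polls yields only the final value, which is the "skip over some intermediate values" behavior.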

Are there any other interesting policies that I'm missing?

There's some discussion of related ideas in #637

@smurfix
Contributor

smurfix commented Mar 26, 2019

A channel that drops the oldest enqueued message when a new one arrives (instead of blocking) would be a good building block to have.

The backpressure-on-the-fastest-receiver idea is interesting. But why would you need unbounded buffer space?

  • try to send_nowait to all receivers. If at least one accepts the message, you're done.
  • otherwise, add yourself to all receivers' parking lots and await being woken up (or cancelled of course).
  • Repeat (if not cancelled).
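The send side of this scheme can be modeled synchronously as a sketch (hypothetical names, not smurfix's code): every receiver with room gets the message, receivers whose buffers are full simply miss it, and the sender only has to retry when every buffer was full.

```python
from collections import deque


class FastestConsumerModel:
    """Model of the scheme above: a send succeeds as soon as at least one
    bounded per-receiver queue accepts the message."""

    def __init__(self, n_receivers, maxlen=2):
        self.maxlen = maxlen
        self.queues = [deque() for _ in range(n_receivers)]

    def try_send(self, value):
        """Return True if at least one receiver accepted the message.

        A real async send() would park in the receivers' parking lots
        and retry when this returns False.
        """
        accepted = False
        for q in self.queues:
            if len(q) < self.maxlen:
                q.append(value)
                accepted = True
        return accepted
```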

@oremanj
Member

oremanj commented Mar 26, 2019

BroadcastValue is an excellent interface IMO. It could be built pretty easily on top of the "channel that drops the oldest enqueued message when a new one arrives" primitive described by @smurfix.

For the reliable delivery cases, one possible issue with "just send in an arbitrary order" is that it means consumers later in the delivery order have to wait for consumers earlier in it. Theoretically it seems like it might be better to do all the sends in parallel? Either using a nursery, or if that's too high overhead, using some select() ish thing. (Which I will get back to soon... dayjob work has been particularly high-intensity lately and it's not dovetailing as well with Trio work as it did a few months ago.)

@njsmith
Member Author

njsmith commented Mar 26, 2019

A channel that drops the oldest enqueued message when a new one arrives (instead of blocking) would be a good building block to have.

I think it's worth distinguishing between two use cases:

  • There's a pattern for low-level APIs where you use a bounded capacity channel, and if it overflows then you drop the contents and enqueue a special message saying that some messages were lost. (Examples: inotify, posix real-time signals.) This is useful when you actually need the whole stream of messages, there's no way to apply backpressure, you have strictly bounded resources, and there is some (slow) way to recover after an overflow. It would be easy to implement this in Trio. But it's a tremendous pain to use these APIs correctly. You don't want to do it if you have any other choice. So it sounds simple but it's really a low-level experts-only kind of thing.

  • The broadcast value use case described above. In some ways this seems very similar to the bounded-queue with a particular drop policy, but it's much simpler to use and reason about, because it's designed for cases where it's totally fine to lose old messages. And if it's totally fine to lose old messages, then you might as well do it aggressively, and hard-code the buffer size to 1, and skip the overflow notification. Technically you could implement it on top of a discard-on-overflow channel, but it's simpler to implement directly.

I don't think we should mix these up. In particular, I'm skeptical that it ever makes sense to use a drop-oldest policy + buffer size >1... If you can afford to drop old messages, you might as well do it all the time, and exercise that case properly.

The backpressure-on-the-fastest-receiver idea is interesting. But why would you need unbounded buffer space?

If the slowest receiver is N messages behind the fastest receiver, then you need to buffer N messages. If you don't have any way for the slowest receiver to transmit backpressure, then N can grow without bound.
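A toy rate mismatch makes the unbounded growth concrete: the buffer must hold every message the slowest receiver hasn't consumed yet, so any persistent speed difference grows it linearly.

```python
# Toy model: producer emits two messages per tick, the slow receiver
# consumes one per tick, and the fast receiver keeps up completely.
produced = 0        # messages sent so far
slow_consumed = 0   # messages the slowest receiver has processed
high_water = 0      # largest buffer size needed so far
for tick in range(100):
    produced += 2
    slow_consumed += 1
    high_water = max(high_water, produced - slow_consumed)
# After 100 ticks, the buffer already needs to hold 100 messages.
```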

@njsmith
Member Author

njsmith commented Mar 26, 2019

For the reliable delivery cases, one possible issue with "just send in an arbitrary order" is that it means consumers later in the delivery order have to wait for consumers earlier in it. Theoretically it seems like it might be better to do all the sends in parallel?

Yeah, I'm not sure it makes a huge difference – it's more-or-less equivalent to changing the buffer size by 1 – but doing the sends in parallel does feel more elegant.

@smurfix
Contributor

smurfix commented Mar 27, 2019

The backpressure-on-the-fastest-receiver idea is interesting. But why would you need unbounded buffer space?

If the slowest receiver is N messages behind the fastest receiver, then you need to buffer N messages. If you don't have any way for the slowest receiver to transmit backpressure, then N can grow without bound.

Well, in that case, sure. I was thinking of the "we have more than one receiver and at least one needs to process the message while others can take it or leave it" case. I actually may need that for a distributed network test system: at least one receiver must run each test, while others may also do so if they have spare capacity.

@belm0
Member

belm0 commented Jun 7, 2019

The semantics of BroadcastValue fits several instances in my existing codebase (notably most of the uses of Event.clear())-- but how about making the value optional? Often we just want to pass the value out of band.

With the value optional, the question becomes what to name it. BroadcastSignal()? No one wants to overload "signal"-- but that's what it is.

@njsmith
Member Author

njsmith commented Jun 8, 2019

@belm0 I guess in general we want to encourage people to pass things in-band, because it makes the state tracking easier to reason about ("share memory by communicating" does have some merit). But if you wanted to reuse the machinery without any real state, I guess you could just increment a counter or something?

@belm0
Member

belm0 commented Jun 11, 2019

Regarding BroadcastValue again-- I believe that direction has the most utility. However I'm thinking of merging this idea with the ValueEvent we have in our project. ValueEvent supports predicates for value transitions so the listener can choose what to be awoken with. The predicate feature is used a lot. Perhaps ValueEvent could be built on top of BroadcastValue, though I'm not sure because ValueEvent does other things (e.g. level triggering).

Roughly:

class BroadcastValue(Generic[T]):
    def set(self, value: T) -> None:
        ...

    def subscribe(self, predicate=None) -> ReceiveChannel[T]:
        """Yield value changes when predicate(value, old_value) is True.  Defaults to all changes."""
        ...

(In 90% of cases our predicates don't use old_value, and comparing new and old is somewhat dubious given that values may be dropped.)

@smurfix
Contributor

smurfix commented Jun 11, 2019

I use a version of ValueEvent that's specifically not about broadcasting, but about passing exactly one value from one task to exactly one other task. Typical uses are passing a task's value back to the caller, or managing a server's async responses.

The crux of these ideas is that they are not cancellation safe, i.e. when that task is cancelled or otherwise exits before setting the value, how does the caller get notified? Conversely, when the caller dies, somebody please notify the callee that its work is no longer required.

@njsmith
Member Author

njsmith commented Aug 10, 2019

Something that just occurred to me, though maybe it was obvious to everyone else: if broadcasting is about handling arbitrary numbers of consumers, then an important special case is when there are zero consumers. For BroadcastValue this case is trivial, but for the other approaches it's a bit less clear.

And the other thing any broadcast strategy has to think about is: can the set of consumers change over time, and if so then how do you handle consumers arriving and consumers departing? For BroadcastValue, again, this is obvious. For the backpressure-based strategies, it's not as clear.

There's a pattern for low-level APIs where you use a bounded capacity channel, and if it overflows then you drop the contents and enqueue a special message saying that some messages were lost. [...] But it's a tremendous pain to use these APIs correctly. You don't want to do it if you have any other choice.

I guess there might be one high-level case where this style could be useful, and that's logging. Maybe if your log consumer can't keep up, you want to just discard messages, but add a note saying that that's what you're doing.

But I feel like this still has a lot of subtleties that I don't fully understand.

If you're logging over something like UDP, then this isn't an issue – the transport layer automatically handles message boundaries and discarding messages on overflow. So I guess this is mostly useful for when you're logging over a bytestream, like TCP or stdout. If you're logging over a bytestream, then you need a task to handle the bytestream and mediate access, because otherwise you'll get garbled messages if a task gets cancelled while it's in the middle of logging. So you want something like an in-memory channel, with multiple producers who probably only want to call synchronous methods, and a single consumer. And the channel needs a buffer, plus a flag to indicate overflow, and whenever we consume from the channel we need to do some delicate handling to make sure the "messages dropped" indication gets recorded at the right point in the message stream.

That last part makes things slightly awkward, but I guess we can do it with an object that encapsulates a MemoryChannel plus a flag, where logging is:

  • try to send_nowait
  • if that gets WouldBlock, set the flag instead

And the consumer does:

  • block waiting for a message
  • once one message arrives, immediately use receive_nowait to pull out all the messages
  • and then pull out the flag, inserting it into the message stream immediately after all the messages we just pulled out
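The producer and consumer steps above can be modeled synchronously as a sketch (hypothetical names; a real version would wrap a Trio memory channel and block in the consumer rather than draining atomically):

```python
from collections import deque


class LogChannelModel:
    """Synchronous model of the bounded-queue-plus-overflow-flag pattern."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = deque()
        self.overflowed = False

    def send_nowait(self, message):
        # Producer side: instead of blocking, drop the message and
        # record that an overflow happened.
        if len(self.buffer) >= self.capacity:
            self.overflowed = True
        else:
            self.buffer.append(message)

    def drain(self):
        # Consumer side: pull out everything, then record the overflow
        # indication immediately after the messages it applies to.
        messages = list(self.buffer)
        self.buffer.clear()
        if self.overflowed:
            self.overflowed = False
            messages.append("<messages dropped>")
        return messages
```

Because the model drains atomically, nothing can slip into the buffer mid-drain here; the scheduling race described next is exactly what the real async version adds.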

... But this has a bug. Consider this sequence:

  • the channel fills up
  • the overflow flag gets set
  • the consumer calls blocking receive, which of course wakes up immediately, but still executes a checkpoint
  • while the consumer is waiting to be rescheduled, another message gets logged into the open slot that was created by receive
  • the consumer is rescheduled and follows the algorithm above

In this case the final logs look like:

[message] [message] [new message] [overflow indication]

But they should look like:

[message] [message] [overflow indication] [new message]

So this is interesting. I'm not sure how important this is and there are certainly lots of ways to solve it (e.g. it wouldn't be too hard to create a custom LogChannel from scratch instead of trying to compose it out of existing pieces). But I'm always intrigued when we find a case that our abstractions don't make easy, because maybe it's a clue pointing to better abstractions.

Some ideas for fixing this situation:

  • I think it might be enough to modify the consumer so that before it calls receive, it first tries receive_nowait? This is super subtle though, if it works at all.

  • we could add some way to "force" a write to a memory channel even when its buffer is full, and then use this to write an "overflow" message instead of setting a flag. But we'd also need some way to peek at the end of the existing buffer, so if there's already an overflow message in there we skip adding a new one.

... None of this is seeming super appealing. Maybe a custom LogChannel is the best option.

@smurfix
Contributor

smurfix commented Aug 11, 2019

Simple solution to the message flag problem: teach the sender to attach a sequence number to each message.

I'd rather not over-engineer this. A broadcast on UDP is a lossy transport that doesn't have any loss indication and might even suffer from message reordering. If we add a broadcast channel that doesn't have these two faults it's a strict improvement.

Beyond that we have some incompatible design requirements (e.g. does a lossy receiver still receive the messages before the overflow occurred, given that it needs to re-sync anyway?) which remind me of the multiple choices we have for doing a Future/EventWithValue object, which is why we still don't have the latter either. :-/
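The sequence-number suggestion could look like this sketch (hypothetical names): the sender stamps every message, drops silently when full, and the consumer reconstructs the loss indication from gaps, which also puts the "dropped" note at exactly the right point in the stream.

```python
from collections import deque


class SeqLogChannel:
    """Bounded log channel where the sender stamps each message with a
    sequence number, so the consumer can detect drops by itself."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = deque()
        self.next_seq = 0

    def send_nowait(self, message):
        seq = self.next_seq
        self.next_seq += 1  # count the message even if we drop it
        if len(self.buffer) < self.capacity:
            self.buffer.append((seq, message))
        # else: dropped; the gap in sequence numbers records the loss

    def drain(self, last_seq=-1):
        """Return (messages, last_seq), inserting a note at each gap."""
        out = []
        while self.buffer:
            seq, message = self.buffer.popleft()
            if seq != last_seq + 1:
                out.append("<%d messages dropped>" % (seq - last_seq - 1))
            out.append(message)
            last_seq = seq
        return out, last_seq
```

The consumer threads `last_seq` through successive drains, so a drop between drains is reported before the first message that followed it.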

@njsmith
Member Author

njsmith commented Sep 20, 2019

Simple solution to the message flag problem: teach the sender to attach a sequence number to each message.

OK yeah that's probably better than all that stuff I wrote up above :-)


Returning to BroadcastValue: I've seen a few cases recently where I wanted something like this, but without actually transmitting any value. (E.g. in design sketches for snekomatic, where I want to notify background tasks "hey I just changed the database, you should check it again").

I'm thinking, the basic structure of BroadcastValue, where it internally keeps a counter, and each consumer keeps its own snapshot of the counter, to prevent lost updates – that's a basic primitive, that covers a lot of the use cases for Condition but in a less error-prone way. I think it also covers a lot of the "blinker" use cases where people often reach for Event.clear and end up with bugs due to lost updates, that we discussed in #637. Adding an associated value that gets sent along with each update is a nice add-on, but definitely not necessary. And speaking of counters, maybe we should expose the counter, in case people want to detect lost updates?

So I'm not quite sure how we should organize all these ideas into, like, Python classes.

One option would be to pack it all into one class, that you can use like:

# Simple value broadcasting:
broadcast_value.set(...)

async for generation, value in broadcast_value:
    if generation != last_generation + 1:
        print("missed some updates")
    print(value)

# Representing an outside value, like a database:
broadcast_value.notify()  # bumps generation counter without changing value

async for _, _ in broadcast_value:
    print("database was updated")

This might be trying to jam too many ideas into a single class though, and end up making it more confusing than if we had multiple classes? And I'm not really sure the generation counter is useful. Really the main reason I put it in the sketch is that notify only makes sense if you understand that there's a generation counter involved, and that's easier to explain if the counter is an explicit value that users can see and experiment with, compared to if it's a piece of hidden implicit state.

I also wonder about whether we should expose a way to inspect the current value and/or generation counter directly, without subscribing. Is that helpful, or an invitation to subtle race conditions due to the values being out of sync with subscribers?
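A synchronous model of this combined class might look like the sketch below (hypothetical names; `poll()` returning `None` stands in for where the async iterator would block):

```python
class GenerationValueModel:
    """Model of the single-class sketch above: a value plus an explicit
    generation counter, with notify() for out-of-band updates."""

    def __init__(self, value=None):
        self.value = value
        self.generation = 0

    def set(self, value):
        self.value = value
        self.generation += 1

    def notify(self):
        # Bump the generation without touching the value, for state that
        # lives elsewhere (e.g. a database row).
        self.generation += 1

    def subscribe(self):
        last = self.generation  # design choice: only see future updates

        def poll():
            nonlocal last
            if self.generation == last:
                return None
            missed = self.generation - last - 1  # lost-update detection
            last = self.generation
            return self.generation, self.value, missed

        return poll
```

Exposing `missed` is one way the generation counter earns its keep: a subscriber can tell exactly how many updates it skipped.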

@belm0
Member

belm0 commented Sep 25, 2019

I had this race condition recently with code using trio_util.AsyncValue:

while True:
    if x.value == A:
        ...
    elif x.value == B:
        ...
    else:
        await x.wait_transition()

I.e. missed updates (here just during scheduling time) can cause the await to block indefinitely even though value is A or B.

@belm0
Member

belm0 commented Sep 25, 2019

I.e. missed updates (here just during scheduling time) [...]

It occurred to me that this particular race is somewhat tied to Python's implementation of async/await, and can be worked around with overexertion.

In Python, the body of an async function isn't entered until the coroutine is scheduled and run. So in my previous example, even though by logic we know that x can't be A or B at the time of await, it may be a different value by the time the coroutine is run. Contrast this with C#, where I believe await will run the async function immediately up until the first context switch.

My first attempt to work around this involved a wrapper function that returns the coroutine. The wrapper function can access the current value immediately and pass this to the async function. But it doesn't scale, because the wrapper will be ineffective when used in any API which defers async function calls (start_soon(), etc.).

So then I'm left with passing the immediate value explicitly:

while True:
    if x.value == A:
        ...
    elif x.value == B:
        ...
    else:
        await x.wait_transition(initial=x.value)

(Of course wait_transition will still not reflect x making a transition like C -> A -> C before the coroutine is run, but that doesn't affect this use case.)

@njsmith
Member Author

njsmith commented Sep 26, 2019

In Python, the body of an async function isn't entered until the coroutine is scheduled and run. So in my previous example, even though by logic we know that x can't be A or B at the time of await, it may be a different value by the time the coroutine is run. Contrast this with C#, where I believe await will run the async function immediately up until the first context switch.

I think Python and C# are actually the same in this regard: when you do await blah(), then Python does run the async function immediately up until the first context switch. (I think this is the same issue that was catching you here?) Of course if you use nursery.start_soon(blah) then that's different. But await blah() is really like a normal function call; it's just that it has the option of eventually calling one of the primitive checkpoint functions like wait_task_rescheduled.

I spent a few minutes looking at the AsyncValue source, and I didn't spot any issues that could lead to missed wakeups... not sure what to tell you there.

await x.wait_transition(initial=x.value)

This looks like it's moving in the same direction as the BroadcastValue idea up above: the key idea there is that when you write async for new_value in ..., the iterator can internally hold onto some state for this specific loop – in particular, the iterator can know which value it yielded last, and use that to do something clever on the next loop iteration, without the user having to keep track explicitly.

@belm0
Member

belm0 commented Sep 26, 2019

when you do await blah(), then Python does run the async function immediately up until the first context switch

@njsmith thank you for correcting me. So just to ingrain this: in the following code, no other task can get a time slice between await foo() and print()?

async def foo():
    print('hi')
    await trio.sleep(1)

...

await foo()

It seems that #987 (comment) shouldn't have a race then. I'll have to confirm more carefully one way or the other.

@njsmith
Member Author

njsmith commented Sep 26, 2019

in the following code, no other task can get a time slice between await foo() and print()?

Correct.

@belm0
Member

belm0 commented Oct 6, 2020

I've found an important subtlety in how you may want BroadcastValue to behave on overflow.

As long as a given subscriber is able to keep up with broadcast values, queueing is desirable. For example, a queue size of 2 is sufficient to ensure that no values are dropped as long as 1) the subscriber body is synchronous, and 2) set() is not invoked more than once per round of the Trio scheduler. A queue size of 3 will allow up to 2 invocations per frame (say, if you had two tasks that could potentially call set()), etc.

However, if the subscriber is slow, the queueing is not desirable. For example, given queue size of 3, if set() was called every 100 ms, while the subscriber loop took 200 ms, the subscriber would always be working with values that are 300 ms old. Rather it should be using the latest value.

I addressed it as follows (note that the subscribe() here does not emit the current value):

from collections import deque

from trio.lowlevel import ParkingLot


class FrontRunnerBroadcastValue:
    """
    Broadcast channel that skips to the most recent value when a subscriber's
    queue has overflowed.

    Rationale: as long as a subscriber keeps up with new values, we want to use
    the full queue depth (currently 2, which is sufficient for up to one event
    per scheduler frame).  In contrast, a slow subscriber should always get the
    latest value, so the queue is effectively bypassed.
    """

    def __init__(self):
        self.event = ParkingLot()
        self.values = deque(maxlen=2)  # (generation, value), newest on the right
        self.head_generation = 0

    def broadcast(self, value):
        self.head_generation += 1
        self.values.append((self.head_generation, value))
        self.event.unpark_all()

    async def subscribe(self):
        last_generation = self.head_generation
        while True:
            if last_generation == self.head_generation:
                await self.event.park()
            # Emit the oldest value having a newer generation as long as no
            # values have been missed.  Otherwise emit the latest value.
            # NOTE: this implementation is only efficient for short queue lengths
            for generation, value in self.values:
                if generation == last_generation + 1 or \
                        (generation > last_generation and
                         generation == self.head_generation):
                    yield value
                    last_generation = generation
                    # break, since self.values may have mutated during yield
                    break

@belm0
Member

belm0 commented Oct 8, 2020

Further refinement of broadcast values, disambiguating these queue uses:

  1. queueing while waiting for the generator to awake for the next value
  2. queueing while the subscriber body is blocked

I think they should be separated. (1) is set according to how many broadcasts you'd like to handle per scheduler pass (assuming your subscriber body is synchronous and can consume them all), while (2) is set according to the desired behavior when the consumer is slow. For (2), I've only found reasonable use cases for queueing 0 or 1 items. For >=2, it's time to use a memory channel with backpressure instead.

EDIT: don't use this-- there are bugs, and see subsequent comment by njsmith

from collections import deque

from trio.lowlevel import ParkingLot


class FlexibleBroadcastValue:
    """
    In-memory channel with synchronous broadcast (no backpressure)

    There are two parameters that control queueing:

        * max_broadcasts_per_scheduler_pass (default: 1)- a constructor option
          that limits how many broadcasts can be handled within one scheduler
          pass without dropping values.  This is only significant when the
          subscriber body is synchronous, and so can catch up atomically.
          (Detail: this queueing addresses when more than one item arrives
          before the generator itself awakes from blocking on the next value.)

        * blocked_queue_size (default: 0) - a per-subscriber option that limits
          how many values to queue while the subscriber body is blocked.  Sizes
          greater than 1 are questionable-- consider using a Trio memory
          channel having proper backpressure instead.
    """

    def __init__(self, *, max_broadcasts_per_scheduler_pass=1):
        self.event = ParkingLot()
        # deque of (generation, value), newest on the right
        self.values = deque(maxlen=2 * max_broadcasts_per_scheduler_pass)
        self.head_generation = 0

    def broadcast(self, value):
        self.head_generation += 1
        self.values.append((self.head_generation, value))
        self.event.unpark_all()

    async def subscribe(self, *, blocked_queue_size=0):
        assert blocked_queue_size <= self.values.maxlen
        # The trick to this loop is its lack of any blocking call unless the
        # subscriber is caught up.  If the subscriber body is synchronous as
        # well, it means the entire queue content can be relayed atomically.
        last_generation = self.head_generation
        while True:
            if last_generation == self.head_generation:
                await self.event.park()

            for generation, value in self.values:
                if generation > last_generation:
                    last_generation = generation
                    pre_yield_generation = self.head_generation
                    yield value
                    # If the head value changed, it implies that the subscriber
                    # blocked.  Propagate a maximum of blocked_queue_size values
                    # during that time and break (since the collection we're
                    # iterating has mutated).
                    if self.head_generation > pre_yield_generation:
                        last_generation = max(last_generation,
                                              self.head_generation - blocked_queue_size)
                        break

@njsmith
Member Author

njsmith commented Oct 8, 2020

In general, I would discourage everyone from writing code that depends on assumptions about the details of individual ticks and how Trio's scheduler processes them. This is not part of Trio's stable API.

@goodboy
Member

goodboy commented Apr 5, 2021

@belm0 is this FrontRunnerBroadcastValue code published and/or refined elsewhere?

I currently have a need for it (or something similar) in tractor.

@belm0
Member

belm0 commented Apr 5, 2021

@goodboy I gave up on the quasi-queueing of FrontRunnerBroadcastValue

However in our local version of trio-util, AsyncValue now has an eventual_values() iterator. If you're just needing eventual consistency that will do it.

I'm planning to commit that soon to the public repo.

update: now published (https://trio-util.readthedocs.io/en/v0.5.0/#trio_util.AsyncValue.eventual_values)

@basak
Member

basak commented Apr 6, 2021

Thanks to @belm0 for pointing me to this issue. I've been doing broadcast messaging (slowest consumer backpressure, lossless) for a long time, originally with asyncio, then converted to Trio. I put my implementation in https://gist.github.com/basak/007da3fc2448300d037c9bb008cc5e80. It works well for me in practice.

I filed #1949 because I noticed a theoretical problem today. However I believe Trio's implementation as it is today is safe from that issue.

@goodboy
Member

goodboy commented Apr 6, 2021

@basak @belm0 thanks for both links; will report back when we finally get to adding a more formal api for this on IPC streams.

@njsmith
Member Author

njsmith commented Apr 12, 2021

@goodboy
Member

goodboy commented Aug 9, 2021

We have what appears to be a tokio style implementation of the fastest consumer style as a broadcast_receiver() wrapper around a trio.abc.ReceiveChannel.

It's extremely nascent and in prototype testing atm; see the link above.

@goodboy
Member

goodboy commented Sep 2, 2021

About to land goodboy/tractor#229 but more than happy to contribute all of the tractor._broadcast.py module to trio core if there's interest.

Pretty happy with the performance and ease of use around our IPC streaming api 😎
Also looks like this api might work well for wrapping shared-mem style array / ring buffer stuff what with the "rolling window of values" and all.

NB: it's fastest consumer style just like tokio.

@A5rocks
Contributor

A5rocks commented Sep 19, 2021

Elixir's GenStage seems to backpressure on slowest consumer: https://hexdocs.pm/gen_stage/GenStage.BroadcastDispatcher.html

It's not exactly a channel per se, but it's a thing where you can have separate "stages" -- each can either be a producer, a producer-consumer, or a consumer. A consumer will send demand up to its producer whenever it is done processing (I think). That's it, I think?

I'm not sure how well this strategy works out, but GenStage is pretty much in the standard library for Elixir -- it's even developed on the Elixir org :P

@DavidBuchanan314

There's a fourth policy you might want to use, adjacent to "instead of applying backpressure, drop messages" - but instead of dropping individual messages, you drop the whole connection to the consumer (the consumer may then decide to reconnect, or give up entirely).

Consider a consumer that is synchronising some state from the producer, with each event being a "diff" against the prior state. If just a single event was dropped, any future events would be useless to the consumer.
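This fourth policy can be modeled synchronously as a sketch (hypothetical names): a subscriber whose bounded queue overflows is disconnected outright, rather than handed a stream with a gap in it.

```python
from collections import deque


class DisconnectingBroadcast:
    """Broadcast where a lagging subscriber is dropped instead of given
    a gappy stream -- useful when each message is a diff against prior
    state, so one missing message invalidates everything after it."""

    def __init__(self, maxlen=8):
        self.maxlen = maxlen
        self.subscribers = {}  # id -> bounded queue
        self._next_id = 0

    def subscribe(self):
        sid = self._next_id
        self._next_id += 1
        self.subscribers[sid] = deque()
        return sid

    def send(self, value):
        for sid, q in list(self.subscribers.items()):
            if len(q) >= self.maxlen:
                # Overflow: disconnect the whole subscription; the
                # consumer can detect this and resynchronize from scratch.
                del self.subscribers[sid]
            else:
                q.append(value)
```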
