tokio-style broadcast channels #229

Merged: 32 commits merged from live_on_air_from_tokio into master on Sep 3, 2021
Conversation

goodboy (Owner) commented Aug 8, 2021

Literally the most prototype thing ever (now actually decently well tested), based on discussion in:

Nothing more than a runnable module atm.

Some interesting notes:

Update:

Before I lose this thought...

If we wanted to also support slowest-consumer style, I'm pretty sure there's an easy hack: just inherit and proxy reads through to trio._channel.MemoryChannelState.data (presuming the queue size passed to broadcast_receiver() and the underlying mem chan's deque are the same size).

The thinking is: there's a length check in MemorySendChannel.send_nowait() that determines whether the sender should block:
https://github.com/python-trio/trio/blob/master/trio/_channel.py#L147

If .data here proxied to lambda: max(self._subs.values()) and the order of these branches were reversed, it might just work, no?

        elif len(self._state.data) >= self._state.max_buffer_size:
            # causes ``.send()`` to block
            raise trio.WouldBlock
        else:
            self._state.data.append(value)
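
For concreteness, a minimal sketch of that proxy idea (hypothetical names throughout; it assumes ._subs maps each subscriber key to how many queued items it still has pending, and that the broadcaster's deque is reachable as a plain queue):

    from collections import deque

    class SlowestSubData:
        '''Duck-typed stand-in for ``MemoryChannelState.data`` whose
        ``len()`` reports the slowest subscriber's lag instead of the
        deque's own length, so the buffer-size check above raises
        ``WouldBlock`` (and thus blocks ``.send()``) until that
        subscriber catches up.
        '''
        def __init__(self, subs: dict, queue: deque):
            self._subs = subs    # subscriber key -> items still pending
            self._queue = queue  # the real broadcast value queue

        def __len__(self) -> int:
            return max(self._subs.values(), default=0)

        def append(self, value) -> None:
            # values still land in the real queue
            self._queue.append(value)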

Follow up:

  • dropped all the clones stuff, bleh; a big source of errors and confusion imo
  • probably should just use id(self) for .key instead of uuid4()?
  • probably still need a couple tests for trying to (re)open a subscription from a now-closed caster
  • need a test for the case where ._recv() is cancelled before the first task in line receives a value, ensuring that no other event-queued tasks miss that value and/or re-get their last pulled value
  • make an issue detailing the slowest-consumer style notes from above -> Slowest consumer broadcasting? #235

Review comment on tractor/_live_from_tokio.py:

    # their latest available value.
    for sub_key, seq in self._subs.items():

        if key == sub_key:

goodboy (Owner, Author): maybe do a ._subs copy and pop instead?

goodboy force-pushed the live_on_air_from_tokio branch from 397b34b to 59354b4 on August 8, 2021 22:50
goodboy changed the base branch from infect_asyncio to master on August 8, 2021 22:54
goodboy (Owner, Author) commented Aug 8, 2021

There. I think this is ready to start adding tests.

I'll leave it for a couple hours to see if any of you lurkers complain 😂

goodboy force-pushed the live_on_air_from_tokio branch from 8a58df3 to 898756a on August 9, 2021 20:51
Review comment on tractor/_broadcast.py:

    # ``AsyncResource`` api.
    await clone.aclose()

    # TODO:

goodboy (Owner, Author): Last set of design questions, I think?

Review comment on tractor/_broadcast.py:

    # don't decrement the sequence # for this task since we
    # already retrieved the last value
    subs.pop(key)
    for sub_key, seq in subs.items():

goodboy (Owner, Author): is there some fancier-faster way to do something like this?
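
The commit notes further down mention switching to a filter() on the subs for this update step; a rough sketch of that shape (assuming, per the comment above, that the truncated loop body just decrements each remaining subscriber's sequence):

    # a sketch, not the merged code: skip the current task's key and
    # decrement every other subscriber's sequence in one pass
    for sub_key in filter(lambda k: k != key, self._subs):
        self._subs[sub_key] -= 1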

Review comment:

    else:
        await self._value_received.wait()

    seq = self._subs[key]

goodboy (Owner, Author): can we drop all of these sequence # checks?

goodboy force-pushed the live_on_air_from_tokio branch from 898756a to 2bfea0b on August 9, 2021 21:08
goodboy (Owner, Author) commented Aug 10, 2021

lol, yeah, so since moving to the "clones" version the actual N-copies consistency was obviously broken because the ._value_received event wasn't common to all clones 😂

Fixed this and am now adding a "broadcast state" type, much like the default mem chans.

goodboy changed the title from "Ultra naive broadcast channel prototype" to "tokio-style broadcast channel prototype" on Aug 10, 2021
goodboy mentioned this pull request on Aug 10, 2021
goodboy (Owner, Author) commented Aug 10, 2021

Yah, so binance feeds proved that the assumption that slept consumers waking up can expect to read the 0 index is definitely false 😂

We can maybe try this down the road if we invert the indexing?
I gotta think about it.

Review comment:

    if self._broadcaster is None:
        self._broadcaster = broadcast_receiver(
            self,
            self._rx_chan._state.max_buffer_size,

goodboy (Owner, Author): This presumes a trio.MemoryReceiveChannel which is not ideal, so what would be the best default here and how would we reflect it in typing?

Review comment:

    # of the ``BroadcastReceiver`` before calling
    # this method for the first time.

    # XXX: why does this work without a recursion issue?!

goodboy (Owner, Author): This was actually broken, but is now fixed by adding support for passing a recv_afunc kwarg into BroadcastReceiver which defaults to the underlying's .receive() if not provided.

This makes it easy to wrap receive channel apis for use in task broadcasting without expecting any initial consumer-task of the underlying to also use a broadcast receiver instance when calling .receive(). It does, however, mutate the machinery to put the broadcaster behind the scenes, as it were.
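
A rough usage sketch of that kwarg (the exact call signature and import path are assumed from the comment above and this PR's file names, not copied from the merged code):

    import trio
    from tractor._broadcast import broadcast_receiver  # module added in this PR

    async def main():
        tx, rx = trio.open_memory_channel(100)

        async def traced_recv():
            # any receive-channel-like api can be adapted here
            value = await rx.receive()
            print(f'pulled {value!r} from the underlying chan')
            return value

        # ``recv_afunc`` defaults to ``rx.receive`` when not passed
        brx = broadcast_receiver(rx, 100, recv_afunc=traced_recv)

        async with brx.subscribe() as sub:
            await tx.send('hello')
            assert await sub.receive() == 'hello'

    trio.run(main)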

goodboy (Owner, Author) commented Aug 19, 2021

Oh right, to use typing.Protocol we need 3.8+. I wonder if this is incentive enough to drop 3.7 for the next alpha?

goodboy changed the title from "tokio-style broadcast channel prototype" to "tokio-style broadcast channels" on Aug 19, 2021
goodboy marked this pull request as ready for review on August 19, 2021 18:00
goodboy force-pushed the live_on_air_from_tokio branch 2 times, most recently from 7a3a587 to 2eb7715 on August 19, 2021 18:51
Review comment:

    retries = 3
    size = 100
    tx, rx = trio.open_memory_channel(size)
    brx = broadcast_receiver(rx, size)

goodboy (Owner, Author): I'm almost tempted to enforce this as an async context manager, something I kinda wish open_memory_channel() did as well.
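
A minimal sketch of the "enforce an acm" idea as a thin wrapper over trio.open_memory_channel() (the helper name here is made up):

    from contextlib import asynccontextmanager

    import trio

    @asynccontextmanager
    async def open_mem_chan(max_buffer_size: int):
        tx, rx = trio.open_memory_channel(max_buffer_size)
        # both ends are ``AsyncResource``s so they close on exit
        async with tx, rx:
            yield tx, rx

Used as `async with open_mem_chan(100) as (tx, rx):`, the channel ends can't leak past the block.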


Review comment:

    async with brx.subscribe() as lbrx:
        while True:
            # await tractor.breakpoint()

goodboy (Owner, Author): remove.

Commit messages:

For every set of broadcast receivers which pull from the same producer,
we need a singleton state for all of:
- subscriptions
- the sender ready event
- the queue

Add a `BroadcastState` dataclass for this and pass it to all
subscriptions. This makes the design much more like the built-in memory
channels which do something very similar with `MemoryChannelState`.
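
A minimal sketch of such a dataclass (field names are assumed from the bullet list above, not taken from the merged code):

    from collections import deque
    from dataclasses import dataclass, field

    import trio

    @dataclass
    class BroadcastState:
        # the shared queue of broadcast values
        queue: deque
        # subscription key -> that subscriber's sequence index
        subs: dict = field(default_factory=dict)
        # the "sender ready" event; receivers wait on it for new values
        sender_ready: trio.Event = field(default_factory=trio.Event)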

Use a `filter()` on the subs list in the sequence update step, plus some
other commented approaches we can try for speed.

Add `ReceiveMsgStream.subscribe()` which allows allocating a broadcast
receiver around the stream for use by multiple actor-local consumer
tasks. Entering this context manager idempotently mutates the stream's
receive machinery, which for now cannot be undone. Move `.clone()` to
the receive stream type.

Resolves #204
This allows for wrapping an existing stream by re-assigning its receive
method to the allocated broadcaster's `.receive()`, so as to avoid
requiring any original consumer(s) of the stream to know about the
broadcaster; instead the stream is mutated to delegate to the new
receive call behind the scenes any time `.subscribe()` is called.
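
The delegation trick in miniature (a sketch with assumed names, not the merged `ReceiveMsgStream` code): the first `.subscribe()` call hands the broadcaster the stream's original `.receive()` and then rebinds `.receive` so existing callers keep working unchanged.

    from contextlib import asynccontextmanager

    from tractor._broadcast import broadcast_receiver  # module added in this PR

    class StreamLike:
        def __init__(self, rx_chan, max_buffer_size: int):
            self._rx_chan = rx_chan
            self._max_buffer_size = max_buffer_size
            self._broadcaster = None

        async def receive(self):
            return await self._rx_chan.receive()

        @asynccontextmanager
        async def subscribe(self):
            if self._broadcaster is None:
                self._broadcaster = broadcast_receiver(
                    self,
                    self._max_buffer_size,
                    # hand the broadcaster the *original* receive
                    recv_afunc=self.receive,
                )
                # mutate: future ``.receive()`` calls on this stream now
                # delegate to the broadcaster behind the scenes
                self.receive = self._broadcaster.receive  # type: ignore

            async with self._broadcaster.subscribe() as sub:
                yield sub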

Add a `typing.Protocol` for so-called "cloneable channels" until we
decide on/figure out a better keying system for each subscription, and
mask all undesired typing failures.
Get rid of all the (requirements for) clones of the underlying
receivable. We can just use a uuid generated key for each instance
(thinking now this can probably just be `id(self)`). I'm fully convinced
now that channel cloning is only a source of confusion and anti-patterns
when we already have nurseries to define resource lifetimes. There is no
benefit in particular when you allocate subscriptions using a context
manager (not sure why `trio.open_memory_channel()` doesn't enforce
this).

Further refinements:
- add a `._closed` state that will error the receiver on reuse
- drop module script section;  it's been moved to a real test
- call the "receiver" duck-type stub a new name
Add a couple more tests to check that a parent and sub-task stream can
be lagged and recovered (depending on who's slower). Factor some of the
test machinery into a new ctx mngr to make it all happen.
The `collections.deque` takes care of array-length truncation of values
for us implicitly, but in the future we'll likely want this value exposed
to alternate array implementations. This patch provides for that as
well as making `mypy` happy, since `deque.maxlen` can also be `None`.
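
For reference, the deque behavior being leaned on here:

    from collections import deque

    q = deque(maxlen=3)
    for i in range(5):
        q.append(i)

    assert list(q) == [2, 3, 4]  # older values truncated implicitly
    assert q.maxlen == 3         # ``maxlen`` may also be ``None`` (unbounded)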
goodboy force-pushed the live_on_air_from_tokio branch from ca15098 to 1137a9e on September 3, 2021 01:13
goodboy merged commit 3f1bc37 into master on Sep 3, 2021
goodboy deleted the live_on_air_from_tokio branch on September 3, 2021 11:29