From 49d9febbbebd3a746245b3f6241d699fcdf6cb90 Mon Sep 17 00:00:00 2001 From: Brent Yi Date: Mon, 6 Nov 2023 15:40:21 -0800 Subject: [PATCH] Clean up asyncio usage --- src/viser/infra/_async_message_buffer.py | 12 +++++------- src/viser/infra/_infra.py | 4 +++- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/viser/infra/_async_message_buffer.py b/src/viser/infra/_async_message_buffer.py index 9bd2c2cd2..ca61345d8 100644 --- a/src/viser/infra/_async_message_buffer.py +++ b/src/viser/infra/_async_message_buffer.py @@ -70,11 +70,11 @@ async def window_generator( # Wait until there are new messages available. most_recent_message_id = self.message_counter - 1 while last_sent_id >= most_recent_message_id: - next_message = self.message_event.wait() + next_message = asyncio.create_task(self.message_event.wait()) + flush_wait = asyncio.create_task(self._flush_event.wait()) send_window = False try: - flush_wait = self._flush_event.wait() - done, pending = await asyncio.wait( # type: ignore + done, pending = await asyncio.wait( [flush_wait, next_message], timeout=window.max_time_until_ready(), return_when=asyncio.FIRST_COMPLETED, @@ -156,9 +156,8 @@ async def wait_and_append_to_window( self.append_to_window(await message) return True - message = asyncio.shield(message) - flush_wait = asyncio.shield(flush_event.wait()) - (done, pending) = await asyncio.wait( # type: ignore + flush_wait = asyncio.create_task(flush_event.wait()) + done, pending = await asyncio.wait( [message, flush_wait], timeout=self.max_time_until_ready(), return_when=asyncio.FIRST_COMPLETED, @@ -166,7 +165,6 @@ async def wait_and_append_to_window( del pending if flush_wait in done: flush_event.clear() - flush_wait.cancel() if message in cast(Set[Any], done): # Cast to prevent type narrowing. self.append_to_window(await message) return True diff --git a/src/viser/infra/_infra.py b/src/viser/infra/_infra.py index 021d5182a..a6eba1cb2 100644 --- a/src/viser/infra/_infra.py +++ b/src/viser/infra/_infra.py @@ -286,7 +286,9 @@ def handle_incoming(message: Message) -> None: # queue get() tasks, which suppresses a "Task was destroyed but it is # pending" error. await client_state.message_buffer.put(DONE_SENTINEL) - self._flush_event_from_client_id.pop(client_id) + + # Trigger then delete the flush event. + self._flush_event_from_client_id.pop(client_id).set() # Disconnection callbacks. for cb in self._client_disconnect_cb: