Skip to content

Commit

Permalink
Clean up asyncio usage
Browse files Browse the repository at this point in the history
  • Loading branch information
brentyi committed Nov 6, 2023
1 parent be93f41 commit a146afc
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 24 deletions.
36 changes: 13 additions & 23 deletions src/viser/infra/_async_message_buffer.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
from __future__ import annotations

import asyncio
import dataclasses
import time
from asyncio.events import AbstractEventLoop
from typing import (
Any,
AsyncGenerator,
Awaitable,
Dict,
Optional,
Sequence,
Set,
Union,
cast,
)
from typing import Any, AsyncGenerator, Dict, Optional, Sequence, Set, Union, cast

from typing_extensions import Literal, TypeGuard

Expand Down Expand Up @@ -70,11 +62,11 @@ async def window_generator(
# Wait until there are new messages available.
most_recent_message_id = self.message_counter - 1
while last_sent_id >= most_recent_message_id:
next_message = self.message_event.wait()
next_message = asyncio.create_task(self.message_event.wait())
flush_wait = asyncio.create_task(self._flush_event.wait())
send_window = False
try:
flush_wait = self._flush_event.wait()
done, pending = await asyncio.wait( # type: ignore
done, pending = await asyncio.wait(
[flush_wait, next_message],
timeout=window.max_time_until_ready(),
return_when=asyncio.FIRST_COMPLETED,
Expand Down Expand Up @@ -147,28 +139,26 @@ def append_to_window(self, message: Union[Message, DoneSentinel]) -> None:

async def wait_and_append_to_window(
self,
message: Awaitable[Union[Message, DoneSentinel]],
message_task: asyncio.Task[Union[Message, DoneSentinel]],
flush_event: asyncio.Event,
) -> bool:
"""Async version of `append_to_window()`. Returns `True` if successful, `False`
if timed out."""
if len(self._window) == 0:
self.append_to_window(await message)
self.append_to_window(await message_task)
return True

message = asyncio.shield(message)
flush_wait = asyncio.shield(flush_event.wait())
(done, pending) = await asyncio.wait( # type: ignore
[message, flush_wait],
flush_wait = asyncio.create_task(flush_event.wait())
done, pending = await asyncio.wait(
[message_task, flush_wait],
timeout=self.max_time_until_ready(),
return_when=asyncio.FIRST_COMPLETED,
)
del pending
if flush_wait in done:
flush_event.clear()
flush_wait.cancel()
if message in cast(Set[Any], done): # Cast to prevent type narrowing.
self.append_to_window(await message)
if message_task in cast(Set[Any], done): # Cast to prevent type narrowing.
self.append_to_window(await message_task)
return True
return False

Expand Down
4 changes: 3 additions & 1 deletion src/viser/infra/_infra.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,9 @@ def handle_incoming(message: Message) -> None:
# queue get() tasks, which suppresses a "Task was destroyed but it is
# pending" error.
await client_state.message_buffer.put(DONE_SENTINEL)
self._flush_event_from_client_id.pop(client_id)

# Trigger then delete the flush event.
self._flush_event_from_client_id.pop(client_id).set()

# Disconnection callbacks.
for cb in self._client_disconnect_cb:
Expand Down

0 comments on commit a146afc

Please sign in to comment.