From 12d1300a4d6a74e34a9a3841543a1c0b9335e184 Mon Sep 17 00:00:00 2001 From: BobTheBuidler Date: Tue, 23 Apr 2024 16:29:32 +0000 Subject: [PATCH] feat: create_task log_destroyed_pending kwarg --- a_sync/_task.py | 11 ++++++++++- a_sync/primitives/queue.py | 16 ++++++++-------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/a_sync/_task.py b/a_sync/_task.py index 2abeb887..f45e7c12 100644 --- a/a_sync/_task.py +++ b/a_sync/_task.py @@ -8,7 +8,13 @@ logger = logging.getLogger(__name__) -def create_task(coro: Awaitable[T], *, name: Optional[str] = None, skip_gc_until_done: bool = False) -> "asyncio.Task[T]": +def create_task( + coro: Awaitable[T], + *, + name: Optional[str] = None, + skip_gc_until_done: bool = False, + log_destroyed_pending: bool = True, +) -> "asyncio.Task[T]": """ Extends asyncio.create_task to support any Awaitable, manage task lifecycle, and enhance error handling. @@ -20,6 +26,7 @@ def create_task(coro: Awaitable[T], *, name: Optional[str] = None, skip_gc_until coro: An Awaitable object from which to create the task. name: Optional name for the task, aiding in debugging. skip_gc_until_done: If True, the task is kept alive until it completes, preventing garbage collection. + log_destroyed_pending: If False, asyncio's default error log when a pending task is destroyed is suppressed. Returns: An asyncio.Task object created from the provided Awaitable. @@ -30,6 +37,8 @@ def create_task(coro: Awaitable[T], *, name: Optional[str] = None, skip_gc_until task = asyncio.create_task(coro, name=name) if skip_gc_until_done: __persist(asyncio.create_task(__persisted_task_exc_wrap(task))) + if not log_destroyed_pending: + task._log_destroyed_pending = False return task diff --git a/a_sync/primitives/queue.py b/a_sync/primitives/queue.py index 87f1f36c..ecfef5be 100644 --- a/a_sync/primitives/queue.py +++ b/a_sync/primitives/queue.py @@ -142,8 +142,14 @@ def _ensure_workers(self) -> None: @functools.cached_property def _workers(self) -> "asyncio.Task[NoReturn]": logger.debug("starting worker task for %s", self) - workers = [_start_task(self._worker_coro(), name=f"{self.name} [Task-{i}]") for i in range(self.num_workers)] - task = _start_task(asyncio.gather(*workers), name=f"{self.name} worker main Task") + workers = [ + create_task( + coro=self._worker_coro(), + name=f"{self.name} [Task-{i}]", + log_destroyed_pending=False, + ) for i in range(self.num_workers) + ] + task = create_task(asyncio.gather(*workers), name=f"{self.name} worker main Task", log_destroyed_pending=False) task._workers = workers return task async def _worker_coro(self) -> NoReturn: @@ -187,12 +193,6 @@ def __stop_workers(self) -> None: worker.cancel() -def _start_task(coro: Coroutine, name: str) -> "asyncio.Task[NoReturn]": - task = create_task(coro, name=name) - # when this queue is garbage collected, these tasks should not log - task._log_destroy_pending = False - return task - def _validate_args(i: int, can_return_less: bool) -> None: if not isinstance(i, int): raise TypeError(f"`i` must be an integer greater than 1. You passed {i}")