Skip to content

Commit

Permalink
feat: create_task log_destroyed_pending kwarg
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler committed Apr 23, 2024
1 parent b1db682 commit 12d1300
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
11 changes: 10 additions & 1 deletion a_sync/_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@

logger = logging.getLogger(__name__)

def create_task(coro: Awaitable[T], *, name: Optional[str] = None, skip_gc_until_done: bool = False) -> "asyncio.Task[T]":
def create_task(
coro: Awaitable[T],
*,
name: Optional[str] = None,
skip_gc_until_done: bool = False,
log_destroyed_pending: bool = True,
) -> "asyncio.Task[T]":
"""
Extends asyncio.create_task to support any Awaitable, manage task lifecycle, and enhance error handling.
Expand All @@ -20,6 +26,7 @@ def create_task(coro: Awaitable[T], *, name: Optional[str] = None, skip_gc_until
coro: An Awaitable object from which to create the task.
name: Optional name for the task, aiding in debugging.
skip_gc_until_done: If True, the task is kept alive until it completes, preventing garbage collection.
log_destroyed_pending: If False, asyncio's default error log when a pending task is destroyed is suppressed.
Returns:
An asyncio.Task object created from the provided Awaitable.
Expand All @@ -30,6 +37,8 @@ def create_task(coro: Awaitable[T], *, name: Optional[str] = None, skip_gc_until
task = asyncio.create_task(coro, name=name)
if skip_gc_until_done:
__persist(asyncio.create_task(__persisted_task_exc_wrap(task)))
if not log_destroyed_pending:
task._log_destroyed_pending = False
return task


Expand Down
16 changes: 8 additions & 8 deletions a_sync/primitives/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,14 @@ def _ensure_workers(self) -> None:
@functools.cached_property
def _workers(self) -> "asyncio.Task[NoReturn]":
logger.debug("starting worker task for %s", self)
workers = [_start_task(self._worker_coro(), name=f"{self.name} [Task-{i}]") for i in range(self.num_workers)]
task = _start_task(asyncio.gather(*workers), name=f"{self.name} worker main Task")
workers = [
create_task(
coro=self._worker_coro(),
name=f"{self.name} [Task-{i}]",
log_destroyed_pending=False,
) for i in range(self.num_workers)
]
task = create_task(asyncio.gather(*workers), name=f"{self.name} worker main Task", log_destroyed_pending=False)
task._workers = workers
return task
async def _worker_coro(self) -> NoReturn:
Expand Down Expand Up @@ -187,12 +193,6 @@ def __stop_workers(self) -> None:
worker.cancel()


def _start_task(coro: Coroutine, name: str) -> "asyncio.Task[NoReturn]":
task = create_task(coro, name=name)
# when this queue is garbage collected, these tasks should not log
task._log_destroy_pending = False
return task

def _validate_args(i: int, can_return_less: bool) -> None:
if not isinstance(i, int):
raise TypeError(f"`i` must be an integer greater than 1. You passed {i}")
Expand Down

0 comments on commit 12d1300

Please sign in to comment.