Skip to content

Commit

Permalink
feat: yields init kwarg
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler authored Feb 13, 2024
1 parent d81bc6c commit f28bfd5
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions a_sync/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from a_sync._typing import *
from a_sync import exceptions
from a_sync.iter import ASyncIterable
from a_sync.utils.as_completed import as_completed
from a_sync.utils.gather import gather
from a_sync.utils.iterators import as_yielded, exhaust_iterator
Expand All @@ -20,11 +21,12 @@ def create_task(coro: Awaitable[T], *, name: Optional[str] = None, skip_gc_until
__persist(task)
return task

class TaskMapping(DefaultDict[K, "asyncio.Task[V]"]):
def __init__(self, coro_fn: Callable[Concatenate[K, P], Awaitable[V]] = None, *iterables: AnyIterable[K], name: str = '', **coro_fn_kwargs: P.kwargs) -> None:
class TaskMapping(ASyncIterable[K, V], DefaultDict[K, "asyncio.Task[V]"]):
def __init__(self, coro_fn: Callable[Concatenate[K, P], Awaitable[V]] = None, *iterables: AnyIterable[K], name: str = '', yields: Literal['keys', 'both'] = 'both', **coro_fn_kwargs: P.kwargs) -> None:
self._coro_fn = coro_fn
self._coro_fn_kwargs = coro_fn_kwargs
self._name = name
self._yields = yields
if iterables:
self._loader = create_task(exhaust_iterator(self._tasks_for_iterables(*iterables)))
else:
Expand All @@ -48,15 +50,17 @@ async def __aiter__(self) -> AsyncIterator[Tuple[K, V]]:
"""aiterate thru all key-task pairs, yielding the key-result pair as each task completes"""
if self._loader:
await self._loader
async for k, v in as_completed(self, aiter=True):
yield k, v
async for key, value in as_completed(self, aiter=True):
yield _yield(key, value, self._yields)
async def map(self, *iterables: AnyIterable[K], pop: bool = True, yields: Literal['keys', 'both'] = 'both') -> AsyncIterator[Tuple[K, V]]:
if self:
raise exceptions.MappingNotEmptyError
async for _ in self._tasks_for_iterables(*iterables):
async for key, value in self.yield_completed(pop=pop):
yield _yield(key, value, yields)
async for key, value in as_completed(self, aiter=True):
if pop:
self.pop(key)
yield _yield(key, value, yields)
async def yield_completed(self, pop: bool = True) -> AsyncIterator[Tuple[K, V]]:
for k, task in dict(self).items():
Expand Down

0 comments on commit f28bfd5

Please sign in to comment.