Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Extend gather and as_completed #82

Merged
merged 5 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions a_sync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from a_sync.primitives import *
from a_sync.singleton import ASyncGenericSingleton
from a_sync.utils import all, any, as_yielded
from a_sync.utils.as_completed import as_completed
from a_sync.utils.gather import gather

# I alias the aliases for your convenience.
# I prefer "aka" but its meaning is not intuitive when reading code so I created both aliases for you to choose from.
Expand All @@ -16,3 +18,16 @@

# alias for backward-compatability, will be removed eventually, probably in 0.1.0
ASyncBase = ASyncGenericBase


__all__ = [
"all",
"any",
"as_completed",
"as_yielded",
"exhaust_iterator",
"exhaust_iterators",
"gather",
"ASyncIterable",
"ASyncIterator",
]
23 changes: 3 additions & 20 deletions a_sync/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
import asyncio
from typing import Awaitable, Dict, Iterator, Mapping, Tuple, TypeVar, List, Callable
from typing_extensions import ParamSpec

from a_sync.utils.iterators import (as_yielded, exhaust_iterator,
exhaust_iterators)

T = TypeVar("T")
KT = TypeVar("KT")
VT = TypeVar("VT")
P = ParamSpec("P")

__all__ = [
"all",
"any",
"as_yielded",
"exhaust_iterator",
"exhaust_iterators",
]

async def any(*awaitables) -> bool:
Expand All @@ -32,17 +29,3 @@ async def all(*awaitables) -> bool:
fut.cancel()
return False
return True

async def gather_mapping(mapping: Mapping[KT, Awaitable[VT]]) -> Dict[KT, VT]:
results = {k: None for k in mapping.keys()} # return data in same order
async for k, v in as_completed_mapping(mapping):
results[k] = v
return results

def as_completed_mapping(mapping: Mapping[KT, Awaitable[VT]]) -> Iterator["asyncio.Task[Tuple[KT, VT]]"]:
return asyncio.as_completed([__as_completed_wrap(k, v) for k, v in mapping.items()])

async def __as_completed_wrap(k: KT, v: Awaitable[VT]) -> VT:
return k, await v


51 changes: 51 additions & 0 deletions a_sync/utils/as_completed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

import asyncio
from typing import (Any, AsyncIterator, Awaitable, Iterable, Iterator, Mapping,
Optional, Tuple, TypeVar, Union, overload)

from tqdm.asyncio import tqdm_asyncio

from a_sync.iter import ASyncIterator

T = TypeVar('T')
KT = TypeVar('KT')
VT = TypeVar('VT')

@overload
def as_completed(fs: Mapping[KT, Awaitable[VT]], *, timeout: Optional[float], return_exceptions: bool, aiter = False, tqdm: bool, **tqdm_kwargs: Any) -> Iterator[Awaitable[Tuple[KT, VT]]]:
...
@overload
def as_completed(fs: Iterable[Awaitable[T]], *, timeout: Optional[float], return_exceptions: bool, aiter = False, tqdm: bool, **tqdm_kwargs: Any) -> Iterator[Awaitable[T]]:
...
@overload
def as_completed(fs: Mapping[KT, Awaitable[VT]], *, timeout: Optional[float], return_exceptions: bool, aiter = True, tqdm: bool, **tqdm_kwargs: Any) -> ASyncIterator[Tuple[KT, VT]]:
...
@overload
def as_completed(fs: Iterable[Awaitable[T]], *, timeout: Optional[float], return_exceptions: bool, aiter = True, tqdm: bool, **tqdm_kwargs: Any) -> ASyncIterator[T]:
...
def as_completed(fs, *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter: bool = False, tqdm: bool = False, **tqdm_kwargs: Any):
if return_exceptions:
raise NotImplementedError
return (
as_completed_mapping(fs, timeout=timeout, return_exceptions=return_exceptions, aiter=aiter, tqdm=tqdm, **tqdm_kwargs) if isinstance(fs, Mapping)
else ASyncIterator.wrap(__yield_as_completed(fs, tqdm=tqdm, **tqdm_kwargs)) if aiter
else tqdm_asyncio.as_completed(fs, timeout=timeout, **tqdm_kwargs) if tqdm
else asyncio.as_completed(fs, timeout=timeout)
)

@overload
def as_completed_mapping(mapping: Mapping[KT, Awaitable[VT]], *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter = True, tqdm: bool, **tqdm_kwargs: Any) -> ASyncIterator[Tuple[KT, VT]]:
...
@overload
def as_completed_mapping(mapping: Mapping[KT, Awaitable[VT]], *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter = False, tqdm: bool, **tqdm_kwargs: Any) -> Iterator[Awaitable[Tuple[KT, VT]]]:
...
def as_completed_mapping(mapping: Mapping[KT, Awaitable[VT]], *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter: bool = False, tqdm: bool = False, **tqdm_kwargs: Any) -> Union[Iterator[Awaitable[Tuple[KT, VT]]], ASyncIterator[Tuple[KT, VT]]]:
return as_completed([__mapping_wrap(k, v) for k, v in mapping.items()], timeout=timeout, return_exceptions=return_exceptions, aiter=aiter, tqdm=tqdm, **tqdm_kwargs)

async def __yield_as_completed(futs: Iterable[Awaitable[T]], *, timeout: Optional[float] = None, return_exceptions: bool = False, tqdm: bool = False, **tqdm_kwargs: Any) -> AsyncIterator[T]:
futs = tqdm_asyncio.as_completed(futs, timeout=timeout, **tqdm_kwargs) if tqdm else asyncio.as_completed(futs, timeout=timeout)
for fut in futs:
yield await fut

async def __mapping_wrap(k: KT, v: Awaitable[VT]) -> VT:
return k, await v
33 changes: 33 additions & 0 deletions a_sync/utils/gather.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@

import asyncio
from typing import (Any, Awaitable, Dict, List, Mapping, TypeVar, Union,
overload)

from tqdm.asyncio import tqdm_asyncio

from a_sync.utils.as_completed import as_completed_mapping

T = TypeVar('T')
KT = TypeVar('KT')
VT = TypeVar('VT')

@overload
async def gather(*awaitables: Mapping[KT, Awaitable[VT]], return_exceptions: bool = False, tqdm: bool = False, **tqdm_kwargs: Any) -> Dict[KT, VT]:
...
@overload
async def gather(*awaitables: Awaitable[T], return_exceptions: bool = False, tqdm: bool = False, **tqdm_kwargs: Any) -> List[T]:
...
async def gather(*awaitables: Union[Awaitable[T], Mapping[KT, Awaitable[VT]]], return_exceptions: bool = False, tqdm: bool = False, **tqdm_kwargs: Any) -> Union[List[T], Dict[KT, VT]]:
return await (
gather_mapping(awaitables[0], return_exceptions=return_exceptions, tqdm=tqdm, **tqdm_kwargs) if _is_mapping(awaitables)
else tqdm_asyncio.gather(*awaitables, return_exceptions=return_exceptions, **tqdm_kwargs) if tqdm
else asyncio.gather(*awaitables, return_exceptions=return_exceptions)
)

async def gather_mapping(mapping: Mapping[KT, Awaitable[VT]], return_exceptions: bool = False, tqdm: bool = False, **tqdm_kwargs: Any) -> Dict[KT, VT]:
results = {k: None for k in mapping.keys()} # return data in same order
async for k, v in as_completed_mapping(mapping, return_exceptions=return_exceptions, aiter=True, tqdm=tqdm, **tqdm_kwargs):
results[k] = v
return results

_is_mapping = lambda awaitables: len(awaitables) == 1 and isinstance(awaitables[0], Mapping)
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
aiolimiter==1.0.0
async_lru_threadsafe==2.0.4
async_property==0.2.1
tqdm
typed_envs>=0.0.2
typing_extensions>=4.1.0 # typing_extensions.Unpack was introduced in 4.1.0