Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: accidental commit #410

Merged
merged 2 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion a_sync/asyncio/as_completed.pyi
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""
This module extends Python's :func:`asyncio.as_completed` with additional functionality.
"""

from a_sync._typing import *

__all__ = ['as_completed']
__all__ = ["as_completed"]

class tqdm_asyncio:
@staticmethod
Expand Down
33 changes: 27 additions & 6 deletions a_sync/asyncio/gather.pyi
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
from a_sync._typing import *
from typing import Any, Awaitable, Dict, List, Mapping, overload

__all__ = ['gather', 'gather_mapping']
__all__ = ["gather", "gather_mapping"]

class tqdm_asyncio:
@staticmethod
async def gather(*args, **kwargs) -> None: ...

Excluder = Callable[[T], bool]

@overload
async def gather(awaitables: Mapping[K, Awaitable[V]], return_exceptions: bool = False, exclude_if: Optional[Excluder[V]] = None, tqdm: bool = False, **tqdm_kwargs: Any) -> Dict[K, V]:
async def gather(
awaitables: Mapping[K, Awaitable[V]],
return_exceptions: bool = False,
exclude_if: Optional[Excluder[V]] = None,
tqdm: bool = False,
**tqdm_kwargs: Any
) -> Dict[K, V]:
"""
Concurrently awaits a k:v mapping of awaitables and returns the results.

Expand All @@ -31,8 +38,15 @@ async def gather(awaitables: Mapping[K, Awaitable[V]], return_exceptions: bool =
See Also:
:func:`asyncio.gather`
"""

@overload
async def gather(*awaitables: Awaitable[T], return_exceptions: bool = False, exclude_if: Optional[Excluder[T]] = None, tqdm: bool = False, **tqdm_kwargs: Any) -> List[T]:
async def gather(
*awaitables: Awaitable[T],
return_exceptions: bool = False,
exclude_if: Optional[Excluder[T]] = None,
tqdm: bool = False,
**tqdm_kwargs: Any
) -> List[T]:
"""
Concurrently awaits a series of awaitable objects and returns the results.

Expand All @@ -53,8 +67,15 @@ async def gather(*awaitables: Awaitable[T], return_exceptions: bool = False, exc
See Also:
:func:`asyncio.gather`
"""
async def gather_mapping(mapping: Mapping[K, Awaitable[V]], return_exceptions: bool = False, exclude_if: Optional[Excluder[V]] = None, tqdm: bool = False, **tqdm_kwargs: Any) -> Dict[K, V]:
'''

async def gather_mapping(
mapping: Mapping[K, Awaitable[V]],
return_exceptions: bool = False,
exclude_if: Optional[Excluder[V]] = None,
tqdm: bool = False,
**tqdm_kwargs: Any
) -> Dict[K, V]:
"""
Concurrently awaits a mapping of awaitable objects and returns a dictionary of results.

This function is designed to await a mapping of awaitable objects, where each key-value pair represents a unique awaitable. It enables concurrent execution and gathers results into a dictionary.
Expand All @@ -76,4 +97,4 @@ async def gather_mapping(mapping: Mapping[K, Awaitable[V]], return_exceptions: b

See Also:
:func:`asyncio.gather`
'''
"""
57 changes: 45 additions & 12 deletions a_sync/task.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ from a_sync.iter import ASyncIterator
from collections.abc import Generator
from typing import Any

__all__ = ['TaskMapping', 'TaskMappingKeys', 'TaskMappingValues', 'TaskMappingItems']
__all__ = ["TaskMapping", "TaskMappingKeys", "TaskMappingValues", "TaskMappingItems"]

class TaskMapping(DefaultDict[K, 'asyncio.Task[V]'], AsyncIterable[Tuple[K, V]]):
'''
class TaskMapping(DefaultDict[K, "asyncio.Task[V]"], AsyncIterable[Tuple[K, V]]):
"""
A mapping of keys to asynchronous tasks with additional functionality.

`TaskMapping` is a specialized dictionary that maps keys to `asyncio` Tasks. It provides
Expand Down Expand Up @@ -38,11 +38,19 @@ class TaskMapping(DefaultDict[K, 'asyncio.Task[V]'], AsyncIterable[Tuple[K, V]])
- :class:`asyncio.Task`
- :func:`asyncio.create_task`
- :func:`a_sync.asyncio.create_task`
'''
"""

concurrency: Optional[int]
__iterables__: Tuple[AnyIterableOrAwaitableIterable[K], ...]
__wrapped__: Incomplete
def __init__(self, wrapped_func: MappingFn[K, P, V] = None, *iterables: AnyIterableOrAwaitableIterable[K], name: str = '', concurrency: Optional[int] = None, **wrapped_func_kwargs: P.kwargs) -> None:
def __init__(
self,
wrapped_func: MappingFn[K, P, V] = None,
*iterables: AnyIterableOrAwaitableIterable[K],
name: str = "",
concurrency: Optional[int] = None,
**wrapped_func_kwargs: P.kwargs
) -> None:
"""
Initialize a TaskMapping instance.

Expand All @@ -60,20 +68,28 @@ class TaskMapping(DefaultDict[K, 'asyncio.Task[V]'], AsyncIterable[Tuple[K, V]])

task_map = TaskMapping(process_item, [1, 2, 3], concurrency=2)
"""

def __hash__(self) -> int: ...
def __setitem__(self, item: Any, value: Any) -> None: ...
def __getitem__(self, item: K) -> asyncio.Task[V]: ...
def __await__(self) -> Generator[Any, None, Dict[K, V]]:
"""Wait for all tasks to complete and return a dictionary of the results."""

async def __aiter__(self, pop: bool = False) -> AsyncIterator[Tuple[K, V]]:
"""Asynchronously iterate through all key-task pairs, yielding the key-result pair as each task completes."""

def __delitem__(self, item: K) -> None: ...
def keys(self, pop: bool = False) -> TaskMappingKeys[K, V]: ...
def values(self, pop: bool = False) -> TaskMappingValues[K, V]: ...
def items(self, pop: bool = False) -> TaskMappingValues[K, V]: ...
async def close(self) -> None: ...
async def map(self, *iterables: AnyIterableOrAwaitableIterable[K], pop: bool = True, yields: Literal['keys', 'both'] = 'both') -> AsyncIterator[Tuple[K, V]]:
'''
async def map(
self,
*iterables: AnyIterableOrAwaitableIterable[K],
pop: bool = True,
yields: Literal["keys", "both"] = "both"
) -> AsyncIterator[Tuple[K, V]]:
"""
Asynchronously map iterables to tasks and yield their results.

Args:
Expand All @@ -93,16 +109,19 @@ class TaskMapping(DefaultDict[K, 'asyncio.Task[V]'], AsyncIterable[Tuple[K, V]])
task_map = TaskMapping(process_item)
async for key, result in task_map.map([1, 2, 3]):
print(f"Processed {key}: {result}")
'''
"""

async def all(self, pop: bool = True) -> bool: ...
async def any(self, pop: bool = True) -> bool: ...
async def max(self, pop: bool = True) -> V: ...
async def min(self, pop: bool = True) -> V:
"""Return the minimum result from the tasks in the mapping."""

async def sum(self, pop: bool = False) -> V:
"""Return the sum of the results from the tasks in the mapping."""

async def yield_completed(self, pop: bool = True) -> AsyncIterator[Tuple[K, V]]:
'''
"""
Asynchronously yield tuples of key-value pairs representing the results of any completed tasks.

Args:
Expand All @@ -119,9 +138,17 @@ class TaskMapping(DefaultDict[K, 'asyncio.Task[V]'], AsyncIterable[Tuple[K, V]])
task_map = TaskMapping(process_item, [1, 2, 3])
async for key, result in task_map.yield_completed():
print(f"Completed {key}: {result}")
'''
async def gather(self, return_exceptions: bool = False, exclude_if: Excluder[V] = None, tqdm: bool = False, **tqdm_kwargs: Any) -> Dict[K, V]:
"""

async def gather(
self,
return_exceptions: bool = False,
exclude_if: Excluder[V] = None,
tqdm: bool = False,
**tqdm_kwargs: Any
) -> Dict[K, V]:
"""Wait for all tasks to complete and return a dictionary of the results."""

def clear(self, cancel: bool = False) -> None:
"""# TODO write docs for this"""

Expand All @@ -132,9 +159,12 @@ class _TaskMappingView(ASyncGenericBase, Iterable[T], Generic[T, K, V]):
"""
Base class for TaskMapping views that provides common functionality.
"""

__view__: Incomplete
__mapping__: Incomplete
def __init__(self, view: Iterable[T], task_mapping: TaskMapping[K, V], pop: bool = False) -> None: ...
def __init__(
self, view: Iterable[T], task_mapping: TaskMapping[K, V], pop: bool = False
) -> None: ...
def __iter__(self) -> Iterator[T]: ...
def __await__(self) -> Generator[Any, None, List[T]]: ...
def __len__(self) -> int: ...
Expand All @@ -145,16 +175,19 @@ class TaskMappingKeys(_TaskMappingView[K, K, V], Generic[K, V]):
"""
Asynchronous view to iterate over the keys of a TaskMapping.
"""

async def __aiter__(self) -> AsyncIterator[K]: ...

class TaskMappingItems(_TaskMappingView[Tuple[K, V], K, V], Generic[K, V]):
"""
Asynchronous view to iterate over the items (key-value pairs) of a TaskMapping.
"""

async def __aiter__(self) -> AsyncIterator[Tuple[K, V]]: ...

class TaskMappingValues(_TaskMappingView[V, K, V], Generic[K, V]):
"""
Asynchronous view to iterate over the values of a TaskMapping.
"""

async def __aiter__(self) -> AsyncIterator[V]: ...
2 changes: 1 addition & 1 deletion a_sync/task.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ _get_key: Callable[[Tuple[K, V]], K] = lambda k_and_v: k_and_v[0]
_get_value: Callable[[Tuple[K, V]], V] = lambda k_and_v: k_and_v[1]


cdef class _TaskMappingView(Iterable[T], Generic[T, K, V]):
class _TaskMappingView(Iterable[T], Generic[T, K, V]):
"""
Base class for TaskMapping views that provides common functionality.
"""
Expand Down