diff --git a/README.md b/README.md index 0b4c9346..3e27c3c1 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,9 @@ `ez-a-sync` is a Python library that enables developers to write both synchronous and asynchronous code without having to write redundant code. It provides a decorator `@a_sync()`, as well as a base class `ASyncGenericBase` which can be used to create classes that can be executed in both synchronous and asynchronous contexts. +It also contains implementations of various asyncio primitives with extra functionality, including queues and various types of locks. +\# TODO add links to various objects' docs + ## Installation `ez-a-sync` can be installed via pip: diff --git a/a_sync/__init__.py b/a_sync/__init__.py index 95420b78..5a9fde7e 100644 --- a/a_sync/__init__.py +++ b/a_sync/__init__.py @@ -1,3 +1,26 @@ +""" +This module initializes the a_sync library by importing and organizing various components, utilities, and classes. +It provides a convenient and unified interface for asynchronous programming with a focus on flexibility and efficiency. + +The `a_sync` library offers decorators and base classes to facilitate writing both synchronous and asynchronous code. +It includes the `@a_sync()` decorator and the `ASyncGenericBase` class, which allow for creating functions and classes +that can operate in both synchronous and asynchronous contexts. Additionally, it provides enhanced asyncio primitives, +such as queues and locks, with extra functionality. + +Modules and components included: +- `aliases`, `exceptions`, `iter`, `task`: Core modules of the library. +- `ASyncGenericBase`, `ASyncGenericSingleton`, `a_sync`: Base classes and decorators for dual-context execution. +- `apply_semaphore`: Function to apply semaphores to coroutines. +- `ASyncCachedPropertyDescriptor`, `ASyncPropertyDescriptor`, `cached_property`, `property`: Property descriptors for async properties. +- `as_completed`, `create_task`, `gather`: Enhanced asyncio functions. 
+- Executors: `AsyncThreadPoolExecutor`, `AsyncProcessPoolExecutor`, `PruningThreadPoolExecutor` for async execution. +- Iterators: `ASyncFilter`, `ASyncSorter`, `ASyncIterable`, `ASyncIterator` for async iteration. +- Utilities: `all`, `any`, `as_yielded` for async utilities. + +Alias for backward compatibility: +- `ASyncBase` is an alias for `ASyncGenericBase`, which will be removed eventually, probably in version 0.1.0. +""" + from a_sync import aliases, exceptions, iter, task from a_sync.a_sync import ASyncGenericBase, ASyncGenericSingleton, a_sync from a_sync.a_sync.modifiers.semaphores import apply_semaphore @@ -76,4 +99,4 @@ # executor aliases "ThreadPoolExecutor", "ProcessPoolExecutor", -] +] \ No newline at end of file diff --git a/a_sync/_smart.py b/a_sync/_smart.py index 4c21ffc8..5bb67b78 100644 --- a/a_sync/_smart.py +++ b/a_sync/_smart.py @@ -1,3 +1,9 @@ +""" +This module defines smart future and task utilities for the a_sync library. +These utilities provide enhanced functionality for managing asynchronous tasks and futures, +including task shielding and a custom task factory for creating SmartTask instances. +""" + import asyncio import logging import warnings @@ -18,11 +24,20 @@ class _SmartFutureMixin(Generic[T]): + """ + Mixin class that provides common functionality for smart futures and tasks. + + This mixin provides methods for managing waiters and integrating with a smart processing queue. + """ + _queue: Optional["SmartProcessingQueue[Any, Any, T]"] = None _key: _Key _waiters: "weakref.WeakSet[SmartTask[T]]" def __await__(self: Union["SmartFuture", "SmartTask"]) -> Generator[Any, None, T]: + """ + Await the smart future or task, handling waiters and logging. + """ if self.done(): return self.result() # May raise too. 
self._asyncio_future_blocking = True @@ -37,6 +52,9 @@ def __await__(self: Union["SmartFuture", "SmartTask"]) -> Generator[Any, None, T @property def num_waiters(self: Union["SmartFuture", "SmartTask"]) -> int: # NOTE: we check .done() because the callback may not have ran yet and its very lightweight + """ + Get the number of waiters currently awaiting the future or task. + """ if self.done(): # if there are any waiters left, there won't be once the event loop runs once return 0 @@ -45,17 +63,31 @@ def num_waiters(self: Union["SmartFuture", "SmartTask"]) -> int: def _waiter_done_cleanup_callback( self: Union["SmartFuture", "SmartTask"], waiter: "SmartTask" ) -> None: - "Removes the waiter from _waiters, and _queue._futs if applicable" + """ + Callback to clean up waiters when a waiter task is done. + + Removes the waiter from _waiters, and _queue._futs if applicable + """ if not self.done(): self._waiters.remove(waiter) def _self_done_cleanup_callback(self: Union["SmartFuture", "SmartTask"]) -> None: + """ + Callback to clean up waiters and remove the future from the queue when done. + """ self._waiters.clear() if queue := self._queue: queue._futs.pop(self._key) class SmartFuture(_SmartFutureMixin[T], asyncio.Future): + """ + A smart future that tracks waiters and integrates with a smart processing queue. + + Inherits from both _SmartFutureMixin and asyncio.Future, providing additional functionality + for tracking waiters and integrating with a smart processing queue. + """ + _queue = None _key = None @@ -66,6 +98,14 @@ def __init__( key: Optional[_Key] = None, loop: Optional[asyncio.AbstractEventLoop] = None, ) -> None: + """ + Initialize the SmartFuture with an optional queue and key. + + Args: + queue: Optional; a smart processing queue. + key: Optional; a key identifying the future. + loop: Optional; the event loop. 
+ """ super().__init__(loop=loop) if queue: self._queue = weakref.proxy(queue) @@ -78,11 +118,16 @@ def __repr__(self): return f"<{type(self).__name__} key={self._key} waiters={self.num_waiters} {self._state}>" def __lt__(self, other: "SmartFuture[T]") -> bool: - """heap considers lower values as higher priority so a future with more waiters will be 'less than' a future with less waiters.""" - # other = other_ref() - # if other is None: - # # garbage collected refs should always process first so they can be popped from the queue - # return False + """ + Compare the number of waiters to determine priority in a heap. + Lower values indicate higher priority, so more waiters means 'less than'. + + Args: + other: Another SmartFuture to compare with. + + Returns: + True if self has more waiters than other. + """ return self.num_waiters > other.num_waiters @@ -92,10 +137,28 @@ def create_future( key: Optional[_Key] = None, loop: Optional[asyncio.AbstractEventLoop] = None, ) -> SmartFuture[V]: + """ + Create a SmartFuture instance. + + Args: + queue: Optional; a smart processing queue. + key: Optional; a key identifying the future. + loop: Optional; the event loop. + + Returns: + A SmartFuture instance. + """ return SmartFuture(queue=queue, key=key, loop=loop or asyncio.get_event_loop()) class SmartTask(_SmartFutureMixin[T], asyncio.Task): + """ + A smart task that tracks waiters and integrates with a smart processing queue. + + Inherits from both _SmartFutureMixin and asyncio.Task, providing additional functionality + for tracking waiters and integrating with a smart processing queue. + """ + def __init__( self, coro: Awaitable[T], @@ -103,6 +166,14 @@ def __init__( loop: Optional[asyncio.AbstractEventLoop] = None, name: Optional[str] = None, ) -> None: + """ + Initialize the SmartTask with a coroutine and optional event loop. + + Args: + coro: The coroutine to run in the task. + loop: Optional; the event loop. + name: Optional; the name of the task. 
+ """ super().__init__(coro, loop=loop, name=name) self._waiters: Set["asyncio.Task[T]"] = set() self.add_done_callback(SmartTask._self_done_cleanup_callback) @@ -166,6 +237,10 @@ def shield( res = await shield(something()) except CancelledError: res = None + + Args: + arg: The awaitable to shield from cancellation. + loop: Optional; the event loop. Deprecated since Python 3.8. """ if loop is not None: warnings.warn( @@ -216,4 +291,4 @@ def _outer_done_callback(outer): "SmartTask", "smart_task_factory", "set_smart_task_factory", -] +] \ No newline at end of file diff --git a/a_sync/a_sync/__init__.py b/a_sync/a_sync/__init__.py index a2c466b3..eb184015 100644 --- a/a_sync/a_sync/__init__.py +++ b/a_sync/a_sync/__init__.py @@ -1,3 +1,16 @@ +""" +This module enables developers to write both synchronous and asynchronous code without having to write redundant code. + +The two main objects you should use are + - a decorator `@a_sync()` + - a base class `ASyncGenericBase` which can be used to create classes that can be utilized in both synchronous and asynchronous contexts. + +The rest of the objects are exposed for type checking only, you should not make use of them otherwise. +""" + +# TODO: double check on these before adding them to docs +#- two decorators @:class:`property` and @:class:`cached_property` for the creation of dual-function properties and cached properties, respectively. + from a_sync.a_sync.base import ASyncGenericBase from a_sync.a_sync.decorator import a_sync from a_sync.a_sync.function import ( @@ -27,10 +40,12 @@ # entrypoints "a_sync", "ASyncGenericBase", - # classes - "ASyncFunction", + # maybe entrypoints (?) 
+ # TODO: double check how I intended for these to be used "property", "cached_property", + # classes exposed for type hinting only + "ASyncFunction", "ASyncPropertyDescriptor", "ASyncCachedPropertyDescriptor", "HiddenMethod", diff --git a/a_sync/a_sync/_descriptor.py b/a_sync/a_sync/_descriptor.py index afb3e95d..1d90d345 100644 --- a/a_sync/a_sync/_descriptor.py +++ b/a_sync/a_sync/_descriptor.py @@ -9,18 +9,21 @@ from a_sync._typing import * from a_sync.a_sync import decorator -from a_sync.a_sync.function import ASyncFunction, ModifiedMixin, ModifierManager +from a_sync.a_sync.function import (ASyncFunction, ModifierManager, + _ModifiedMixin) if TYPE_CHECKING: from a_sync import TaskMapping -class ASyncDescriptor(ModifiedMixin, Generic[I, P, T]): +class ASyncDescriptor(_ModifiedMixin, Generic[I, P, T]): """ - A descriptor base class for asynchronous methods and properties. + A descriptor base class for dual-function ASync methods and properties. This class provides functionality for mapping operations across multiple instances - and includes utility methods for common operations. + and includes utility methods for common operations such as checking if all or any + results are truthy, and finding the minimum, maximum, or sum of results of the method + or property mapped across multiple instances. """ __wrapped__: AnyFn[Concatenate[I, P], T] diff --git a/a_sync/a_sync/_flags.py b/a_sync/a_sync/_flags.py index db94877f..23accd57 100644 --- a/a_sync/a_sync/_flags.py +++ b/a_sync/a_sync/_flags.py @@ -2,9 +2,15 @@ This module provides functionality for handling synchronous and asynchronous flags in the ez-a-sync library. -ez-a-sync uses 'flags' to indicate whether objects / function calls will be sync or async. +ez-a-sync uses 'flags' to indicate whether objects or function calls will be synchronous or asynchronous. -You can use any of the provided flags, whichever makes most sense for your use case. 
+You can use any of the provided flags, whichever makes the most sense for your use case. + +AFFIRMATIVE_FLAGS: Set of flags indicating synchronous behavior. Currently includes "sync". + +NEGATIVE_FLAGS: Set of flags indicating asynchronous behavior. Currently includes "asynchronous". + +VIABLE_FLAGS: Set of all valid flags, combining both synchronous and asynchronous indicators. """ from typing import Any @@ -32,7 +38,7 @@ def negate_if_necessary(flag: str, flag_value: bool) -> bool: The potentially negated flag value. Raises: - :class:`exceptions.InvalidFlag`: If the flag is not recognized. + exceptions.InvalidFlag: If the flag is not recognized. """ validate_flag_value(flag, flag_value) if flag in AFFIRMATIVE_FLAGS: @@ -54,8 +60,8 @@ def validate_flag_value(flag: str, flag_value: Any) -> bool: The validated flag value. Raises: - :class:`exceptions.InvalidFlagValue`: If the flag value is not a boolean. + exceptions.InvalidFlagValue: If the flag value is not a boolean. """ if not isinstance(flag_value, bool): raise exceptions.InvalidFlagValue(flag, flag_value) - return flag_value + return flag_value \ No newline at end of file diff --git a/a_sync/a_sync/_helpers.py b/a_sync/a_sync/_helpers.py index 8f6a2ed5..3cf5b910 100644 --- a/a_sync/a_sync/_helpers.py +++ b/a_sync/a_sync/_helpers.py @@ -19,11 +19,8 @@ def _await(awaitable: Awaitable[T]) -> T: Args: awaitable: The awaitable object to be awaited. - Returns: - The result of the awaitable. - Raises: - :class:`exceptions.SyncModeInAsyncContextError`: If the event loop is already running. + exceptions.SyncModeInAsyncContextError: If the event loop is already running. """ try: return a_sync.asyncio.get_event_loop().run_until_complete(awaitable) @@ -39,13 +36,13 @@ def _asyncify(func: SyncFn[P, T], executor: Executor) -> CoroFn[P, T]: # type: Args: func: The synchronous function to be converted. - executor: The executor to run the synchronous function. + executor: The executor used to run the synchronous function. 
Returns: A coroutine function wrapping the input function. Raises: - :class:`exceptions.FunctionNotSync`: If the input function is already asynchronous. + exceptions.FunctionNotSync: If the input function is a coroutine function or an instance of ASyncFunction. """ from a_sync.a_sync.function import ASyncFunction @@ -59,4 +56,4 @@ async def _asyncify_wrap(*args: P.args, **kwargs: P.kwargs) -> T: loop=a_sync.asyncio.get_event_loop(), ) - return _asyncify_wrap + return _asyncify_wrap \ No newline at end of file diff --git a/a_sync/a_sync/_meta.py b/a_sync/a_sync/_meta.py index 8ff408e8..4d115439 100644 --- a/a_sync/a_sync/_meta.py +++ b/a_sync/a_sync/_meta.py @@ -6,12 +6,10 @@ from a_sync import ENVIRONMENT_VARIABLES from a_sync.a_sync import modifiers -from a_sync.a_sync.function import ASyncFunction, ModifiedMixin +from a_sync.a_sync.function import ASyncFunction, _ModifiedMixin from a_sync.a_sync.method import ASyncMethodDescriptor -from a_sync.a_sync.property import ( - ASyncPropertyDescriptor, - ASyncCachedPropertyDescriptor, -) +from a_sync.a_sync.property import (ASyncCachedPropertyDescriptor, + ASyncPropertyDescriptor) from a_sync.future import _ASyncFutureWrappedFn # type: ignore [attr-defined] from a_sync.iter import ASyncGeneratorFunction from a_sync.primitives.locks.semaphore import Semaphore @@ -20,8 +18,16 @@ class ASyncMeta(ABCMeta): - """Any class with metaclass ASyncMeta will have its functions wrapped with a_sync upon class instantiation.""" + """Metaclass for wrapping class attributes with asynchronous capabilities. + Any class with `ASyncMeta` as its metaclass will have its functions and properties + wrapped with asynchronous capabilities upon class instantiation. This includes + wrapping functions with `ASyncMethodDescriptor` and properties with + `ASyncPropertyDescriptor` or `ASyncCachedPropertyDescriptor`. 
Additionally, it handles + `_ModifiedMixin` objects (# TODO replace this with the actual subclasses of _ModifiedMixin, which is just an internal use mixin class that has no meaning to the user), + which are used when functions are decorated with a_sync decorators + to apply specific modifiers to those functions. + """ def __new__(cls, new_class_name, bases, attrs): _update_logger(new_class_name) logger.debug( @@ -36,6 +42,7 @@ def __new__(cls, new_class_name, bases, attrs): # Currently the parent value is used for functions defined on the parent, # and the subclass value is used for functions defined on the subclass. class_defined_modifiers = modifiers.get_modifiers_from(attrs) + logger.debug("found modifiers: %s", class_defined_modifiers) logger.debug( "now I inspect the class definition to figure out which attributes need to be wrapped" @@ -68,19 +75,19 @@ def __new__(cls, new_class_name, bases, attrs): ) fn_modifiers = dict(class_defined_modifiers) # Special handling for functions decorated with a_sync decorators - if isinstance(attr_value, ModifiedMixin): + if isinstance(attr_value, _ModifiedMixin): logger.debug( - "`%s.%s` is a `ModifiedMixin` object, which means you decorated it with an a_sync decorator even though `%s` is an ASyncABC class", + "`%s.%s` is a `%s` object, which means you decorated it with an a_sync decorator even though `%s` is an ASyncABC class", new_class_name, attr_name, + type(attr_value).__name__, new_class_name, ) logger.debug( "you probably did this so you could apply some modifiers to `%s` specifically", attr_name, ) - modified_modifiers = attr_value.modifiers._modifiers - if modified_modifiers: + if modified_modifiers := attr_value.modifiers._modifiers: logger.debug( "I found `%s.%s` is modified with %s", new_class_name, @@ -133,11 +140,16 @@ def __new__(cls, new_class_name, bases, attrs): class ASyncSingletonMeta(ASyncMeta): - def __init__( - cls, name: str, bases: Tuple[type, ...], namespace: Dict[str, Any] - ) -> None: + 
"""Metaclass for creating singleton instances with asynchronous capabilities. + + This metaclass extends `ASyncMeta` to ensure that only one instance of a class + is created for each synchronous or asynchronous context. + """ + def __init__(cls, name: str, bases: Tuple[type, ...], namespace: Dict[str, Any]) -> None: cls.__instances: Dict[bool, object] = {} + """Dictionary to store singleton instances.""" cls.__lock = threading.Lock() + """Lock to ensure thread-safe instance creation.""" super().__init__(name, bases, namespace) def __call__(cls, *args: Any, **kwargs: Any): @@ -151,6 +163,11 @@ def __call__(cls, *args: Any, **kwargs: Any): def _update_logger(new_class_name: str) -> None: + """Update the logger configuration based on environment variables. + + Args: + new_class_name: The name of the new class being created. + """ if ( ENVIRONMENT_VARIABLES.DEBUG_MODE or ENVIRONMENT_VARIABLES.DEBUG_CLASS_NAME == new_class_name diff --git a/a_sync/a_sync/abstract.py b/a_sync/a_sync/abstract.py index 4c8bdaa1..30cf8187 100644 --- a/a_sync/a_sync/abstract.py +++ b/a_sync/a_sync/abstract.py @@ -1,3 +1,15 @@ +""" +This module provides an abstract base class for defining asynchronous and synchronous behavior. + +The ASyncABC class uses the ASyncMeta metaclass to automatically wrap its methods +with asynchronous or synchronous behavior based on flags. Subclasses must +implement the abstract methods to define the flag name, flag value, and +default mode for asynchronous or synchronous execution. + +Note: It is recommended to use ASyncGenericBase for most use cases. This class +is intended for more custom implementations if necessary. +""" + import abc import functools import logging @@ -10,43 +22,73 @@ logger = logging.getLogger(__name__) - class ASyncABC(metaclass=ASyncMeta): + """Abstract Base Class for defining asynchronous and synchronous behavior. 
+ + This class uses the ASyncMeta metaclass to automatically wrap its methods + with asynchronous or synchronous behavior based on flags. Subclasses must + implement the abstract methods to define the flag name, flag value, and + default mode for asynchronous or synchronous execution. + """ ################################## # Concrete Methods (overridable) # ################################## def __a_sync_should_await__(self, kwargs: dict) -> bool: - """Returns a boolean that indicates whether methods of 'instance' should be called as sync or async methods.""" + """Determines if methods should be called asynchronously. + + This method first checks the provided keyword arguments for flags + indicating the desired execution mode. If no flags are found, it + defaults to the instance's asynchronous flag. + + Args: + kwargs: A dictionary of keyword arguments to check for flags. + """ try: - # Defer to kwargs always return self.__a_sync_should_await_from_kwargs__(kwargs) except exceptions.NoFlagsFound: - # No flag found in kwargs, check for a flag attribute. return self.__a_sync_instance_should_await__ @functools.cached_property def __a_sync_instance_should_await__(self) -> bool: - """ - A flag indicating whether the instance should default to asynchronous execution. + """Indicates if the instance should default to asynchronous execution. - You can override this if you want. - If you want to be able to hotswap instance modes, you can redefine this as a non-cached property. + This property can be overridden if dynamic behavior is needed. For + instance, to allow hot-swapping of instance modes, redefine this as a + non-cached property. """ return _flags.negate_if_necessary( self.__a_sync_flag_name__, self.__a_sync_flag_value__ ) def __a_sync_should_await_from_kwargs__(self, kwargs: dict) -> bool: - """You can override this if you want.""" + """Determines execution mode from keyword arguments. 
+ + This method can be overridden to customize how flags are extracted + from keyword arguments. + + Args: + kwargs: A dictionary of keyword arguments to check for flags. + + Raises: + NoFlagsFound: If no valid flags are found in the keyword arguments. + """ if flag := _kwargs.get_flag_name(kwargs): return _kwargs.is_sync(flag, kwargs, pop_flag=True) # type: ignore [arg-type] raise NoFlagsFound("kwargs", kwargs.keys()) @classmethod def __a_sync_instance_will_be_sync__(cls, args: tuple, kwargs: dict) -> bool: - """You can override this if you want.""" + """Determines if a new instance will be synchronous. + + This method checks the constructor's signature against provided + keyword arguments to determine the execution mode for the new instance. + + Args: + args: A tuple of positional arguments for the instance. + kwargs: A dictionary of keyword arguments for the instance. + """ logger.debug( "checking `%s.%s.__init__` signature against provided kwargs to determine a_sync mode for the new instance", cls.__module__, @@ -72,22 +114,40 @@ def __a_sync_instance_will_be_sync__(cls, args: tuple, kwargs: dict) -> bool: @property def __a_sync_modifiers__(self: "ASyncABC") -> ModifierKwargs: - """You should not override this.""" + """Retrieves modifiers for the instance. + + This method should not be overridden. It returns the modifiers + associated with the instance, which are used to customize behavior. + """ return modifiers.get_modifiers_from(self) #################### # Abstract Methods # #################### - @abc.abstractproperty + @property + @abc.abstractmethod def __a_sync_flag_name__(self) -> str: - pass + """Abstract property for the flag name. - @abc.abstractproperty + Subclasses must implement this property to return the name of the flag + used to determine execution mode. + """ + + @property + @abc.abstractmethod def __a_sync_flag_value__(self) -> bool: - pass + """Abstract property for the flag value. 
+ + Subclasses must implement this property to return the value of the flag + indicating the default execution mode. + """ - @abc.abstractclassmethod # type: ignore [arg-type, misc] + @classmethod + @abc.abstractmethod # type: ignore [arg-type, misc] def __a_sync_default_mode__(cls) -> bool: # type: ignore [empty-body] - # mypy doesnt recognize this abc member - pass + """Abstract class method for the default execution mode. + + Subclasses must implement this method to return the default execution + mode (synchronous or asynchronous) for instances of the class. + """ \ No newline at end of file diff --git a/a_sync/a_sync/config.py b/a_sync/a_sync/config.py index b8449057..7293491c 100644 --- a/a_sync/a_sync/config.py +++ b/a_sync/a_sync/config.py @@ -1,9 +1,23 @@ """ -Configuration module for a_sync library. +Configuration module for the a_sync library. This module provides configuration options and default settings for the a_sync library. It includes functionality for setting up executors, defining default modifiers, and handling environment variable configurations. + +Environment Variables: + A_SYNC_EXECUTOR_TYPE: Specifies the type of executor to use. Valid values are + strings that start with 'p' for ProcessPoolExecutor (e.g., 'processes') + or 't' for ThreadPoolExecutor (e.g., 'threads'). Defaults to 'threads'. + A_SYNC_EXECUTOR_VALUE: Specifies the number of workers for the executor. + Defaults to 8. + A_SYNC_DEFAULT_MODE: Sets the default mode for a_sync functions if not specified. + A_SYNC_CACHE_TYPE: Sets the default cache type. If not specified, defaults to None. + A_SYNC_CACHE_TYPED: Boolean flag to determine if cache keys should consider types. + A_SYNC_RAM_CACHE_MAXSIZE: Sets the maximum size for the RAM cache. Defaults to -1. + A_SYNC_RAM_CACHE_TTL: Sets the time-to-live for cache entries. Defaults to None. + A_SYNC_RUNS_PER_MINUTE: Sets the rate limit for function execution. + A_SYNC_SEMAPHORE: Sets the semaphore limit for function execution. 
""" import functools @@ -19,14 +33,14 @@ @functools.lru_cache(maxsize=1) def get_default_executor() -> Executor: - """ - Get the default executor based on the :obj:`EXECUTOR_TYPE` environment variable. + """Get the default executor based on the EXECUTOR_TYPE environment variable. Returns: - Executor: An instance of either ProcessPoolExecutor or ThreadPoolExecutor. + An instance of either ProcessPoolExecutor or ThreadPoolExecutor. Raises: - :class:`ValueError`: If an invalid EXECUTOR_TYPE is specified. + ValueError: If an invalid EXECUTOR_TYPE is specified. Valid values are + strings that start with 'p' for ProcessPoolExecutor or 't' for ThreadPoolExecutor. """ if EXECUTOR_TYPE.lower().startswith("p"): # p, P, proc, Processes, etc return ProcessPoolExecutor(EXECUTOR_VALUE) @@ -90,4 +104,4 @@ def get_default_executor() -> Executor: runs_per_minute=RUNS_PER_MINUTE, semaphore=SEMAPHORE, executor=default_sync_executor, -) +) \ No newline at end of file diff --git a/a_sync/a_sync/decorator.py b/a_sync/a_sync/decorator.py index ab5e71c6..da2ddf67 100644 --- a/a_sync/a_sync/decorator.py +++ b/a_sync/a_sync/decorator.py @@ -28,20 +28,40 @@ def a_sync( default: Literal["async"], **modifiers: Unpack[ModifierKwargs], -) -> ASyncDecoratorAsyncDefault: ... +) -> ASyncDecoratorAsyncDefault: + """ + Creates an asynchronous default decorator. + + Args: + default: Specifies the default execution mode as 'async'. + **modifiers: Additional keyword arguments to modify the behavior of the decorated function. + """ @overload def a_sync( default: Literal["sync"], **modifiers: Unpack[ModifierKwargs], -) -> ASyncDecoratorSyncDefault: ... +) -> ASyncDecoratorSyncDefault: + """ + Creates a synchronous default decorator. + + Args: + default: Specifies the default execution mode as 'sync'. + **modifiers: Additional keyword arguments to modify the behavior of the decorated function. + """ @overload def a_sync( **modifiers: Unpack[ModifierKwargs], -) -> ASyncDecorator: ... 
+) -> ASyncDecorator: + """ + Creates a decorator with no default execution mode specified. + + Args: + **modifiers: Additional keyword arguments to modify the behavior of the decorated function. + """ @overload # async def, None default @@ -49,7 +69,15 @@ def a_sync( coro_fn: CoroFn[P, T], default: Literal[None] = None, **modifiers: Unpack[ModifierKwargs], -) -> ASyncFunctionAsyncDefault[P, T]: ... +) -> ASyncFunctionAsyncDefault[P, T]: + """ + Decorates an asynchronous function with no default execution mode specified. + + Args: + coro_fn: The coroutine function to be decorated. + default: Specifies no default execution mode. + **modifiers: Additional keyword arguments to modify the behavior of the decorated function. + """ @overload # sync def none default @@ -57,7 +85,15 @@ def a_sync( coro_fn: SyncFn[P, T], default: Literal[None] = None, **modifiers: Unpack[ModifierKwargs], -) -> ASyncFunctionSyncDefault[P, T]: ... +) -> ASyncFunctionSyncDefault[P, T]: + """ + Decorates a synchronous function with no default execution mode specified. + + Args: + coro_fn: The synchronous function to be decorated. + default: Specifies no default execution mode. + **modifiers: Additional keyword arguments to modify the behavior of the decorated function. + """ # @a_sync(default='async') @@ -76,7 +112,15 @@ def a_sync( coro_fn: Literal[None], default: Literal["async"], **modifiers: Unpack[ModifierKwargs], -) -> ASyncDecoratorAsyncDefault: ... +) -> ASyncDecoratorAsyncDefault: + """ + Creates an asynchronous default decorator with no function specified. + + Args: + coro_fn: Specifies no function. + default: Specifies the default execution mode as 'async'. + **modifiers: Additional keyword arguments to modify the behavior of the decorated function. + """ @overload # if you try to use default as the only arg @@ -84,7 +128,15 @@ def a_sync( coro_fn: Literal["async"], default: Literal[None], **modifiers: Unpack[ModifierKwargs], -) -> ASyncDecoratorAsyncDefault: ... 
+) -> ASyncDecoratorAsyncDefault: + """ + Creates an asynchronous default decorator with no default execution mode specified. + + Args: + coro_fn: Specifies the default execution mode as 'async'. + default: Specifies no default execution mode. + **modifiers: Additional keyword arguments to modify the behavior of the decorated function. + """ # a_sync(some_fn, default='async') @@ -95,7 +147,15 @@ def a_sync( coro_fn: CoroFn[P, T], default: Literal["async"], **modifiers: Unpack[ModifierKwargs], -) -> ASyncFunctionAsyncDefault[P, T]: ... +) -> ASyncFunctionAsyncDefault[P, T]: + """ + Decorates an asynchronous function with an asynchronous default execution mode. + + Args: + coro_fn: The coroutine function to be decorated. + default: Specifies the default execution mode as 'async'. + **modifiers: Additional keyword arguments to modify the behavior of the decorated function. + """ @overload # sync def async default @@ -103,7 +163,15 @@ def a_sync( coro_fn: SyncFn[P, T], default: Literal["async"], **modifiers: Unpack[ModifierKwargs], -) -> ASyncFunctionAsyncDefault[P, T]: ... +) -> ASyncFunctionAsyncDefault[P, T]: + """ + Decorates a synchronous function with an asynchronous default execution mode. + + Args: + coro_fn: The synchronous function to be decorated. + default: Specifies the default execution mode as 'async'. + **modifiers: Additional keyword arguments to modify the behavior of the decorated function. + """ # a_sync(some_fn, default='sync') @@ -114,7 +182,15 @@ def a_sync( coro_fn: CoroFn[P, T], default: Literal["sync"], **modifiers: Unpack[ModifierKwargs], -) -> ASyncFunctionSyncDefault: ... +) -> ASyncFunctionSyncDefault: + """ + Decorates an asynchronous function with a synchronous default execution mode. + + Args: + coro_fn: The coroutine function to be decorated. + default: Specifies the default execution mode as 'sync'. + **modifiers: Additional keyword arguments to modify the behavior of the decorated function. 
+ """ @overload # sync def sync default @@ -122,7 +198,15 @@ def a_sync( coro_fn: SyncFn[P, T], default: Literal["sync"], **modifiers: Unpack[ModifierKwargs], -) -> ASyncFunctionSyncDefault: ... +) -> ASyncFunctionSyncDefault: + """ + Decorates a synchronous function with a synchronous default execution mode. + + Args: + coro_fn: The synchronous function to be decorated. + default: Specifies the default execution mode as 'sync'. + **modifiers: Additional keyword arguments to modify the behavior of the decorated function. + """ # @a_sync(default='sync') @@ -141,7 +225,15 @@ def a_sync( coro_fn: Literal[None], default: Literal["sync"], **modifiers: Unpack[ModifierKwargs], -) -> ASyncDecoratorSyncDefault: ... +) -> ASyncDecoratorSyncDefault: + """ + Creates a synchronous default decorator with no function specified. + + Args: + coro_fn: Specifies no function. + default: Specifies the default execution mode as 'sync'. + **modifiers: Additional keyword arguments to modify the behavior of the decorated function. + """ @overload # if you try to use default as the only arg @@ -149,7 +241,15 @@ def a_sync( coro_fn: Literal["sync"], default: Literal[None] = None, **modifiers: Unpack[ModifierKwargs], -) -> ASyncDecoratorSyncDefault: ... +) -> ASyncDecoratorSyncDefault: + """ + Creates a synchronous default decorator with no default execution mode specified. + + Args: + coro_fn: Specifies the default execution mode as 'sync'. + default: Specifies no default execution mode. + **modifiers: Additional keyword arguments to modify the behavior of the decorated function. + """ @overload # if you try to use default as the only arg @@ -157,7 +257,15 @@ def a_sync( coro_fn: Literal["sync"], default: Literal[None], **modifiers: Unpack[ModifierKwargs], -) -> ASyncDecoratorSyncDefault: ... +) -> ASyncDecoratorSyncDefault: + """ + Creates a synchronous default decorator with no default execution mode specified. + + Args: + coro_fn: Specifies the default execution mode as 'sync'. 
+ default: Specifies no default execution mode. + **modifiers: Additional keyword arguments to modify the behavior of the decorated function. + """ # catchall @@ -183,84 +291,71 @@ def a_sync( See :class:`ModifierKwargs` for available options. Modifiers: - lib defaults: - async settings - cache_type: CacheType = None, - This can be None or 'memory'. 'memory' is a lru cache which can be modified with the 'cache_typed','ram_cache_maxsize','ram_cache_ttl' modifiers. - cache_typed: bool = False, - Set to True if you want types considered treated for cache keys. ie with cache_typed=True, Decimal(0) and 0 will be considered separate keys. - ram_cache_maxsize: Optional[int] = -1, - The maxsize for your lru cache. None if cache is unbounded. If you set this value without specifying a cache type, 'memory' will automatically be applied. - ram_cache_ttl: Optional[int] = None, - The ttl for items in your lru cache. Set to None. If you set this value without specifying a cache type, 'memory' will automatically be applied. - runs_per_minute: Optional[int] = None, - Setting this value enables a rate limiter for the decorated function. - semaphore: SemaphoreSpec = None, - drop in a Semaphore for your async defined functions. - sync settings - executor: Executor = config.default_sync_executor - - Returns: - An :class:`~ASyncDecorator` if used as a decorator factory, or an :class:`~ASyncFunction` - if used directly on a function. + The following modifiers can be used to customize the behavior of the decorator: + + - cache_type: Can be None or 'memory'. 'memory' is an LRU cache which can be modified with + the 'cache_typed', 'ram_cache_maxsize', and 'ram_cache_ttl' modifiers. + - cache_typed: Set to True if you want types considered for cache keys. For example, with + cache_typed=True, Decimal(0) and 0 will be considered separate keys. + - ram_cache_maxsize: The max size for your LRU cache. None if the cache is unbounded. 
If you + set this value without specifying a cache type, 'memory' will automatically be applied. + - ram_cache_ttl: The TTL for items in your LRU cache. Set to None. If you set this value + without specifying a cache type, 'memory' will automatically be applied. + - runs_per_minute: Setting this value enables a rate limiter for the decorated function. + - semaphore: Drop in a Semaphore for your async defined functions. + - executor: The executor for the synchronous function. Set to the library's default of + config.default_sync_executor. Examples: The decorator can be used in several ways. 1. As a simple decorator: - ```python - >>> @a_sync - ... async def some_async_fn(): - ... return True - >>> await some_fn() - True - >>> some_fn(sync=True) - True - ``` - ``` - >>> @a_sync - ... def some_sync_fn(): - ... return True - >>> some_sync_fn() - True - >>> some_sync_fn(sync=False) - - ``` + >>> @a_sync + ... async def some_async_fn(): + ... return True + >>> await some_async_fn() + True + >>> some_async_fn(sync=True) + True + + >>> @a_sync + ... def some_sync_fn(): + ... return True + >>> some_sync_fn() + True + >>> some_sync_fn(sync=False) + 2. As a decorator with default mode specified: - ```python - >>> @a_sync(default='sync') - ... async def some_fn(): - ... return True - ... - >>> some_fn() - True - ``` + >>> @a_sync(default='sync') + ... async def some_fn(): + ... return True + ... + >>> some_fn() + True 3. As a decorator with modifiers: - ```python - >>> @a_sync(cache_type='memory', runs_per_minute=60) - ... async def some_fn(): - ... return True - ... - >>> some_fn(sync=True) - True - ``` + >>> @a_sync(cache_type='memory', runs_per_minute=60) + ... async def some_fn(): + ... return True + ... + >>> some_fn(sync=True) + True 4.
Applied directly to a function: - ```python - >>> some_fn = a_sync(some_existing_function, default='sync') - >>> some_fn() - "some return value" - ``` + >>> some_fn = a_sync(some_existing_function, default='sync') + >>> some_fn() + "some return value" The decorated function can then be called either synchronously or asynchronously: - ```python - result = some_fn() # Synchronous call - result = await some_fn() # Asynchronous call - ``` + result = some_fn() # Synchronous call + result = await some_fn() # Asynchronous call The execution mode can also be explicitly specified during the call: - ```python - result = some_fn(sync=True) # Force synchronous execution - result = await some_fn(sync=False) # Force asynchronous execution - ``` + result = some_fn(sync=True) # Force synchronous execution + result = await some_fn(sync=False) # Force asynchronous execution This decorator is particularly useful for libraries that need to support both synchronous and asynchronous usage, or for gradually migrating @@ -281,4 +376,4 @@ def a_sync( return deco if coro_fn is None else deco(coro_fn) # type: ignore [arg-type] -# TODO: in a future release, I will make this usable with sync functions as well +# TODO: in a future release, I will make this usable with sync functions as well \ No newline at end of file diff --git a/a_sync/a_sync/function.py b/a_sync/a_sync/function.py index 7a75cca8..a405938e 100644 --- a/a_sync/a_sync/function.py +++ b/a_sync/a_sync/function.py @@ -22,12 +22,14 @@ logger = logging.getLogger(__name__) -class ModifiedMixin: +class _ModifiedMixin: """ - A mixin class that provides functionality for applying modifiers to functions. + A mixin class for internal use that provides functionality for applying modifiers to functions. - This class is used as a base for :class:`~ASyncFunction` and its variants to handle - the application of async and sync modifiers to functions. 
+ This class is used as a base for :class:`~ASyncFunction` and its variants, such as + `ASyncFunctionAsyncDefault` and `ASyncFunctionSyncDefault`, to handle the application + of async and sync modifiers to functions. Modifiers can alter the behavior of functions, + such as converting sync functions to async, applying caching, or rate limiting. """ modifiers: ModifierManager @@ -35,13 +37,13 @@ class ModifiedMixin: def _asyncify(self, func: SyncFn[P, T]) -> CoroFn[P, T]: """ - Convert a synchronous function to an asynchronous one and apply async modifiers. + Converts a synchronous function to an asynchronous one and applies async modifiers. Args: func: The synchronous function to be converted. Returns: - An asynchronous function with async modifiers applied. + The asynchronous version of the function with applied modifiers. """ coro_fn = _helpers._asyncify(func, self.modifiers.executor) return self.modifiers.apply_async_modifiers(coro_fn) @@ -49,26 +51,34 @@ def _asyncify(self, func: SyncFn[P, T]) -> CoroFn[P, T]: @functools.cached_property def _await(self) -> Callable[[Awaitable[T]], T]: """ - Apply sync modifiers to the _helpers._await function and cache it. + Applies sync modifiers to the _helpers._await function and caches it. Returns: - A function that applies sync modifiers to awaitable objects. + The modified _await function. """ return self.modifiers.apply_sync_modifiers(_helpers._await) @functools.cached_property def default(self) -> DefaultMode: """ - Get the default execution mode (sync, async, or None) for the function. + Gets the default execution mode (sync, async, or None) for the function. Returns: - The default execution mode as determined by the modifiers. + The default execution mode. """ return self.modifiers.default def _validate_wrapped_fn(fn: Callable) -> None: - """Ensures 'fn' is an appropriate function for wrapping with a_sync.""" + """Ensures 'fn' is an appropriate function for wrapping with a_sync. 
+ + Args: + fn: The function to validate. + + Raises: + TypeError: If the input is not callable. + RuntimeError: If the function has arguments with names that conflict with viable flags. + """ if isinstance(fn, (AsyncPropertyDescriptor, AsyncCachedPropertyDescriptor)): return # These are always valid if not callable(fn): @@ -84,7 +94,7 @@ def _validate_wrapped_fn(fn: Callable) -> None: ) -class ASyncFunction(ModifiedMixin, Generic[P, T]): +class ASyncFunction(_ModifiedMixin, Generic[P, T]): """ A callable wrapper object that can be executed both synchronously and asynchronously. @@ -96,7 +106,7 @@ class ASyncFunction(ModifiedMixin, Generic[P, T]): such as caching, rate limiting, and execution in specific contexts (e.g., thread pools). Example: - ```python async def my_coroutine(x: int) -> str: return str(x) @@ -107,7 +116,6 @@ async def my_coroutine(x: int) -> str: # Asynchronous call result = await func(5) # returns "5" - ``` """ # NOTE: We can't use __slots__ here because it breaks functools.update_wrapper @@ -115,14 +123,16 @@ async def my_coroutine(x: int) -> str: @overload def __init__( self, fn: CoroFn[P, T], **modifiers: Unpack[ModifierKwargs] - ) -> None: ... + ) -> None: + ...  # TODO write specific docs for this overload @overload def __init__( self, fn: SyncFn[P, T], **modifiers: Unpack[ModifierKwargs] - ) -> None: ... + ) -> None: + ...  # TODO write specific docs for this overload def __init__(self, fn: AnyFn[P, T], **modifiers: Unpack[ModifierKwargs]) -> None: """ - Initialize an ASyncFunction instance. + Initializes an ASyncFunction instance. Args: fn: The function to wrap. @@ -145,24 +155,29 @@ def __init__(self, fn: AnyFn[P, T], **modifiers: Unpack[ModifierKwargs]) -> None ) @overload - def __call__(self, *args: P.args, sync: Literal[True], **kwargs: P.kwargs) -> T: ...
+ def __call__(self, *args: P.args, sync: Literal[True], **kwargs: P.kwargs) -> T: + ...  # TODO write specific docs for this overload @overload def __call__( self, *args: P.args, sync: Literal[False], **kwargs: P.kwargs - ) -> Coroutine[Any, Any, T]: ... + ) -> Coroutine[Any, Any, T]: + ...  # TODO write specific docs for this overload @overload def __call__( self, *args: P.args, asynchronous: Literal[False], **kwargs: P.kwargs - ) -> T: ... + ) -> T: + ...  # TODO write specific docs for this overload @overload def __call__( self, *args: P.args, asynchronous: Literal[True], **kwargs: P.kwargs - ) -> Coroutine[Any, Any, T]: ... + ) -> Coroutine[Any, Any, T]: + ...  # TODO write specific docs for this overload @overload - def __call__(self, *args: P.args, **kwargs: P.kwargs) -> MaybeCoro[T]: ... + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> MaybeCoro[T]: + ...  # TODO write specific docs for this overload def __call__(self, *args: P.args, **kwargs: P.kwargs) -> MaybeCoro[T]: """ - Call the wrapped function either synchronously or asynchronously. + Calls the wrapped function either synchronously or asynchronously. This method determines whether to execute the wrapped function synchronously or asynchronously based on the default mode and any provided flags. @@ -171,9 +186,6 @@ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> MaybeCoro[T]: *args: Positional arguments to pass to the wrapped function. **kwargs: Keyword arguments to pass to the wrapped function. - Returns: - The result of the wrapped function call, which may be a coroutine if run asynchronously. - Raises: Exception: Any exception that may be raised by the wrapped function. """ @@ -207,7 +219,7 @@ def map( **function_kwargs: P.kwargs, ) -> "TaskMapping[P, T]": """ - Create a TaskMapping for the wrapped function with the given iterables. + Creates a TaskMapping for the wrapped function with the given iterables. Args: *iterables: Iterable objects to be used as arguments for the function.
@@ -216,7 +228,7 @@ def map( **function_kwargs: Additional keyword arguments to pass to the function. Returns: - A TaskMapping object. + A TaskMapping object for managing concurrent execution. """ from a_sync import TaskMapping @@ -236,7 +248,7 @@ async def any( **function_kwargs: P.kwargs, ) -> bool: """ - Check if any result of the function applied to the iterables is truthy. + Checks if any result of the function applied to the iterables is truthy. Args: *iterables: Iterable objects to be used as arguments for the function. @@ -245,7 +257,7 @@ async def any( **function_kwargs: Additional keyword arguments to pass to the function. Returns: - A boolean indicating if any result is truthy. + True if any result is truthy, otherwise False. """ return await self.map( *iterables, @@ -262,7 +274,7 @@ async def all( **function_kwargs: P.kwargs, ) -> bool: """ - Check if all results of the function applied to the iterables are truthy. + Checks if all results of the function applied to the iterables are truthy. Args: *iterables: Iterable objects to be used as arguments for the function. @@ -271,7 +283,7 @@ async def all( **function_kwargs: Additional keyword arguments to pass to the function. Returns: - A boolean indicating if all results are truthy. + True if all results are truthy, otherwise False. """ return await self.map( *iterables, @@ -288,7 +300,7 @@ async def min( **function_kwargs: P.kwargs, ) -> T: """ - Find the minimum result of the function applied to the iterables. + Finds the minimum result of the function applied to the iterables. Args: *iterables: Iterable objects to be used as arguments for the function. @@ -314,7 +326,7 @@ async def max( **function_kwargs: P.kwargs, ) -> T: """ - Find the maximum result of the function applied to the iterables. + Finds the maximum result of the function applied to the iterables. Args: *iterables: Iterable objects to be used as arguments for the function. 
@@ -340,7 +352,7 @@ async def sum( **function_kwargs: P.kwargs, ) -> T: """ - Calculate the sum of the results of the function applied to the iterables. + Calculates the sum of the results of the function applied to the iterables. Args: *iterables: Iterable objects to be used as arguments for the function. @@ -368,7 +380,7 @@ def map( **function_kwargs: P.kwargs, ) -> "TaskMapping[P, T]": """ - Create a TaskMapping for the wrapped function with the given iterables. + Creates a TaskMapping for the wrapped function with the given iterables. Args: *iterables: Iterable objects to be used as arguments for the function. @@ -377,7 +389,7 @@ def map( **function_kwargs: Additional keyword arguments to pass to the function. Returns: - A TaskMapping object. + A TaskMapping object for managing concurrent execution. """ from a_sync import TaskMapping @@ -397,7 +409,7 @@ async def any( **function_kwargs: P.kwargs, ) -> bool: """ - Check if any result of the function applied to the iterables is truthy. + Checks if any result of the function applied to the iterables is truthy. Args: *iterables: Iterable objects to be used as arguments for the function. @@ -406,7 +418,7 @@ async def any( **function_kwargs: Additional keyword arguments to pass to the function. Returns: - A boolean indicating if any result is truthy. + True if any result is truthy, otherwise False. """ return await self.map( *iterables, @@ -423,7 +435,7 @@ async def all( **function_kwargs: P.kwargs, ) -> bool: """ - Check if all results of the function applied to the iterables are truthy. + Checks if all results of the function applied to the iterables are truthy. Args: *iterables: Iterable objects to be used as arguments for the function. @@ -432,7 +444,7 @@ async def all( **function_kwargs: Additional keyword arguments to pass to the function. Returns: - A boolean indicating if all results are truthy. + True if all results are truthy, otherwise False. 
""" return await self.map( *iterables, @@ -449,7 +461,7 @@ async def min( **function_kwargs: P.kwargs, ) -> T: """ - Find the minimum result of the function applied to the iterables. + Finds the minimum result of the function applied to the iterables. Args: *iterables: Iterable objects to be used as arguments for the function. @@ -475,7 +487,7 @@ async def max( **function_kwargs: P.kwargs, ) -> T: """ - Find the maximum result of the function applied to the iterables. + Finds the maximum result of the function applied to the iterables. Args: *iterables: Iterable objects to be used as arguments for the function. @@ -501,7 +513,7 @@ async def sum( **function_kwargs: P.kwargs, ) -> T: """ - Calculate the sum of the results of the function applied to the iterables. + Calculates the sum of the results of the function applied to the iterables. Args: *iterables: Iterable objects to be used as arguments for the function. @@ -522,13 +534,13 @@ async def sum( @functools.cached_property def _sync_default(self) -> bool: """ - Determine the default execution mode (sync or async) for the function. + Determines the default execution mode (sync or async) for the function. If the user did not specify a default, this method defers to the function's definition (sync vs async def). Returns: - True if the default is sync, False if the default is async. + True if the default is sync, False if async. """ return ( True @@ -539,16 +551,16 @@ def _sync_default(self) -> bool: @functools.cached_property def _async_def(self) -> bool: """ - Check if the wrapped function is an asynchronous function. + Checks if the wrapped function is an asynchronous function. Returns: - True if the wrapped function is an asynchronous function, False otherwise. + True if the function is asynchronous, otherwise False. """ return asyncio.iscoroutinefunction(self.__wrapped__) def _run_sync(self, kwargs: dict) -> bool: """ - Determine whether to run the function synchronously or asynchronously. 
+ Determines whether to run the function synchronously or asynchronously. This method checks for a flag in the kwargs and defers to it if present. If no flag is specified, it defers to the default execution mode. @@ -557,7 +569,7 @@ def _run_sync(self, kwargs: dict) -> bool: kwargs: The keyword arguments passed to the function. Returns: - True if the function should be run synchronously, False otherwise. + True if the function should run synchronously, otherwise False. """ if flag := _kwargs.get_flag_name(kwargs): # If a flag was specified in the kwargs, we will defer to it. @@ -569,10 +581,13 @@ def _run_sync(self, kwargs: dict) -> bool: @functools.cached_property def _asyncified(self) -> CoroFn[P, T]: """ - Convert the wrapped function to an asynchronous function and apply both sync and async modifiers. + Converts the wrapped function to an asynchronous function and applies both sync and async modifiers. + + Raises: + TypeError: If the wrapped function is already asynchronous. Returns: - An asynchronous function with both sync and async modifiers applied. + The asynchronous version of the wrapped function. """ if self._async_def: raise TypeError( @@ -583,13 +598,13 @@ def _asyncified(self) -> CoroFn[P, T]: @functools.cached_property def _modified_fn(self) -> AnyFn[P, T]: """ - Apply modifiers to the wrapped function. + Applies modifiers to the wrapped function. If the wrapped function is an asynchronous function, this method applies async modifiers. If the wrapped function is a synchronous function, this method applies sync modifiers. Returns: - The wrapped function with modifiers applied. + The modified function. """ if self._async_def: return self.modifiers.apply_async_modifiers(self.__wrapped__) # type: ignore [arg-type] @@ -603,7 +618,7 @@ def _async_wrap(self): # -> SyncFn[[CoroFn[P, T]], MaybeAwaitable[T]]: This method applies the appropriate modifiers and determines whether to await the result. Returns: - The final wrapped function. 
+ The wrapped function with async handling. """ @functools.wraps(self._modified_fn) @@ -624,7 +639,7 @@ def _sync_wrap(self): # -> SyncFn[[SyncFn[P, T]], MaybeAwaitable[T]]: This method applies the appropriate modifiers and determines whether to run the function synchronously or asynchronously. Returns: - The final wrapped function. + The wrapped function with sync handling. """ @functools.wraps(self._modified_fn) @@ -644,13 +659,28 @@ def sync_wrap(*args: P.args, **kwargs: P.kwargs) -> MaybeAwaitable[T]: # type: _inherit = ASyncFunction[[AnyFn[P, T]], ASyncFunction[P, T]] -class ASyncDecorator(ModifiedMixin): +class ASyncDecorator(_ModifiedMixin): def __init__(self, **modifiers: Unpack[ModifierKwargs]) -> None: + """ + Initializes an ASyncDecorator instance. + + Args: + **modifiers: Keyword arguments for function modifiers. + + Raises: + ValueError: If 'default' is not 'sync', 'async', or None. + """ assert "default" in modifiers, modifiers self.modifiers = ModifierManager(modifiers) self.validate_inputs() def validate_inputs(self) -> None: + """ + Validates the input modifiers. + + Raises: + ValueError: If 'default' is not 'sync', 'async', or None. + """ if self.modifiers.default not in ["sync", "async", None]: raise ValueError( f"'default' must be either 'sync', 'async', or None. You passed {self.modifiers.default}." @@ -658,13 +688,22 @@ def validate_inputs(self) -> None: @overload def __call__(self, func: AnyFn[Concatenate[B, P], T]) -> "ASyncBoundMethod[B, P, T]": # type: ignore [override] - ... + ...  # TODO write specific docs for this overload @overload def __call__(self, func: AnyFn[P, T]) -> ASyncFunction[P, T]: # type: ignore [override] - ... + ...  # TODO write specific docs for this overload def __call__(self, func: AnyFn[P, T]) -> ASyncFunction[P, T]: # type: ignore [override] + """ + Decorates a function with async or sync behavior based on the default modifier. + + Args: + func: The function to decorate.
+ + Returns: + An ASyncFunction instance with the appropriate default behavior. + """ if self.default == "async": return ASyncFunctionAsyncDefault(func, **self.modifiers) elif self.default == "sync": @@ -676,6 +715,14 @@ def __call__(self, func: AnyFn[P, T]) -> ASyncFunction[P, T]: # type: ignore [o def _check_not_genfunc(func: Callable) -> None: + """Raises an error if the function is a generator or async generator. + + Args: + func: The function to check. + + Raises: + ValueError: If the function is a generator or async generator. + """ if inspect.isasyncgenfunction(func) or inspect.isgeneratorfunction(func): raise ValueError("unable to decorate generator functions with this decorator") @@ -694,7 +741,6 @@ class ASyncFunctionSyncDefault(ASyncFunction[P, T]): or `asynchronous=True` as a keyword argument. Example: - ```python @a_sync(default='sync') async def my_function(x: int) -> str: return str(x) @@ -704,27 +750,31 @@ async def my_function(x: int) -> str: # Asynchronous call result = await my_function(5, sync=False) # returns "5" - ``` """ @overload - def __call__(self, *args: P.args, sync: Literal[True], **kwargs: P.kwargs) -> T: ... + def __call__(self, *args: P.args, sync: Literal[True], **kwargs: P.kwargs) -> T: + ...  # TODO write specific docs for this overload @overload def __call__( self, *args: P.args, sync: Literal[False], **kwargs: P.kwargs - ) -> Coroutine[Any, Any, T]: ... + ) -> Coroutine[Any, Any, T]: + ...  # TODO write specific docs for this overload @overload def __call__( self, *args: P.args, asynchronous: Literal[False], **kwargs: P.kwargs - ) -> T: ... + ) -> T: + ...  # TODO write specific docs for this overload @overload def __call__( self, *args: P.args, asynchronous: Literal[True], **kwargs: P.kwargs - ) -> Coroutine[Any, Any, T]: ... + ) -> Coroutine[Any, Any, T]: + ...  # TODO write specific docs for this overload @overload - def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T: ...
+ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T: + ...  # TODO write specific docs for this overload def __call__(self, *args: P.args, **kwargs: P.kwargs) -> MaybeCoro[T]: - """Call the wrapped function, defaulting to synchronous execution. + """Calls the wrapped function, defaulting to synchronous execution. This method overrides the base :meth:`ASyncFunction.__call__` to provide a synchronous default behavior. @@ -733,11 +783,11 @@ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> MaybeCoro[T]: *args: Positional arguments to pass to the wrapped function. **kwargs: Keyword arguments to pass to the wrapped function. - Returns: - The result of the wrapped function call. - Raises: Exception: Any exception that may be raised by the wrapped function. + + Returns: + The result of the function call. """ return self.fn(*args, **kwargs) @@ -756,7 +806,6 @@ class ASyncFunctionAsyncDefault(ASyncFunction[P, T]): or `asynchronous=False` as a keyword argument. Example: - ```python @a_sync(default='async') async def my_function(x: int) -> str: return str(x) @@ -766,27 +815,31 @@ async def my_function(x: int) -> str: # Synchronous call result = my_function(5, sync=True) # returns "5" - ``` """ @overload - def __call__(self, *args: P.args, sync: Literal[True], **kwargs: P.kwargs) -> T: ... + def __call__(self, *args: P.args, sync: Literal[True], **kwargs: P.kwargs) -> T: + ...  # TODO write specific docs for this overload @overload def __call__( self, *args: P.args, sync: Literal[False], **kwargs: P.kwargs - ) -> Coroutine[Any, Any, T]: ... + ) -> Coroutine[Any, Any, T]: + ...  # TODO write specific docs for this overload @overload def __call__( self, *args: P.args, asynchronous: Literal[False], **kwargs: P.kwargs - ) -> T: ... + ) -> T: + ...  # TODO write specific docs for this overload @overload def __call__( self, *args: P.args, asynchronous: Literal[True], **kwargs: P.kwargs - ) -> Coroutine[Any, Any, T]: ...
+ ) -> Coroutine[Any, Any, T]: + ...  # TODO write specific docs for this overload @overload - def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Coroutine[Any, Any, T]: ... + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Coroutine[Any, Any, T]: + ...  # TODO write specific docs for this overload def __call__(self, *args: P.args, **kwargs: P.kwargs) -> MaybeCoro[T]: - """Call the wrapped function, defaulting to asynchronous execution. + """Calls the wrapped function, defaulting to asynchronous execution. This method overrides the base :meth:`ASyncFunction.__call__` to provide an asynchronous default behavior. @@ -795,11 +848,11 @@ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> MaybeCoro[T]: *args: Positional arguments to pass to the wrapped function. **kwargs: Keyword arguments to pass to the wrapped function. - Returns: - A coroutine object representing the asynchronous execution of the wrapped function. - Raises: Exception: Any exception that may be raised by the wrapped function. + + Returns: + The result of the function call. """ return self.fn(*args, **kwargs) @@ -809,32 +862,34 @@ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> MaybeCoro[T]: class ASyncDecoratorSyncDefault(ASyncDecorator): @overload def __call__(self, func: AnyFn[Concatenate[B, P], T]) -> "ASyncBoundMethodSyncDefault[P, T]": # type: ignore [override] - ... + ...  # TODO write specific docs for this overload @overload def __call__(self, func: AnyBoundMethod[P, T]) -> ASyncFunctionSyncDefault[P, T]: # type: ignore [override] - ... + ...  # TODO write specific docs for this overload @overload def __call__(self, func: AnyFn[P, T]) -> ASyncFunctionSyncDefault[P, T]: # type: ignore [override] - ...
+ ...  # TODO write specific docs for this overload def __call__(self, func: AnyFn[P, T]) -> ASyncFunctionSyncDefault[P, T]: + # TODO write specific docs for this overload return ASyncFunctionSyncDefault(func, **self.modifiers) class ASyncDecoratorAsyncDefault(ASyncDecorator): @overload def __call__(self, func: AnyFn[Concatenate[B, P], T]) -> "ASyncBoundMethodAsyncDefault[P, T]": # type: ignore [override] - ... + ...  # TODO write specific docs for this overload @overload def __call__(self, func: AnyBoundMethod[P, T]) -> ASyncFunctionAsyncDefault[P, T]: # type: ignore [override] - ... + ...  # TODO write specific docs for this overload @overload def __call__(self, func: AnyFn[P, T]) -> ASyncFunctionAsyncDefault[P, T]: # type: ignore [override] - ... + ...  # TODO write specific docs for this overload def __call__(self, func: AnyFn[P, T]) -> ASyncFunctionAsyncDefault[P, T]: - return ASyncFunctionAsyncDefault(func, **self.modifiers) + # TODO write specific docs for this overload + return ASyncFunctionAsyncDefault(func, **self.modifiers) \ No newline at end of file diff --git a/a_sync/a_sync/modifiers/__init__.py b/a_sync/a_sync/modifiers/__init__.py index 5f32da2d..d4f8600c 100644 --- a/a_sync/a_sync/modifiers/__init__.py +++ b/a_sync/a_sync/modifiers/__init__.py @@ -1,3 +1,25 @@ +""" +This file contains all logic for ez-a-sync's "modifiers". + +Modifiers modify the behavior of ez-a-sync's ASync objects in various ways. + +Submodules: + cache: Handles caching mechanisms for async functions. + limiter: Manages rate limiting for async functions. + manager: Provides management of valid modifiers and their application. + semaphores: Implements semaphore logic for controlling concurrency. + +The modifiers available are: +- `cache_type`: Specifies the type of cache to use, such as 'memory'. +- `cache_typed`: Determines if types are considered for cache keys. +- `ram_cache_maxsize`: Sets the maximum size for the LRU cache. +- `ram_cache_ttl`: Defines the time-to-live for items in the cache.
+- `runs_per_minute`: Sets a rate limit for function execution. +- `semaphore`: Specifies a semaphore for controlling concurrency. +- `executor`: Defines the executor for synchronous functions. + +""" + from aiolimiter import AsyncLimiter from a_sync._typing import * @@ -6,6 +28,15 @@ def get_modifiers_from(thing: Union[dict, type, object]) -> ModifierKwargs: + """Extracts valid modifiers from a given object, type, or dictionary. + + Args: + thing: The source from which to extract modifiers. It can be a dictionary, + a type, or an object. + + Returns: + A ModifierKwargs object containing the valid modifiers extracted from the input. + """ if isinstance(thing, dict): apply_class_defined_modifiers(thing) return ModifierKwargs({modifier: thing[modifier] for modifier in valid_modifiers if modifier in thing}) # type: ignore [misc] @@ -13,7 +44,18 @@ def get_modifiers_from(thing: Union[dict, type, object]) -> ModifierKwargs: def apply_class_defined_modifiers(attrs_from_metaclass: dict): + """Applies class-defined modifiers to a dictionary of attributes. + + This function modifies the input dictionary in place. If the 'semaphore' key + is present and its value is an integer, it is converted to a ThreadsafeSemaphore. + If the 'runs_per_minute' key is present and its value is an integer, it is + converted to an AsyncLimiter. If these keys are not present or their values + are not integers, the function will silently do nothing. + + Args: + attrs_from_metaclass: A dictionary of attributes from a metaclass. 
+ """ if isinstance(val := attrs_from_metaclass.get("semaphore"), int): attrs_from_metaclass["semaphore"] = ThreadsafeSemaphore(val) if isinstance(val := attrs_from_metaclass.get("runs_per_minute"), int): - attrs_from_metaclass["runs_per_minute"] = AsyncLimiter(val) + attrs_from_metaclass["runs_per_minute"] = AsyncLimiter(val) \ No newline at end of file diff --git a/a_sync/a_sync/modifiers/cache/__init__.py b/a_sync/a_sync/modifiers/cache/__init__.py index 4605f2e1..908a53ae 100644 --- a/a_sync/a_sync/modifiers/cache/__init__.py +++ b/a_sync/a_sync/modifiers/cache/__init__.py @@ -8,6 +8,7 @@ class CacheArgs(TypedDict): + """Typed dictionary for cache arguments.""" cache_type: CacheType cache_typed: bool ram_cache_maxsize: Optional[int] @@ -16,24 +17,23 @@ class CacheArgs(TypedDict): @overload def apply_async_cache( - coro_fn: Literal[None], **modifiers: Unpack[CacheArgs], -) -> AsyncDecorator[P, T]: ... - +) -> AsyncDecorator[P, T]: + """Overload for when no coroutine function is provided.""" @overload def apply_async_cache( coro_fn: int, **modifiers: Unpack[CacheArgs], -) -> AsyncDecorator[P, T]: ... - +) -> AsyncDecorator[P, T]: + """Overload for when an integer is provided as the coroutine function.""" @overload def apply_async_cache( coro_fn: CoroFn[P, T], **modifiers: Unpack[CacheArgs], -) -> CoroFn[P, T]: ... - +) -> CoroFn[P, T]: + """Overload for when a coroutine function is provided.""" def apply_async_cache( coro_fn: Union[CoroFn[P, T], CacheType, int] = None, @@ -42,7 +42,24 @@ def apply_async_cache( ram_cache_maxsize: Optional[int] = None, ram_cache_ttl: Optional[int] = None, ) -> AsyncDecoratorOrCoroFn[P, T]: + """Applies an asynchronous cache to a coroutine function. + + Args: + coro_fn: The coroutine function to apply the cache to, or an integer to set as the max size. + cache_type: The type of cache to use. Currently, only 'memory' is implemented. + cache_typed: Whether to consider types for cache keys. 
+ ram_cache_maxsize: The maximum size for the LRU cache. If set to an integer, it overrides coro_fn. + ram_cache_ttl: The time-to-live for items in the LRU cache. + + Raises: + TypeError: If 'ram_cache_maxsize' is not an integer or None. + FunctionNotAsync: If the provided function is not asynchronous. + NotImplementedError: If an unsupported cache type is specified. + Returns: + A decorator or the decorated coroutine function. + """ + # Parse Inputs if isinstance(coro_fn, int): assert ram_cache_maxsize is None diff --git a/a_sync/a_sync/modifiers/cache/memory.py b/a_sync/a_sync/modifiers/cache/memory.py index 5363394a..8b7ac41c 100644 --- a/a_sync/a_sync/modifiers/cache/memory.py +++ b/a_sync/a_sync/modifiers/cache/memory.py @@ -9,6 +9,7 @@ class CacheKwargs(TypedDict): + """Typed dictionary for cache keyword arguments.""" maxsize: Optional[int] ttl: Optional[int] typed: bool @@ -16,26 +17,69 @@ class CacheKwargs(TypedDict): @overload def apply_async_memory_cache( - coro_fn: Literal[None], **kwargs: Unpack[CacheKwargs] -) -> AsyncDecorator[P, T]: ... + **kwargs: Unpack[CacheKwargs] +) -> AsyncDecorator[P, T]: + """Creates a decorator to apply an asynchronous LRU cache. + + This overload is used when no coroutine function is provided. The returned + decorator can be applied to a coroutine function later. + + Args: + **kwargs: Keyword arguments for cache configuration, including maxsize, + ttl, and typed. + """ @overload def apply_async_memory_cache( - coro_fn: int, **kwargs: Unpack[CacheKwargs] -) -> AsyncDecorator[P, T]: ... + coro_fn: int, + **kwargs: Unpack[CacheKwargs] +) -> AsyncDecorator[P, T]: + """Creates a decorator with maxsize set by an integer. + + This overload is used when an integer is provided as the coroutine function, + which sets the maxsize for the cache. The returned decorator can be applied + to a coroutine function later. + + Args: + coro_fn: An integer to set as maxsize for the cache. 
+ **kwargs: Additional keyword arguments for cache configuration, including + ttl and typed. + """ @overload def apply_async_memory_cache( - coro_fn: CoroFn[P, T], **kwargs: Unpack[CacheKwargs] -) -> CoroFn[P, T]: ... + coro_fn: CoroFn[P, T], + **kwargs: Unpack[CacheKwargs] +) -> CoroFn[P, T]: + """Applies an asynchronous LRU cache to a provided coroutine function. + + This overload is used when a coroutine function is provided. The cache is + applied directly to the function. + + Args: + coro_fn: The coroutine function to be cached. + **kwargs: Keyword arguments for cache configuration, including maxsize, + ttl, and typed. + """ @overload def apply_async_memory_cache( - coro_fn: Literal[None], **kwargs: Unpack[CacheKwargs] -) -> AsyncDecorator[P, T]: ... + coro_fn: Literal[None], + **kwargs: Unpack[CacheKwargs] +) -> AsyncDecorator[P, T]: + """Creates a decorator to apply an asynchronous LRU cache. + + This duplicate overload is used when no coroutine function is provided. The + returned decorator can be applied to a coroutine function later. + + Args: + coro_fn: None, indicating no coroutine function is provided. + **kwargs: Keyword arguments for cache configuration, including maxsize, + ttl, and typed. + """ def apply_async_memory_cache( @@ -44,6 +88,27 @@ def apply_async_memory_cache( ttl: Optional[int] = None, typed: bool = False, ) -> AsyncDecoratorOrCoroFn[P, T]: + """Applies an asynchronous LRU cache to a coroutine function. + + This function uses the `alru_cache` from the `async_lru` library to cache + the results of an asynchronous coroutine function. The cache can be configured + with a maximum size, a time-to-live (TTL), and whether the cache keys should + be typed. + + Args: + coro_fn: The coroutine function to be cached, or an integer to set as maxsize. + maxsize: The maximum size of the cache. If set to -1, it is converted to None, + making the cache unbounded. + ttl: The time-to-live for cache entries in seconds. If None, entries do not expire. 
+ typed: Whether to consider the types of arguments as part of the cache key. + + Raises: + TypeError: If `maxsize` is not a positive integer or None when `coro_fn` is None. + exceptions.FunctionNotAsync: If `coro_fn` is not an asynchronous function. + + Returns: + A decorator if `coro_fn` is None, otherwise the cached coroutine function. + """ # Parse Inputs if isinstance(coro_fn, int): assert maxsize is None @@ -63,4 +128,4 @@ def apply_async_memory_cache( maxsize = None cache_decorator = alru_cache(maxsize=maxsize, ttl=ttl, typed=typed) - return cache_decorator if coro_fn is None else cache_decorator(coro_fn) + return cache_decorator if coro_fn is None else cache_decorator(coro_fn) \ No newline at end of file diff --git a/a_sync/a_sync/modifiers/limiter.py b/a_sync/a_sync/modifiers/limiter.py index 864b62c3..771b19a2 100644 --- a/a_sync/a_sync/modifiers/limiter.py +++ b/a_sync/a_sync/modifiers/limiter.py @@ -8,39 +8,57 @@ from a_sync._typing import * -@overload -def apply_rate_limit( - coro_fn: Literal[None], - runs_per_minute: int, -) -> AsyncDecorator[P, T]: ... - +LimiterSpec = Union[int, AsyncLimiter] @overload def apply_rate_limit( - coro_fn: int, - runs_per_minute: Literal[None], -) -> AsyncDecorator[P, T]: ... - + runs_per_minute: int, +) -> AsyncDecorator[P, T]: + """Decorator to apply a rate limit to an asynchronous function. + Args: + runs_per_minute: The number of allowed executions per minute. + """ + @overload def apply_rate_limit( coro_fn: CoroFn[P, T], - runs_per_minute: Union[int, AsyncLimiter], -) -> CoroFn[P, T]: ... - - + runs_per_minute: LimiterSpec, +) -> CoroFn[P, T]: + """Decorator to apply a rate limit to an asynchronous function. + + Args: + coro_fn: The coroutine function to be rate-limited. + runs_per_minute: The number of allowed executions per minute or an AsyncLimiter instance. 
+ """ + def apply_rate_limit( coro_fn: Optional[Union[CoroFn[P, T], int]] = None, - runs_per_minute: Optional[Union[int, AsyncLimiter]] = None, + runs_per_minute: Optional[LimiterSpec] = None, ) -> AsyncDecoratorOrCoroFn[P, T]: + """Applies a rate limit to an asynchronous function. + + This function can be used as a decorator to limit the number of times + an asynchronous function can be called per minute. It can be configured + with either an integer specifying the number of runs per minute or an + AsyncLimiter instance. + + Args: + coro_fn: The coroutine function to be rate-limited. If an integer is provided, it is treated as runs per minute, and runs_per_minute should be None. + runs_per_minute: The number of allowed executions per minute or an AsyncLimiter instance. If coro_fn is an integer, this should be None. + + Raises: + TypeError: If 'runs_per_minute' is neither an integer nor an AsyncLimiter when 'coro_fn' is None. + exceptions.FunctionNotAsync: If 'coro_fn' is not an asynchronous function. 
+ """ # Parse Inputs - if isinstance(coro_fn, int): + if isinstance(coro_fn, (int, AsyncLimiter)): assert runs_per_minute is None runs_per_minute = coro_fn coro_fn = None elif coro_fn is None: - if runs_per_minute is not None and not isinstance(runs_per_minute, int): + if runs_per_minute is not None and not isinstance(runs_per_minute, (int, AsyncLimiter)): raise TypeError("'runs_per_minute' must be an integer.", runs_per_minute) elif not asyncio.iscoroutinefunction(coro_fn): @@ -59,4 +77,4 @@ async def rate_limit_wrap(*args: P.args, **kwargs: P.kwargs) -> T: return rate_limit_wrap - return rate_limit_decorator if coro_fn is None else rate_limit_decorator(coro_fn) + return rate_limit_decorator if coro_fn is None else rate_limit_decorator(coro_fn) \ No newline at end of file diff --git a/a_sync/a_sync/modifiers/manager.py b/a_sync/a_sync/modifiers/manager.py index 96ed9ab0..fe73bbec 100644 --- a/a_sync/a_sync/modifiers/manager.py +++ b/a_sync/a_sync/modifiers/manager.py @@ -13,6 +13,12 @@ class ModifierManager(Dict[str, Any]): + """Manages modifiers for asynchronous and synchronous functions. + + This class is responsible for applying modifiers to functions, such as + caching, rate limiting, and semaphores for asynchronous functions. + """ + default: DefaultMode cache_type: CacheType cache_typed: bool @@ -22,18 +28,37 @@ class ModifierManager(Dict[str, Any]): semaphore: SemaphoreSpec # sync modifiers executor: Executor + """This is not applied like a typical modifier. The executor is used to run the sync function in an asynchronous context.""" + __slots__ = ("_modifiers",) def __init__(self, modifiers: ModifierKwargs) -> None: + """Initializes the ModifierManager with the given modifiers. + + Args: + modifiers: A dictionary of modifiers to be applied. + + Raises: + ValueError: If an unsupported modifier is provided. 
+ """ for key in modifiers.keys(): if key not in valid_modifiers: raise ValueError(f"'{key}' is not a supported modifier.") self._modifiers = modifiers def __repr__(self) -> str: + """Returns a string representation of the modifiers.""" return str(self._modifiers) def __getattribute__(self, modifier_key: str) -> Any: + """Gets the value of a modifier. + + Args: + modifier_key: The key of the modifier to retrieve. + + Returns: + The value of the modifier, or the default value if not set. + """ if modifier_key not in valid_modifiers: return super().__getattribute__(modifier_key) return ( @@ -42,14 +67,17 @@ def __getattribute__(self, modifier_key: str) -> Any: @property def use_limiter(self) -> bool: + """Determines if a rate limiter should be used.""" return self.runs_per_minute != nulls.runs_per_minute @property def use_semaphore(self) -> bool: + """Determines if a semaphore should be used.""" return self.semaphore != nulls.semaphore @property def use_cache(self) -> bool: + """Determines if caching should be used.""" return any( [ self.cache_type != nulls.cache_type, @@ -60,6 +88,14 @@ def use_cache(self) -> bool: ) def apply_async_modifiers(self, coro_fn: CoroFn[P, T]) -> CoroFn[P, T]: + """Applies asynchronous modifiers to a coroutine function. + + Args: + coro_fn: The coroutine function to modify. + + Returns: + The modified coroutine function. + """ # NOTE: THESE STACK IN REVERSE ORDER if self.use_limiter: coro_fn = limiter.apply_rate_limit(coro_fn, self.runs_per_minute) @@ -76,35 +112,59 @@ def apply_async_modifiers(self, coro_fn: CoroFn[P, T]) -> CoroFn[P, T]: return coro_fn def apply_sync_modifiers(self, function: SyncFn[P, T]) -> SyncFn[P, T]: + """Wraps a synchronous function. + + Note: + There are no sync modifiers at this time, but they will be added here for convenience. + + Args: + function: The synchronous function to wrap. + + Returns: + The wrapped synchronous function. 
+ """ @functools.wraps(function) def sync_modifier_wrap(*args: P.args, **kwargs: P.kwargs) -> T: return function(*args, **kwargs) - # NOTE There are no sync modifiers at this time but they will be added here for my convenience. return sync_modifier_wrap # Dictionary api def keys(self) -> KeysView[str]: # type: ignore [override] + """Returns the keys of the modifiers.""" return self._modifiers.keys() def values(self) -> ValuesView[Any]: # type: ignore [override] + """Returns the values of the modifiers.""" return self._modifiers.values() def items(self) -> ItemsView[str, Any]: # type: ignore [override] + """Returns the items of the modifiers.""" return self._modifiers.items() def __contains__(self, key: str) -> bool: # type: ignore [override] + """Checks if a key is in the modifiers.""" return key in self._modifiers def __iter__(self) -> Iterator[str]: + """Returns an iterator over the modifier keys.""" return self._modifiers.__iter__() def __len__(self) -> int: + """Returns the number of modifiers.""" return len(self._modifiers) def __getitem__(self, modifier_key: str): + """Gets the value of a modifier by key. + + Args: + modifier_key: The key of the modifier to retrieve. + + Returns: + The value of the modifier. + """ return self._modifiers[modifier_key] # type: ignore [literal-required] nulls = ModifierManager(null_modifiers) -user_defaults = ModifierManager(user_set_default_modifiers) +user_defaults = ModifierManager(user_set_default_modifiers) \ No newline at end of file diff --git a/a_sync/a_sync/modifiers/semaphores.py b/a_sync/a_sync/modifiers/semaphores.py index e056f623..1a9c1b99 100644 --- a/a_sync/a_sync/modifiers/semaphores.py +++ b/a_sync/a_sync/modifiers/semaphores.py @@ -9,32 +9,54 @@ # We keep this here for now so we don't break downstream deps. Eventually will be removed. 
from a_sync.primitives import ThreadsafeSemaphore, DummySemaphore - @overload def apply_semaphore( # type: ignore [misc] - coro_fn: Literal[None], semaphore: SemaphoreSpec, -) -> AsyncDecorator[P, T]: ... +) -> AsyncDecorator[P, T]: + """Applies a semaphore to a coroutine function. + This overload is used when the semaphore is provided as a single argument, + returning a decorator that can be applied to a coroutine function. -@overload -def apply_semaphore( - coro_fn: SemaphoreSpec, - semaphore: Literal[None], -) -> AsyncDecorator[P, T]: ... - + Args: + semaphore: The semaphore to apply, which can be an integer or an asyncio.Semaphore object. + """ @overload def apply_semaphore( coro_fn: CoroFn[P, T], semaphore: SemaphoreSpec, -) -> CoroFn[P, T]: ... +) -> CoroFn[P, T]: + """Applies a semaphore to a coroutine function. + This overload is used when both the coroutine function and semaphore are provided, + directly applying the semaphore to the coroutine function. + Args: + coro_fn: The coroutine function to which the semaphore will be applied. + semaphore: The semaphore to apply, which can be an integer or an asyncio.Semaphore object. + """ + def apply_semaphore( coro_fn: Optional[Union[CoroFn[P, T], SemaphoreSpec]] = None, semaphore: SemaphoreSpec = None, ) -> AsyncDecoratorOrCoroFn[P, T]: + """Applies a semaphore to a coroutine function or returns a decorator. + + This function can be used to apply a semaphore to a coroutine function either by + passing the coroutine function and semaphore as arguments or by using the semaphore + as a decorator. It raises exceptions if the inputs are not valid. + + Args: + coro_fn: The coroutine function to which the semaphore will be applied, or None + if the semaphore is to be used as a decorator. + semaphore: The semaphore to apply, which can be an integer or an asyncio.Semaphore object. + + Raises: + ValueError: If both coro_fn and semaphore are provided as invalid inputs. 
+ exceptions.FunctionNotAsync: If the provided function is not a coroutine. + TypeError: If the semaphore is not an integer or an asyncio.Semaphore object. + """ # Parse Inputs if isinstance(coro_fn, (int, asyncio.Semaphore)): if semaphore is not None: @@ -72,5 +94,5 @@ async def semaphore_wrap(*args, **kwargs) -> T: return semaphore_decorator if coro_fn is None else semaphore_decorator(coro_fn) - dummy_semaphore = primitives.DummySemaphore() +"""A dummy semaphore that does not enforce any concurrency limits.""" \ No newline at end of file diff --git a/a_sync/a_sync/property.py b/a_sync/a_sync/property.py index 478bf14b..1c564afd 100644 --- a/a_sync/a_sync/property.py +++ b/a_sync/a_sync/property.py @@ -26,13 +26,34 @@ class _ASyncPropertyDescriptorBase(ASyncDescriptor[I, Tuple[()], T]): + """Base class for creating asynchronous properties. + + This class provides the foundation for defining properties that can be accessed + both synchronously and asynchronously. It includes utility methods for common + operations such as `any`, `all`, `min`, `max`, and `sum`. 
+ """ + any: ASyncFunction[AnyIterable[I], bool] + """An ASyncFunction that checks if any result is truthy.""" + all: ASyncFunction[AnyIterable[I], bool] + """An ASyncFunction that checks if all results are truthy.""" + min: ASyncFunction[AnyIterable[I], T] + """An ASyncFunction that returns the minimum result.""" + max: ASyncFunction[AnyIterable[I], T] + """An ASyncFunction that returns the maximum result.""" + sum: ASyncFunction[AnyIterable[I], T] + """An ASyncFunction that returns the sum of results.""" + hidden_method_descriptor: "HiddenMethodDescriptor[T]" + """A descriptor for the hidden method.""" + __wrapped__: Callable[[I], T] + """The wrapped function or method.""" + __slots__ = "hidden_method_name", "hidden_method_descriptor", "_fget" def __init__( @@ -41,6 +62,13 @@ def __init__( field_name: Optional[str] = None, **modifiers: Unpack[ModifierKwargs], ) -> None: + """Initializes the _ASyncPropertyDescriptorBase. + + Args: + _fget: The function to be wrapped. + field_name: Optional name for the field. If not provided, the function's name will be used. + **modifiers: Additional modifier arguments. + """ super().__init__(_fget, field_name, **modifiers) self.hidden_method_name = f"__{self.field_name}__" hidden_modifiers = dict(self.modifiers) @@ -60,6 +88,15 @@ def __get__(self, instance: I, owner: Type[I]) -> Awaitable[T]: ... def __get__( self, instance: Optional[I], owner: Type[I] ) -> Union[Self, Awaitable[T]]: + """Retrieves the property value, either synchronously or asynchronously. + + Args: + instance: The instance from which the property is accessed. + owner: The owner class of the property. + + Returns: + The property value, either as an awaitable or directly. + """ if instance is None: return self awaitable = super().__get__(instance, owner) @@ -97,6 +134,15 @@ def __get__( return retval async def get(self, instance: I, owner: Optional[Type[I]] = None) -> T: + """Asynchronously retrieves the property value. 
+ + Args: + instance: The instance from which the property is accessed. + owner: The owner class of the property. + + Returns: + The property value. + """ if instance is None: raise ValueError(instance) logger.debug("awaiting %s for instance %s", self, instance) @@ -109,6 +155,17 @@ def map( concurrency: Optional[int] = None, name: str = "", ) -> "TaskMapping[I, T]": + """Maps the property across multiple instances. + + Args: + instances: An iterable of instances. + owner: The owner class of the property. + concurrency: Optional concurrency limit. + name: Optional name for the task mapping. + + Returns: + A TaskMapping object. + """ from a_sync.task import TaskMapping logger.debug("mapping %s to instances: %s owner: %s", self, instances, owner) @@ -124,10 +181,11 @@ def map( class ASyncPropertyDescriptor( _ASyncPropertyDescriptorBase[I, T], ap.base.AsyncPropertyDescriptor ): - pass + """Descriptor class for asynchronous properties.""" -class property(ASyncPropertyDescriptor[I, T]): ... +class property(ASyncPropertyDescriptor[I, T]): + """Descriptor for defining properties that can be accessed both synchronously and asynchronously.""" @final @@ -151,6 +209,15 @@ def __get__(self, instance: None, owner: Type[I]) -> Self: ... @overload def __get__(self, instance: I, owner: Type[I]) -> T: ... def __get__(self, instance: Optional[I], owner: Type[I]) -> Union[Self, T]: + """Retrieves the property value, either synchronously or asynchronously. + + Args: + instance: The instance from which the property is accessed. + owner: The owner class of the property. + + Returns: + The property value, either as an awaitable or directly. + """ return _ASyncPropertyDescriptorBase.__get__(self, instance, owner) @@ -275,6 +342,15 @@ def a_sync_property( # type: ignore [misc] ASyncPropertyDecoratorSyncDefault[I, T], ASyncPropertyDecoratorAsyncDefault[I, T], ]: + """Decorator for creating properties that can be accessed both synchronously and asynchronously. 
+ + Args: + func: The function to be wrapped. + **modifiers: Additional modifier arguments. + + Returns: + A property descriptor that supports both sync and async access. + """ func, modifiers = _parse_args(func, modifiers) if modifiers.get("default") == "sync": descriptor_class = ASyncPropertyDescriptorSyncDefault @@ -306,13 +382,33 @@ def __init__( field_name=None, **modifiers: Unpack[ModifierKwargs], ) -> None: + """Initializes the ASyncCachedPropertyDescriptor. + + Args: + _fget: The function to be wrapped. + _fset: Optional setter function for the property. + _fdel: Optional deleter function for the property. + field_name: Optional name for the field. If not provided, the function's name will be used. + **modifiers: Additional modifier arguments. + """ super().__init__(_fget, field_name, **modifiers) self._check_method_sync(_fset, "setter") - self._check_method_sync(_fdel, "deleter") self._fset = _fset + """Optional setter function for the property.""" + + self._check_method_sync(_fdel, "deleter") self._fdel = _fdel + """Optional deleter function for the property.""" def get_lock(self, instance: I) -> "asyncio.Task[T]": + """Retrieves the lock for the property. + + Args: + instance: The instance from which the property is accessed. + + Returns: + An asyncio Task representing the lock. + """ instance_state = self.get_instance_state(instance) task = instance_state.lock[self.field_name] if isinstance(task, asyncio.Lock): @@ -322,9 +418,22 @@ def get_lock(self, instance: I) -> "asyncio.Task[T]": return task def pop_lock(self, instance: I) -> None: + """Removes the lock for the property. + + Args: + instance: The instance from which the property is accessed. + """ self.get_instance_state(instance).lock.pop(self.field_name, None) def get_loader(self, instance: I) -> Callable[[], T]: + """Retrieves the loader function for the property. + + Args: + instance: The instance from which the property is accessed. + + Returns: + A callable that loads the property value. 
+ """ @functools.wraps(self._fget) async def load_value(): inner_task = self.get_lock(instance) @@ -340,7 +449,8 @@ async def load_value(): return load_value -class cached_property(ASyncCachedPropertyDescriptor[I, T]): ... +class cached_property(ASyncCachedPropertyDescriptor[I, T]): + """Descriptor for defining cached properties that can be accessed both synchronously and asynchronously.""" @final @@ -359,6 +469,15 @@ def __get__(self, instance: None, owner: Type[I]) -> Self: ... @overload def __get__(self, instance: I, owner: Type[I]) -> T: ... def __get__(self, instance: Optional[I], owner: Type[I]) -> Union[Self, T]: + """Retrieves the cached property value, either synchronously or asynchronously. + + Args: + instance: The instance from which the property is accessed. + owner: The owner class of the property. + + Returns: + The cached property value, either as an awaitable or directly. + """ return _ASyncPropertyDescriptorBase.__get__(self, instance, owner) @@ -472,10 +591,19 @@ def a_sync_cached_property( # type: ignore [misc] ASyncCachedPropertyDecoratorSyncDefault[I, T], ASyncCachedPropertyDecoratorAsyncDefault[I, T], ]: + """Decorator for creating cached properties that can be accessed both synchronously and asynchronously. + + Args: + func: The function to be wrapped. + **modifiers: Additional modifier arguments. + + Returns: + A cached property descriptor that supports both sync and async access. + """ func, modifiers = _parse_args(func, modifiers) if modifiers.get("default") == "sync": descriptor_class = ASyncCachedPropertyDescriptorSyncDefault - elif modifiers.get("default") == "sync": + elif modifiers.get("default") == "async": descriptor_class = ASyncCachedPropertyDescriptorAsyncDefault else: descriptor_class = ASyncCachedPropertyDescriptor @@ -485,6 +613,12 @@ def a_sync_cached_property( # type: ignore [misc] @final class HiddenMethod(ASyncBoundMethodAsyncDefault[I, Tuple[()], T]): + """Represents a hidden method for asynchronous properties. 
+ + This class is used internally to manage hidden methods associated with + asynchronous properties. + """ + def __init__( self, instance: I, @@ -493,25 +627,51 @@ def __init__( field_name: str, **modifiers: Unpack[ModifierKwargs], ) -> None: + """Initializes the HiddenMethod. + + Args: + instance: The instance to which the method is bound. + unbound: The unbound function to be wrapped. + async_def: Indicates if the method is asynchronous. + field_name: The name of the field associated with the method. + **modifiers: Additional modifier arguments. + """ super().__init__(instance, unbound, async_def, **modifiers) self.__name__ = field_name + """The name of the hidden method.""" def __repr__(self) -> str: + """Returns a string representation of the HiddenMethod.""" instance_type = type(self.__self__) return f"<{self.__class__.__name__} for property {instance_type.__module__}.{instance_type.__name__}.{self.__name__[2:-2]} bound to {self.__self__}>" def _should_await(self, kwargs: dict) -> bool: + """Determines if the method should be awaited. + + Args: + kwargs: The keyword arguments passed to the method. + + Returns: + A boolean indicating if the method should be awaited. + """ try: return self.__self__.__a_sync_should_await_from_kwargs__(kwargs) except (AttributeError, exceptions.NoFlagsFound): return False def __await__(self) -> Generator[Any, None, T]: + """Returns an awaitable for the method.""" return self(sync=False).__await__() @final class HiddenMethodDescriptor(ASyncMethodDescriptorAsyncDefault[I, Tuple[()], T]): + """Descriptor for hidden methods associated with asynchronous properties. + + This class is used internally to manage hidden methods associated with + asynchronous properties. 
+ """ + def __init__( self, _fget: AnyFn[Concatenate[I, P], Awaitable[T]], @@ -538,6 +698,15 @@ def __init__( self.__doc__ += f"\n\nThe original docstring for :meth:`~{self.__wrapped__.__qualname__}` is shown below:\n\n{self.__wrapped__.__doc__}" def __get__(self, instance: I, owner: Type[I]) -> HiddenMethod[I, T]: + """Retrieves the hidden method for the property. + + Args: + instance: The instance from which the method is accessed. + owner: The owner class of the method. + + Returns: + The hidden method. + """ if instance is None: return self try: @@ -558,6 +727,14 @@ def __get__(self, instance: I, owner: Type[I]) -> HiddenMethod[I, T]: def _is_a_sync_instance(instance: object) -> bool: + """Checks if an instance is an ASync instance. + + Args: + instance: The instance to check. + + Returns: + A boolean indicating if the instance is an ASync instance. + """ try: return instance.__is_a_sync_instance__ # type: ignore [attr-defined] except AttributeError: @@ -571,7 +748,16 @@ def _is_a_sync_instance(instance: object) -> bool: def _parse_args( func: Union[None, DefaultMode, AsyncGetterFunction[I, T]], modifiers: ModifierKwargs ) -> Tuple[Optional[AsyncGetterFunction[I, T]], ModifierKwargs]: + """Parses the arguments for the property decorators. + + Args: + func: The function to be wrapped. + modifiers: Additional modifier arguments. + + Returns: + A tuple containing the parsed function and modifiers. + """ if func in ["sync", "async"]: modifiers["default"] = func func = None - return func, modifiers + return func, modifiers \ No newline at end of file diff --git a/a_sync/asyncio/__init__.py b/a_sync/asyncio/__init__.py index afec4f7c..07a976aa 100644 --- a/a_sync/asyncio/__init__.py +++ b/a_sync/asyncio/__init__.py @@ -1,5 +1,8 @@ """ -This package contains buffed versions of the objects found in the builtin `asyncio` package. +This package provides custom utilities and extensions to the builtin `asyncio` package. 
+ +These utilities include enhanced versions of common asyncio functions, offering additional +features and improved functionality for asynchronous programming. """ from a_sync.asyncio.as_completed import as_completed @@ -7,4 +10,4 @@ from a_sync.asyncio.gather import gather from a_sync.asyncio.utils import get_event_loop -__all__ = ["create_task", "gather", "as_completed", "get_event_loop"] +__all__ = ["create_task", "gather", "as_completed", "get_event_loop"] \ No newline at end of file diff --git a/a_sync/asyncio/as_completed.py b/a_sync/asyncio/as_completed.py index 1bb83d6d..5dc8f2f6 100644 --- a/a_sync/asyncio/as_completed.py +++ b/a_sync/asyncio/as_completed.py @@ -78,16 +78,13 @@ def as_completed( - Provides progress reporting using tqdm if 'tqdm' is set to True. Args: - fs (Iterable[Awaitable[T] or Mapping[K, Awaitable[V]]]): The awaitables to await concurrently. It can be a list of individual awaitables or a mapping of awaitables. - timeout (float, optional): The maximum time, in seconds, to wait for the completion of awaitables. Defaults to None (no timeout). - return_exceptions (bool, optional): If True, exceptions are returned as results instead of raising them. Defaults to False. - aiter (bool, optional): If True, returns an async iterator of results. Defaults to False. - tqdm (bool, optional): If True, enables progress reporting using tqdm. Defaults to False. + fs: The awaitables to await concurrently. It can be a list of individual awaitables or a mapping of awaitables. + timeout: The maximum time, in seconds, to wait for the completion of awaitables. Defaults to None (no timeout). + return_exceptions: If True, exceptions are returned as results instead of raising them. Defaults to False. Note: This parameter is not currently implemented in the function logic. + aiter: If True, returns an async iterator of results. Defaults to False. + tqdm: If True, enables progress reporting using tqdm. Defaults to False. 
**tqdm_kwargs: Additional keyword arguments for tqdm if progress reporting is enabled. - Returns: - Iterator[Coroutine[Any, Any, T] or ASyncIterator[Tuple[K, V]]]: An iterator of results when awaiting individual awaitables or an async iterator when awaiting mappings. - Examples: Awaiting individual awaitables: ``` @@ -171,16 +168,13 @@ def as_completed_mapping( This function is designed to await a mapping of awaitable objects, where each key-value pair represents a unique awaitable. It enables concurrent execution and gathers results into an iterator or an async iterator. Args: - mapping (Mapping[K, Awaitable[V]]): A dictionary-like object where keys are of type K and values are awaitable objects of type V. - timeout (float, optional): The maximum time, in seconds, to wait for the completion of awaitables. Defaults to None (no timeout). - return_exceptions (bool, optional): If True, exceptions are returned as results instead of raising them. Defaults to False. - aiter (bool, optional): If True, returns an async iterator of results. Defaults to False. - tqdm (bool, optional): If True, enables progress reporting using tqdm. Defaults to False. + mapping: A dictionary-like object where keys are of type K and values are awaitable objects of type V. + timeout: The maximum time, in seconds, to wait for the completion of awaitables. Defaults to None (no timeout). + return_exceptions: If True, exceptions are returned as results instead of raising them. Defaults to False. + aiter: If True, returns an async iterator of results. Defaults to False. + tqdm: If True, enables progress reporting using tqdm. Defaults to False. **tqdm_kwargs: Additional keyword arguments for tqdm if progress reporting is enabled. - Returns: - Union[Iterator[Coroutine[Any, Any, Tuple[K, V]]] or ASyncIterator[Tuple[K, V]]]: An iterator of results or an async iterator when awaiting mappings. 
- Example: ``` mapping = {'key1': async_function1(), 'key2': async_function2()} @@ -206,6 +200,14 @@ def as_completed_mapping( async def _exc_wrap(awaitable: Awaitable[T]) -> Union[T, Exception]: + """Wraps an awaitable to catch exceptions and return them instead of raising. + + Args: + awaitable: The awaitable to wrap. + + Returns: + The result of the awaitable or the exception if one is raised. + """ try: return await awaitable except Exception as e: @@ -220,6 +222,15 @@ async def __yield_as_completed( tqdm: bool = False, **tqdm_kwargs: Any ) -> AsyncIterator[T]: + """Yields results from awaitables as they complete. + + Args: + futs: The awaitables to await. + timeout: The maximum time, in seconds, to wait for the completion of awaitables. Defaults to None (no timeout). + return_exceptions: If True, exceptions are returned as results instead of raising them. Defaults to False. + tqdm: If True, enables progress reporting using tqdm. Defaults to False. + **tqdm_kwargs: Additional keyword arguments for tqdm if progress reporting is enabled. + """ for fut in as_completed( futs, timeout=timeout, @@ -241,6 +252,16 @@ async def __mapping_wrap( async def __mapping_wrap( k: K, v: Awaitable[V], return_exceptions: bool = False ) -> Union[V, Exception]: + """Wraps a key-value pair of awaitable to catch exceptions and return them with the key. + + Args: + k: The key associated with the awaitable. + v: The awaitable to wrap. + return_exceptions: If True, exceptions are returned as results instead of raising them. Defaults to False. + + Returns: + A tuple of the key and the result of the awaitable or the exception if one is raised. 
+ """ try: return k, await v except Exception as e: @@ -249,4 +270,4 @@ async def __mapping_wrap( raise -__all__ = ["as_completed", "as_completed_mapping"] +__all__ = ["as_completed", "as_completed_mapping"] \ No newline at end of file diff --git a/a_sync/asyncio/create_task.py b/a_sync/asyncio/create_task.py index 1f7fe57f..6209d660 100644 --- a/a_sync/asyncio/create_task.py +++ b/a_sync/asyncio/create_task.py @@ -23,14 +23,15 @@ def create_task( """ Extends asyncio.create_task to support any Awaitable, manage task lifecycle, and enhance error handling. - Unlike asyncio.create_task, which requires a coroutine, this function accepts any Awaitable, ensuring broader - compatibility. It optionally prevents the task from being garbage-collected until completion and provides - enhanced error management by wrapping exceptions in a custom exception. + This function accepts any Awaitable, ensuring broader compatibility. If the Awaitable is not a coroutine, + it attempts to convert it to one. It optionally prevents the task from being garbage-collected until completion + and provides enhanced error management by wrapping exceptions in a custom exception when skip_gc_until_done is True. Args: - coro: An Awaitable object from which to create the task. + coro: An Awaitable object from which to create the task. If not a coroutine, it will be converted. name: Optional name for the task, aiding in debugging. skip_gc_until_done: If True, the task is kept alive until it completes, preventing garbage collection. + Exceptions are wrapped in PersistedTaskException for special handling. log_destroy_pending: If False, asyncio's default error log when a pending task is destroyed is suppressed. Returns: @@ -53,7 +54,14 @@ def create_task( async def __await(awaitable: Awaitable[T]) -> T: - """Wait for the completion of an Awaitable.""" + """Wait for the completion of an Awaitable. + + Args: + awaitable: The Awaitable object to wait for. 
+ + Raises: + RuntimeError: If a RuntimeError occurs during the await, it is raised with additional context. + """ try: return await awaitable except RuntimeError as e: @@ -64,7 +72,12 @@ async def __await(awaitable: Awaitable[T]) -> T: def __prune_persisted_tasks(): - """Remove completed tasks from the set of persisted tasks.""" + """Remove completed tasks from the set of persisted tasks. + + This function checks each task in the persisted tasks set. If a task is done and has an exception, + it logs the exception and raises it if it's not a PersistedTaskException. It also logs the traceback + manually since the usual handler will not run after retrieving the exception. + """ for task in tuple(__persisted_tasks): if task.done() and (e := task.exception()): # force exceptions related to this lib to bubble up @@ -91,9 +104,6 @@ async def __persisted_task_exc_wrap(task: "asyncio.Task[T]") -> T: Args: task: The asyncio Task to wrap. - Returns: - The result of the task, if successful. - Raises: PersistedTaskException: Wraps any exception raised by the task for special handling. """ @@ -103,4 +113,4 @@ async def __persisted_task_exc_wrap(task: "asyncio.Task[T]") -> T: raise exceptions.PersistedTaskException(e, task) from e -__all__ = ["create_task"] +__all__ = ["create_task"] \ No newline at end of file diff --git a/a_sync/asyncio/gather.py b/a_sync/asyncio/gather.py index db53b0c5..b4a9fe3d 100644 --- a/a_sync/asyncio/gather.py +++ b/a_sync/asyncio/gather.py @@ -47,45 +47,40 @@ async def gather( **tqdm_kwargs: Any, ) -> Union[List[T], Dict[K, V]]: """ - Concurrently awaits a list of awaitable objects or mappings of awaitables and returns the results. + Concurrently awaits a list of awaitable objects or a mapping of awaitables and returns the results. - This function extends Python's asyncio.gather, providing additional features for mixed use cases of individual awaitable objects and mappings of awaitables. 
+ This function extends Python's asyncio.gather, providing additional features for handling either individual awaitable objects or a mapping of awaitables. Differences from asyncio.gather: - Uses type hints for use with static type checkers. - Supports gathering either individual awaitables or a k:v mapping of awaitables. - Provides progress reporting using tqdm if 'tqdm' is set to True. + - Allows exclusion of results based on a condition using the 'exclude_if' parameter. Args: - *awaitables: The awaitables to await concurrently. It can be a single awaitable or a mapping of awaitables. - return_exceptions (optional): If True, exceptions are returned as results instead of raising them. Defaults to False. - tqdm (optional): If True, enables progress reporting using tqdm. Defaults to False. + *awaitables: The awaitables to await concurrently. It can be a list of individual awaitables or a mapping of awaitables. + return_exceptions: If True, exceptions are returned as results instead of raising them. Defaults to False. + exclude_if: A callable that takes a result and returns True if the result should be excluded from the final output. Defaults to None. + tqdm: If True, enables progress reporting using tqdm. Defaults to False. **tqdm_kwargs: Additional keyword arguments for tqdm if progress reporting is enabled. - Returns: - A list of results when awaiting individual awaitables or a dictionary of results when awaiting mappings. - Examples: Awaiting individual awaitables: - Results will be a list containing the result of each awaitable in sequential order. - ``` >>> results = await gather(thing1(), thing2()) >>> results ['result', 123] - ``` - Awaiting mappings of awaitables: + Awaiting a mapping of awaitables: - Results will be a dictionary with 'key1' mapped to the result of thing1() and 'key2' mapped to the result of thing2. 
- ``` >>> mapping = {'key1': thing1(), 'key2': thing2()} >>> results = await gather(mapping) >>> results {'key1': 'result', 'key2': 123} - ``` """ is_mapping = _is_mapping(awaitables) results = await ( @@ -129,21 +124,18 @@ async def gather_mapping( Args: mapping: A dictionary-like object where keys are of type K and values are awaitable objects of type V. - return_exceptions (optional): If True, exceptions are returned as results instead of raising them. Defaults to False. - tqdm (optional): If True, enables progress reporting using tqdm. Defaults to False. + return_exceptions: If True, exceptions are returned as results instead of raising them. Defaults to False. + exclude_if: A callable that takes a result and returns True if the result should be excluded from the final output. Defaults to None. + tqdm: If True, enables progress reporting using tqdm. Defaults to False. **tqdm_kwargs: Additional keyword arguments for tqdm if progress reporting is enabled. - Returns: - A dictionary with keys corresponding to the keys of the input mapping and values containing the results of the corresponding awaitables. - Example: The 'results' dictionary will contain the awaited results, where keys match the keys in the 'mapping' and values contain the results of the corresponding awaitables. - ``` + >>> mapping = {'task1': async_function1(), 'task2': async_function2(), 'task3': async_function3()} >>> results = await gather_mapping(mapping) >>> results {'task1': "result", 'task2': 123, 'task3': None} - ``` """ results = { k: v @@ -164,4 +156,4 @@ async def gather_mapping( awaitables[0], Mapping ) -__all__ = ["gather", "gather_mapping"] +__all__ = ["gather", "gather_mapping"] \ No newline at end of file diff --git a/a_sync/exceptions.py b/a_sync/exceptions.py index b2df5d21..0074fdc0 100644 --- a/a_sync/exceptions.py +++ b/a_sync/exceptions.py @@ -16,15 +16,25 @@ class ASyncFlagException(ValueError): Base exception class for flag-related errors in the a_sync library. 
""" - @property - def viable_flags(self) -> Set[str]: - """ - Returns the set of viable flags. - """ - return VIABLE_FLAGS + viable_flags = VIABLE_FLAGS + """ + The set of viable flags. + + A-Sync uses 'flags' to indicate whether objects / fn calls will be sync or async. + You can use any of the provided flags, whichever makes most sense for your use case. + """ def desc(self, target) -> str: - if target == "kwargs": + """ + Returns a description of the target for the flag error message. + + Args: + target: The target object or string to describe. + + Returns: + A string description of the target. + """ + if target == 'kwargs': return "flags present in 'kwargs'" else: return f"flag attributes defined on {target}" @@ -160,6 +170,9 @@ def __init__(self, fn): class ASyncRuntimeError(RuntimeError): + """ + Raised for runtime errors in asynchronous operations. + """ def __init__(self, e: RuntimeError): """ Initializes the ASyncRuntimeError exception. @@ -225,7 +238,17 @@ class MappingNotEmptyError(MappingError): class PersistedTaskException(Exception): + """ + Raised when an exception persists in an asyncio Task. + """ def __init__(self, exc: E, task: asyncio.Task) -> None: + """ + Initializes the PersistedTaskException exception. + + Args: + exc: The exception that persisted. + task: The asyncio Task where the exception occurred. + """ super().__init__(f"{exc.__class__.__name__}: {exc}", task) self.exception = exc self.task = task @@ -233,5 +256,5 @@ def __init__(self, exc: E, task: asyncio.Task) -> None: class EmptySequenceError(ValueError): """ - Raised when an operation is attempted on an empty sequence but items are expected. + Raised when an operation is attempted on an empty sequence but items are required. 
""" diff --git a/a_sync/executor.py b/a_sync/executor.py index 0e85587b..d579d0ca 100644 --- a/a_sync/executor.py +++ b/a_sync/executor.py @@ -1,13 +1,13 @@ """ With these executors, you can simply run sync functions in your executor with `await executor.run(fn, *args)`. -`executor.submit(fn, *args)` will work the same as the concurrent.futures implementation, but will return an asyncio.Future instead of a concurrent.futures.Future +`executor.submit(fn, *args)` will work the same as the concurrent.futures implementation, but will return an asyncio.Future instead of a concurrent.futures.Future. This module provides several executor classes: -- _AsyncExecutorMixin: A mixin providing asynchronous run and submit methods. +- _AsyncExecutorMixin: A mixin providing asynchronous run and submit methods, with support for synchronous mode. - AsyncProcessPoolExecutor: An async process pool executor. - AsyncThreadPoolExecutor: An async thread pool executor. -- PruningThreadPoolExecutor: A thread pool executor that prunes inactive threads after a timeout. +- PruningThreadPoolExecutor: A thread pool executor that prunes inactive threads after a timeout, ensuring at least one thread remains active. """ import asyncio @@ -34,23 +34,23 @@ class _AsyncExecutorMixin(cf.Executor, _DebugDaemonMixin): """ _max_workers: int + _workers: str + """The type of workers used.""" + __slots__ = "_max_workers", "_initializer", "_initargs", "_broken", "_shutdown_lock" - async def run(self, fn: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T: + async def run(self, fn: Callable[P, T], *args: P.args, **kwargs: P.kwargs): """ - A shorthand way to call `await asyncio.get_event_loop().run_in_executor(this_executor, fn, *args)` + A shorthand way to call `await asyncio.get_event_loop().run_in_executor(this_executor, fn, *args)`. Doesn't `await this_executor.run(fn, *args)` look so much better? Oh, and you can also use kwargs! Args: - fn (Callable[P, T]): The function to run. 
+ fn: The function to run. *args: Positional arguments for the function. **kwargs: Keyword arguments for the function. - - Returns: - T: The result of the function. """ return ( fn(*args, **kwargs) @@ -63,12 +63,9 @@ def submit(self, fn: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> "asyn Submits a job to the executor and returns an asyncio.Future that can be awaited for the result without blocking. Args: - fn (Callable[P, T]): The function to submit. + fn: The function to submit. *args: Positional arguments for the function. **kwargs: Keyword arguments for the function. - - Returns: - asyncio.Future[T]: The future representing the result of the function. """ if self.sync_mode: fut = asyncio.get_event_loop().create_future() @@ -92,9 +89,6 @@ def __len__(self) -> int: def sync_mode(self) -> bool: """ Indicates if the executor is in synchronous mode (max_workers == 0). - - Returns: - bool: True if in synchronous mode, False otherwise. """ return self._max_workers == 0 @@ -102,9 +96,6 @@ def sync_mode(self) -> bool: def worker_count_current(self) -> int: """ Returns the current number of workers. - - Returns: - int: The current number of workers. """ return len(getattr(self, f"_{self._workers}")) @@ -113,7 +104,7 @@ async def _debug_daemon(self, fut: asyncio.Future, fn, *args, **kwargs) -> None: Runs until manually cancelled by the finished work item. Args: - fut (asyncio.Future): The future being debugged. + fut: The future being debugged. fn: The function being executed. *args: Positional arguments for the function. **kwargs: Keyword arguments for the function. @@ -144,9 +135,14 @@ async def _debug_daemon(self, fut: asyncio.Future, fn, *args, **kwargs) -> None: class AsyncProcessPoolExecutor(_AsyncExecutorMixin, cf.ProcessPoolExecutor): """ An async process pool executor that allows use of kwargs. 
+
+    Attributes:
+        _workers: The type of workers used, set to "processes".
     """
 
     _workers = "processes"
+    """The type of workers used, set to "processes"."""
+
     __slots__ = (
         "_mp_context",
         "_processes",
@@ -171,10 +167,10 @@ def __init__(
         Initializes the AsyncProcessPoolExecutor.
 
         Args:
-            max_workers (Optional[int], optional): The maximum number of workers. Defaults to None.
-            mp_context (Optional[multiprocessing.context.BaseContext], optional): The multiprocessing context. Defaults to None.
-            initializer (Optional[Initializer], optional): An initializer callable. Defaults to None.
-            initargs (Tuple[Any, ...], optional): Arguments for the initializer. Defaults to ().
+            max_workers: The maximum number of workers. Defaults to None.
+            mp_context: The multiprocessing context. Defaults to None.
+            initializer: An initializer callable. Defaults to None.
+            initargs: Arguments for the initializer. Defaults to ().
         """
         if max_workers == 0:
             super().__init__(1, mp_context, initializer, initargs)
@@ -192,6 +188,8 @@ class AsyncThreadPoolExecutor(_AsyncExecutorMixin, cf.ThreadPoolExecutor):
     """
 
     _workers = "threads"
+    """The type of workers used, set to "threads"."""
+
     __slots__ = (
         "_work_queue",
         "_idle_semaphore",
@@ -211,10 +209,10 @@ def __init__(
         Initializes the AsyncThreadPoolExecutor.
 
         Args:
-            max_workers (Optional[int], optional): The maximum number of workers. Defaults to None.
-            thread_name_prefix (str, optional): Prefix for thread names. Defaults to ''.
-            initializer (Optional[Initializer], optional): An initializer callable. Defaults to None.
-            initargs (Tuple[Any, ...], optional): Arguments for the initializer. Defaults to ().
+            max_workers: The maximum number of workers. Defaults to None.
+            thread_name_prefix: Prefix for thread names. Defaults to ''.
+            initializer: An initializer callable. Defaults to None.
+            initargs: Arguments for the initializer. Defaults to ().
""" if max_workers == 0: super().__init__(1, thread_name_prefix, initializer, initargs) @@ -306,6 +304,7 @@ class PruningThreadPoolExecutor(AsyncThreadPoolExecutor): """ This `AsyncThreadPoolExecutor` implementation prunes inactive threads after 'timeout' seconds without a work item. Pruned threads will be automatically recreated as needed for future workloads. Up to 'max_threads' can be active at any one time. + A minimum of one thread will remain active to prevent locks. """ __slots__ = "_timeout", "_adjusting_lock" @@ -322,14 +321,19 @@ def __init__( Initializes the PruningThreadPoolExecutor. Args: - max_workers (Optional[int], optional): The maximum number of workers. Defaults to None. - thread_name_prefix (str, optional): Prefix for thread names. Defaults to ''. - initializer (Optional[Initializer], optional): An initializer callable. Defaults to None. - initargs (Tuple[Any, ...], optional): Arguments for the initializer. Defaults to (). - timeout (int, optional): Timeout duration for pruning inactive threads. Defaults to TEN_MINUTES. + max_workers: The maximum number of workers. Defaults to None. + thread_name_prefix: Prefix for thread names. Defaults to ''. + initializer: An initializer callable. Defaults to None. + initargs: Arguments for the initializer. Defaults to (). + timeout: Timeout duration for pruning inactive threads. Defaults to TEN_MINUTES. 
""" + self._timeout = timeout + """Timeout duration for pruning inactive threads.""" + self._adjusting_lock = threading.Lock() + """Lock used to adjust the number of threads.""" + super().__init__(max_workers, thread_name_prefix, initializer, initargs) def __len__(self) -> int: @@ -375,4 +379,4 @@ def weakref_cb(_, q=self._work_queue): "AsyncThreadPoolExecutor", "AsyncProcessPoolExecutor", "PruningThreadPoolExecutor", -] +] \ No newline at end of file diff --git a/a_sync/future.py b/a_sync/future.py index 70259f6c..1ee3c07a 100644 --- a/a_sync/future.py +++ b/a_sync/future.py @@ -1,5 +1,26 @@ # type: ignore [var-annotated] +""" +The future.py module provides functionality for handling asynchronous futures, +including a decorator for converting callables into ASyncFuture objects and +utilities for managing asynchronous computations. + +Functions: + future(callable: Union[Callable[P, Awaitable[T]], Callable[P, T]] = None, **kwargs: Unpack[ModifierKwargs]) -> Callable[P, Union[T, "ASyncFuture[T]"]]: + A decorator to convert a callable into an ASyncFuture, with optional modifiers. + _gather_check_and_materialize(*things: Unpack[MaybeAwaitable[T]]) -> List[T]: + Gathers and materializes a list of awaitable or non-awaitable items. + _check_and_materialize(thing: T) -> T: + Checks if an item is awaitable and materializes it. + _materialize(meta: "ASyncFuture[T]") -> T: + Materializes the result of an ASyncFuture. + +Classes: + ASyncFuture: Represents an asynchronous future result. + _ASyncFutureWrappedFn: A callable class to wrap functions and return ASyncFuture objects. + _ASyncFutureInstanceMethod: A class to handle instance methods wrapped as ASyncFuture. 
+""" + import asyncio import concurrent.futures from functools import partial, wraps @@ -12,18 +33,46 @@ def future( callable: AnyFn[P, T] = None, **kwargs: Unpack[ModifierKwargs], ) -> Callable[P, Union[T, "ASyncFuture[T]"]]: + """ + A decorator function to convert a callable into an ASyncFuture, with optional modifiers. + + Args: + callable: The callable to convert. Defaults to None. + **kwargs: Additional keyword arguments for the modifier. + """ return _ASyncFutureWrappedFn(callable, **kwargs) async def _gather_check_and_materialize(*things: Unpack[MaybeAwaitable[T]]) -> List[T]: + """ + Gathers and materializes a list of awaitable or non-awaitable items. + + Args: + *things: Items to gather and materialize. + """ return await asyncio.gather(*[_check_and_materialize(thing) for thing in things]) async def _check_and_materialize(thing: T) -> T: + """ + Checks if an item is awaitable and materializes it. + + Args: + thing: The item to check and materialize. + """ return await thing if isawaitable(thing) else thing def _materialize(meta: "ASyncFuture[T]") -> T: + """ + Materializes the result of an ASyncFuture. + + Args: + meta: The ASyncFuture to materialize. + + Raises: + RuntimeError: If the event loop is running and the result cannot be awaited. + """ try: return asyncio.get_event_loop().run_until_complete(meta) except RuntimeError as e: @@ -38,18 +87,33 @@ def _materialize(meta: "ASyncFuture[T]") -> T: class ASyncFuture(concurrent.futures.Future, Awaitable[T]): + """ + A class representing an asynchronous future result. + + Inherits from both concurrent.futures.Future and Awaitable[T], allowing it to be used in both synchronous and asynchronous contexts. 
+ """ + __slots__ = "__awaitable__", "__dependencies", "__dependants", "__task" - def __init__( - self, awaitable: Awaitable[T], dependencies: List["ASyncFuture"] = [] - ) -> None: + def __init__(self, awaitable: Awaitable[T], dependencies: List["ASyncFuture"] = []) -> None: + """ + Initializes an ASyncFuture with an awaitable and optional dependencies. + + Args: + awaitable: The awaitable object. + dependencies: A list of dependencies. Defaults to []. + """ self.__awaitable__ = awaitable + """The awaitable object.""" self.__dependencies = dependencies + """A list of dependencies.""" for dependency in dependencies: assert isinstance(dependency, ASyncFuture) dependency.__dependants.append(self) self.__dependants: List[ASyncFuture] = [] + """A list of dependants.""" self.__task = None + """The task associated with the awaitable.""" super().__init__() def __hash__(self) -> int: @@ -68,6 +132,12 @@ def __repr__(self) -> str: return string + ">" def __list_dependencies(self, other) -> List["ASyncFuture"]: + """ + Lists dependencies for the ASyncFuture. + + Args: + other: The other dependency to list. + """ if isinstance(other, ASyncFuture): return [self, other] return [self] @@ -76,8 +146,8 @@ def __list_dependencies(self, other) -> List["ASyncFuture"]: def result(self) -> Union[Callable[[], T], Any]: """ If this future is not done, it will work like cf.Future.result. It will block, await the awaitable, and return the result when ready. 
- If this future is done and the result has attribute `results`, will return `getattr(future_result, 'result')` - If this future is done and the result does NOT have attribute `results`, will again work like cf.Future.result + If this future is done and the result has attribute `result`, will return `getattr(future_result, 'result')` + If this future is done and the result does NOT have attribute `result`, will again work like cf.Future.result """ if self.done(): if hasattr(r := super().result(), "result"): @@ -106,6 +176,9 @@ def __contains__(self, key: Any) -> bool: ) def __await__(self) -> Generator[Any, None, T]: + """ + Makes the ASyncFuture awaitable. + """ return self.__await().__await__() async def __await(self) -> T: @@ -115,6 +188,9 @@ async def __await(self) -> T: @property def __task__(self) -> "asyncio.Task[T]": + """ + Returns the asyncio task associated with the awaitable, creating it if necessary. + """ if self.__task is None: self.__task = asyncio.create_task(self.__awaitable__) return self.__task @@ -741,6 +817,9 @@ def __float__(self) -> float: @property def __dependants__(self) -> Set["ASyncFuture"]: + """ + Returns the set of dependants for this ASyncFuture, including nested dependants. + """ dependants = set() for dep in self.__dependants: dependants.add(dep) @@ -749,6 +828,9 @@ def __dependants__(self) -> Set["ASyncFuture"]: @property def __dependencies__(self) -> Set["ASyncFuture"]: + """ + Returns the set of dependencies for this ASyncFuture, including nested dependencies. + """ dependencies = set() for dep in self.__dependencies: dependencies.add(dep) @@ -767,6 +849,10 @@ def __sizeof__(self) -> int: @final class _ASyncFutureWrappedFn(Callable[P, ASyncFuture[T]]): + """ + A callable class to wrap functions and return ASyncFuture objects. 
+ """ + __slots__ = "callable", "wrapped", "_callable_name" def __init__( @@ -778,7 +864,11 @@ def __init__( if callable: self.callable = callable + """The callable function.""" + self._callable_name = callable.__name__ + """The name of the callable function.""" + a_sync_callable = a_sync(callable, default="async", **kwargs) @wraps(callable) @@ -786,6 +876,7 @@ def future_wrap(*args: P.args, **kwargs: P.kwargs) -> "ASyncFuture[T]": return ASyncFuture(a_sync_callable(*args, **kwargs, sync=False)) self.wrapped = future_wrap + """The wrapped function returning ASyncFuture.""" else: self.wrapped = partial(_ASyncFutureWrappedFn, **kwargs) @@ -807,6 +898,31 @@ def __get__( @final class _ASyncFutureInstanceMethod(Generic[I, P, T]): # NOTE: probably could just replace this with functools.partial + """ + A class to handle instance methods wrapped as ASyncFuture. + """ + + __module__: str + """The module name of the wrapper.""" + + __name__: str + """The name of the wrapper.""" + + __qualname__: str + """The qualified name of the wrapper.""" + + __doc__: Optional[str] + """The docstring of the wrapper.""" + + __annotations__: Dict[str, Any] + """The annotations of the wrapper.""" + + __instance: I + """The instance to which the method is bound.""" + + __wrapper: _ASyncFutureWrappedFn[P, T] + """The wrapper function.""" + def __init__( self, wrapper: _ASyncFutureWrappedFn[P, T], @@ -838,7 +954,6 @@ def __init__( pass self.__instance = instance self.__wrapper = wrapper - def __repr__(self) -> str: return f"<{self.__class__.__name__} for {self.__wrapper.callable} bound to {self.__instance}>" @@ -846,4 +961,4 @@ def __call__(self, /, *fn_args: P.args, **fn_kwargs: P.kwargs) -> T: return self.__wrapper(self.__instance, *fn_args, **fn_kwargs) -__all__ = ["future", "ASyncFuture"] +__all__ = ["future", "ASyncFuture"] \ No newline at end of file diff --git a/a_sync/iter.py b/a_sync/iter.py index bf1e0c9a..741dbdee 100644 --- a/a_sync/iter.py +++ b/a_sync/iter.py @@ -30,17 +30,15 @@ 
class _AwaitableAsyncIterableMixin(AsyncIterable[T]): Example: You must subclass this mixin class and define your own `__aiter__` method as shown below. - ``` + >>> class MyAwaitableAIterable(_AwaitableAsyncIterableMixin): - ... def __aiter__(self): + ... async def __aiter__(self): ... for i in range(4): ... yield i - ... + >>> aiterable = MyAwaitableAIterable() >>> await aiterable - [0, 1, 2, 3, 4] - - ``` + [0, 1, 2, 3] """ __wrapped__: AsyncIterable[T] @@ -139,7 +137,12 @@ def wrap(cls, wrapped: AsyncIterable[T]) -> "ASyncIterable[T]": return cls(wrapped) def __init__(self, async_iterable: AsyncIterable[T]): - "Initializes the ASyncIterable with an async iterable." + """ + Initializes the ASyncIterable with an async iterable. + + Args: + async_iterable: The async iterable to wrap. + """ if not isinstance(async_iterable, AsyncIterable): raise TypeError( f"`async_iterable` must be an AsyncIterable. You passed {async_iterable}" @@ -197,11 +200,23 @@ def __next__(self) -> T: raise @overload - def wrap(cls, aiterator: AsyncIterator[T]) -> "ASyncIterator[T]": ... + def wrap(cls, aiterator: AsyncIterator[T]) -> "ASyncIterator[T]": + """ + Wraps an AsyncIterator in an ASyncIterator. + + Args: + aiterator: The AsyncIterator to wrap. + """ @overload def wrap( cls, async_gen_func: AsyncGenFunc[P, T] - ) -> "ASyncGeneratorFunction[P, T]": ... + ) -> "ASyncGeneratorFunction[P, T]": + """ + Wraps an async generator function in an ASyncGeneratorFunction. + + Args: + async_gen_func: The async generator function to wrap. + """ @classmethod def wrap(cls, wrapped): "Class method to wrap either an AsyncIterator or an async generator function." @@ -217,7 +232,12 @@ def wrap(cls, wrapped): ) def __init__(self, async_iterator: AsyncIterator[T]): - "Initializes the ASyncIterator with an async iterator." + """ + Initializes the ASyncIterator with an async iterator. + + Args: + async_iterator: The async iterator to wrap. 
+ """ if not isinstance(async_iterator, AsyncIterator): raise TypeError( f"`async_iterator` must be an AsyncIterator. You passed {async_iterator}" @@ -415,9 +435,6 @@ def _key_if_no_key(obj: T) -> T: Args: obj: The object to return. - - Returns: - The object itself. """ return obj diff --git a/a_sync/primitives/__init__.py b/a_sync/primitives/__init__.py index 58dae18a..76cb820e 100644 --- a/a_sync/primitives/__init__.py +++ b/a_sync/primitives/__init__.py @@ -1,5 +1,18 @@ """ -While not the focus of this lib, this module includes some new primitives and some modified versions of standard asyncio primitives. +This module includes both new primitives and modified versions of standard asyncio primitives. + +The primitives provided in this module are: + +- Semaphore +- ThreadsafeSemaphore +- PrioritySemaphore +- CounterLock +- Event +- Queue +- ProcessingQueue +- SmartProcessingQueue + +These primitives extend or modify the functionality of standard asyncio primitives to provide additional features or improved performance for specific use cases. """ from a_sync.primitives.locks import * @@ -14,4 +27,4 @@ "Queue", "ProcessingQueue", "SmartProcessingQueue", -] +] \ No newline at end of file diff --git a/a_sync/primitives/_debug.py b/a_sync/primitives/_debug.py index 64be655c..a899c210 100644 --- a/a_sync/primitives/_debug.py +++ b/a_sync/primitives/_debug.py @@ -41,7 +41,7 @@ def _start_debug_daemon(self, *args, **kwargs) -> "asyncio.Future[None]": **kwargs: Keyword arguments for the debug daemon. Returns: - The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created. + The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created. 
""" if self.debug_logs_enabled and asyncio.get_event_loop().is_running(): return asyncio.create_task(self._debug_daemon(*args, **kwargs)) diff --git a/a_sync/primitives/locks/counter.py b/a_sync/primitives/locks/counter.py index 8bb332b3..dc786492 100644 --- a/a_sync/primitives/locks/counter.py +++ b/a_sync/primitives/locks/counter.py @@ -1,7 +1,7 @@ """ This module provides two specialized async flow management classes, CounterLock and CounterLockCluster. -These primitives manages :class:`asyncio.Task` objects that must wait for an internal counter to reach a specific value. +These primitives manage :class:`asyncio.Task` objects that must wait for an internal counter to reach a specific value. """ import asyncio @@ -15,12 +15,12 @@ class CounterLock(_DebugDaemonMixin): """ - An async primitive that blocks until the internal counter has reached a specific value. + An async primitive that uses an internal counter to manage task synchronization. - A coroutine can `await counter.wait_for(3)` and it will block until the internal counter >= 3. - If some other task executes `counter.value = 5` or `counter.set(5)`, the first coroutine will unblock as 5 >= 3. + A coroutine can `await counter.wait_for(3)` and it will wait until the internal counter >= 3. + If some other task executes `counter.value = 5` or `counter.set(5)`, the first coroutine will proceed as 5 >= 3. - The internal counter can only increase. + The internal counter can only be set to a value greater than the current value. """ __slots__ = "is_ready", "_name", "_value", "_events" @@ -31,9 +31,8 @@ def __init__(self, start_value: int = 0, name: Optional[str] = None): Args: start_value: The initial value of the counter. - name (optional): An optional name for the counter, used in debug logs. + name: An optional name for the counter, used in debug logs. 
""" - self._name = name """An optional name for the counter, used in debug logs.""" @@ -52,9 +51,6 @@ async def wait_for(self, value: int) -> bool: Args: value: The value to wait for. - - Returns: - True when the counter reaches or exceeds the specified value. """ if not self.is_ready(value): self._ensure_debug_daemon() @@ -66,10 +62,10 @@ def set(self, value: int) -> None: Sets the counter to the specified value. Args: - value: The value to set the counter to. Must be >= the current value. + value: The value to set the counter to. Must be strictly greater than the current value. Raises: - ValueError: If the new value is less than the current value. + ValueError: If the new value is less than or equal to the current value. """ self.value = value @@ -81,9 +77,6 @@ def __repr__(self) -> str: def value(self) -> int: """ Gets the current value of the counter. - - Returns: - The current value of the counter. """ return self._value @@ -124,9 +117,9 @@ async def _debug_daemon(self) -> None: class CounterLockCluster: """ - An asyncio primitive that represents 2 or more CounterLock objects. + An asyncio primitive that represents a collection of CounterLock objects. - `wait_for(i)` will block until the value of all CounterLock objects is >= i. + `wait_for(i)` will wait until the value of all CounterLock objects is >= i. """ __slots__ = ("locks",) @@ -146,11 +139,8 @@ async def wait_for(self, value: int) -> bool: Args: value: The value to wait for. - - Returns: - True when the value of all CounterLock objects reach or exceed the specified value. 
""" await asyncio.gather( *[counter_lock.wait_for(value) for counter_lock in self.locks] ) - return True + return True \ No newline at end of file diff --git a/a_sync/primitives/locks/prio_semaphore.py b/a_sync/primitives/locks/prio_semaphore.py index d17fd01b..9c7b937c 100644 --- a/a_sync/primitives/locks/prio_semaphore.py +++ b/a_sync/primitives/locks/prio_semaphore.py @@ -1,3 +1,9 @@ +""" +This module provides priority-based semaphore implementations. These semaphores allow +waiters to be assigned priorities, ensuring that higher priority waiters are +processed before lower priority ones. +""" + import asyncio import heapq import logging @@ -20,6 +26,14 @@ def __lt__(self, other) -> bool: ... class _AbstractPrioritySemaphore(Semaphore, Generic[PT, CM]): + """ + A semaphore that allows prioritization of waiters. + + This semaphore manages waiters with associated priorities, ensuring that waiters with higher + priorities are processed before those with lower priorities. If no priority is specified, + the semaphore uses a default top priority. + """ + name: Optional[str] _value: int _waiters: List["_AbstractPrioritySemaphoreContextManager[PT]"] # type: ignore [assignment] @@ -45,28 +59,54 @@ def _top_priority(self) -> PT: raise NotImplementedError def __init__(self, value: int = 1, *, name: Optional[str] = None) -> None: + """Initializes the priority semaphore. + + Args: + value: The initial capacity of the semaphore. + name: An optional name for the semaphore, used for debugging. 
+ """ + self._context_managers = {} + """A dictionary mapping priorities to their context managers.""" + self._capacity = value + """The initial capacity of the semaphore.""" + super().__init__(value, name=name) self._waiters = [] + """A heap queue of context managers, sorted by priority.""" + # NOTE: This should (hopefully) be temporary self._potential_lost_waiters: List["asyncio.Future[None]"] = [] + """A list of futures representing waiters that might have been lost.""" def __repr__(self) -> str: + """Returns a string representation of the semaphore.""" return f"<{self.__class__.__name__} name={self.name} capacity={self._capacity} value={self._value} waiters={self._count_waiters()}>" async def __aenter__(self) -> None: + """Enters the semaphore context, acquiring it with the top priority.""" await self[self._top_priority].acquire() async def __aexit__(self, *_) -> None: + """Exits the semaphore context, releasing it with the top priority.""" self[self._top_priority].release() async def acquire(self) -> Literal[True]: + """Acquires the semaphore with the top priority.""" return await self[self._top_priority].acquire() def __getitem__( self, priority: Optional[PT] ) -> "_AbstractPrioritySemaphoreContextManager[PT]": + """Gets the context manager for a given priority. + + Args: + priority: The priority for which to get the context manager. If None, uses the top priority. + + Returns: + The context manager associated with the given priority. + """ priority = self._top_priority if priority is None else priority if priority not in self._context_managers: context_manager = self._context_manager_class( @@ -77,7 +117,11 @@ def __getitem__( return self._context_managers[priority] def locked(self) -> bool: - """Returns True if semaphore cannot be acquired immediately.""" + """Checks if the semaphore is locked. + + Returns: + True if the semaphore cannot be acquired immediately, False otherwise. 
+ """ return self._value == 0 or ( any( cm._waiters and any(not w.cancelled() for w in cm._waiters) @@ -86,12 +130,23 @@ def locked(self) -> bool: ) def _count_waiters(self) -> Dict[PT, int]: + """Counts the number of waiters for each priority. + + Returns: + A dictionary mapping each priority to the number of waiters. + """ return { manager._priority: len(manager.waiters) for manager in sorted(self._waiters, key=lambda m: m._priority) } def _wake_up_next(self) -> None: + """Wakes up the next waiter in line. + + This method handles the waking of waiters based on priority. It includes an emergency + procedure to handle potential lost waiters, ensuring that no waiter is left indefinitely + waiting. + """ while self._waiters: manager = heapq.heappop(self._waiters) if len(manager) == 0: @@ -148,6 +203,13 @@ def _wake_up_next(self) -> None: class _AbstractPrioritySemaphoreContextManager(Semaphore, Generic[PT]): + """ + A context manager for priority semaphore waiters. + + This context manager is associated with a specific priority and handles + the acquisition and release of the semaphore for waiters with that priority. + """ + _loop: asyncio.AbstractEventLoop _waiters: Deque[asyncio.Future] # type: ignore [assignment] __slots__ = "_parent", "_priority" @@ -162,36 +224,60 @@ def __init__( priority: PT, name: Optional[str] = None, ) -> None: + """Initializes the context manager for a specific priority. + + Args: + parent: The parent semaphore. + priority: The priority associated with this context manager. + name: An optional name for the context manager, used for debugging. 
+ """ + self._parent = parent + """The parent semaphore.""" + self._priority = priority + """The priority associated with this context manager.""" + super().__init__(0, name=name) def __repr__(self) -> str: + """Returns a string representation of the context manager.""" return f"<{self.__class__.__name__} parent={self._parent} {self._priority_name}={self._priority} waiters={len(self)}>" def _repr_no_parent_(self) -> str: + """Returns a string representation of the context manager without the parent.""" return f"<{self.__class__.__name__} parent_name={self._parent.name} {self._priority_name}={self._priority} waiters={len(self)}>" def __lt__(self, other) -> bool: + """Compares this context manager with another based on priority. + + Args: + other: The other context manager to compare with. + + Returns: + True if this context manager has a lower priority than the other, False otherwise. + """ if type(other) is not type(self): raise TypeError(f"{other} is not type {self.__class__.__name__}") return self._priority < other._priority @cached_property def loop(self) -> asyncio.AbstractEventLoop: + """Gets the event loop associated with this context manager.""" return self._loop or asyncio.get_event_loop() @property def waiters(self) -> Deque[asyncio.Future]: + """Gets the deque of waiters for this context manager.""" if self._waiters is None: self._waiters = deque() return self._waiters async def acquire(self) -> Literal[True]: - """Acquire a semaphore. + """Acquires the semaphore for this context manager. If the internal counter is larger than zero on entry, - decrement it by one and return True immediately. If it is + decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True. 
@@ -214,34 +300,31 @@ async def acquire(self) -> Literal[True]: return True def release(self) -> None: + """Releases the semaphore for this context manager.""" self._parent.release() - -class _PrioritySemaphoreContextManager( - _AbstractPrioritySemaphoreContextManager[Numeric] -): +class _PrioritySemaphoreContextManager(_AbstractPrioritySemaphoreContextManager[Numeric]): + """Context manager for numeric priority semaphores.""" _priority_name = "priority" class PrioritySemaphore(_AbstractPrioritySemaphore[Numeric, _PrioritySemaphoreContextManager]): # type: ignore [type-var] - _context_manager_class = _PrioritySemaphoreContextManager - _top_priority = -1 - """ - It's kinda like a regular Semaphore but you must give each waiter a priority: + """Semaphore that uses numeric priorities for waiters. - ``` - priority_semaphore = PrioritySemaphore(10) + It's similar to a regular Semaphore but requires each waiter to have a priority: - async with priority_semaphore[priority]: - await do_stuff() - ``` + Examples: + The primary way to use this semaphore is by specifying a priority. + + >>> priority_semaphore = PrioritySemaphore(10) + >>> async with priority_semaphore[priority]: + ... await do_stuff() - You can aenter and aexit this semaphore without a priority and it will process those first. Like so: - - ``` - priority_semaphore = PrioritySemaphore(10) - - async with priority_semaphore: - await do_stuff() - ``` + You can also enter and exit this semaphore without specifying a priority, and it will use the top priority by default: + + >>> priority_semaphore = PrioritySemaphore(10) + >>> async with priority_semaphore: + ... 
await do_stuff() """ + _context_manager_class = _PrioritySemaphoreContextManager + _top_priority = -1 \ No newline at end of file diff --git a/a_sync/primitives/locks/semaphore.py b/a_sync/primitives/locks/semaphore.py index d7bac45c..2946b908 100644 --- a/a_sync/primitives/locks/semaphore.py +++ b/a_sync/primitives/locks/semaphore.py @@ -1,3 +1,8 @@ +""" +This module provides various semaphore implementations, including a debug-enabled semaphore, +a dummy semaphore that does nothing, and a threadsafe semaphore for use in multi-threaded applications. +""" + import asyncio import functools import logging @@ -15,9 +20,10 @@ class Semaphore(asyncio.Semaphore, _DebugDaemonMixin): """ A semaphore with additional debugging capabilities. - This semaphore includes debug logging. + This semaphore includes debug logging and can be used to decorate coroutine functions. + It allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator. - Also, it can be used to decorate coroutine functions so you can rewrite this pattern: + So you can write this pattern: ``` semaphore = Semaphore(5) @@ -56,30 +62,18 @@ def __init__(self, value: int, name=None, **kwargs) -> None: self.name = name or self.__origin__ if hasattr(self, "__origin__") else None self._decorated: Set[str] = set() - # Dank new functionality - def __call__(self, fn: CoroFn[P, T]) -> CoroFn[P, T]: """ - Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern: - - ``` - semaphore = Semaphore(5) - - async def limited(): - async with semaphore: - return 1 - - ``` + Decorator method to wrap coroutine functions with the semaphore. - like this: + This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator. 
- ``` - semaphore = Semaphore(5) + Example: + semaphore = Semaphore(5) - @semaphore - async def limited(): - return 1 - ``` + @semaphore + async def limited(): + return 1 """ return self.decorate(fn) # type: ignore [arg-type, return-value] @@ -97,26 +91,11 @@ def decorate(self, fn: CoroFn[P, T]) -> CoroFn[P, T]: Wrap a coroutine function to ensure it runs with the semaphore. Example: - Now you can rewrite this pattern: - - ``` - semaphore = Semaphore(5) - - async def limited(): - async with semaphore: - return 1 - - ``` - - like this: - - ``` semaphore = Semaphore(5) @semaphore async def limited(): return 1 - ``` """ if not asyncio.iscoroutinefunction(fn): raise TypeError(f"{fn} must be a coroutine function") @@ -134,7 +113,6 @@ async def acquire(self) -> Literal[True]: self._ensure_debug_daemon() return await super().acquire() - # Everything below just adds some debug logs async def _debug_daemon(self) -> None: """ Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters. @@ -149,11 +127,19 @@ async def _debug_daemon(self) -> None: class DummySemaphore(asyncio.Semaphore): """ A dummy semaphore that implements the standard :class:`asyncio.Semaphore` API but does nothing. + + This class is useful for scenarios where a semaphore interface is required but no actual synchronization is needed. """ __slots__ = "name", "_value" def __init__(self, name: Optional[str] = None): + """ + Initialize the dummy semaphore with an optional name. + + Args: + name (optional): An optional name for the dummy semaphore. + """ self.name = name self._value = 0 @@ -172,16 +158,22 @@ async def __aexit__(self, *args): ... class ThreadsafeSemaphore(Semaphore): """ - While its a bit weird to run multiple event loops, sometimes either you or a lib you're using must do so. - When in use in threaded applications, this semaphore will not work as intended but at least your program will function. 
- You may need to reduce the semaphore value for multi-threaded applications. + A semaphore that works in a multi-threaded environment. - # TL;DR it's a janky fix for an edge case problem and will otherwise function as a normal a_sync.Semaphore (which is just an asyncio.Semaphore with extra bells and whistles). + This semaphore ensures that the program functions correctly even when used with multiple event loops. + It provides a workaround for edge cases involving multiple threads and event loops. """ __slots__ = "semaphores", "dummy" def __init__(self, value: Optional[int], name: Optional[str] = None) -> None: + """ + Initialize the threadsafe semaphore with a given value and optional name. + + Args: + value: The initial value for the semaphore, should be an integer. + name (optional): An optional name for the semaphore. + """ assert isinstance(value, int), f"{value} should be an integer." super().__init__(value, name=name) self.semaphores: DefaultDict[Thread, Semaphore] = defaultdict(lambda: Semaphore(value, name=self.name)) # type: ignore [arg-type] @@ -192,6 +184,12 @@ def __len__(self) -> int: @functools.cached_property def use_dummy(self) -> bool: + """ + Determine whether to use a dummy semaphore. + + Returns: + True if the semaphore value is None, indicating the use of a dummy semaphore. + """ return self._value is None @property @@ -207,4 +205,4 @@ async def __aenter__(self): await self.semaphore.acquire() async def __aexit__(self, *args): - self.semaphore.release() + self.semaphore.release() \ No newline at end of file diff --git a/a_sync/sphinx/ext.py b/a_sync/sphinx/ext.py index ea5840e7..3de66bd1 100644 --- a/a_sync/sphinx/ext.py +++ b/a_sync/sphinx/ext.py @@ -49,22 +49,47 @@ class _ASyncWrapperDocumenter: + """Base class for documenters that handle wrapped ASync functions.""" + typ: type @classmethod def can_document_member(cls, member, membername, isattr, parent): + """Determine if the member can be documented by this documenter. 
+ + Args: + member: The member to check. + membername: The name of the member. + isattr: Boolean indicating if the member is an attribute. + parent: The parent object. + + Returns: + bool: True if the member can be documented, False otherwise. + """ return ( isinstance(member, cls.typ) and getattr(member, "__wrapped__") is not None ) def document_members(self, all_members=False): + """Document members of the object. + + Args: + all_members: Boolean indicating if all members should be documented. + """ pass def check_module(self): - # Normally checks if *self.object* is really defined in the module - # given by *self.modname*. But since functions decorated with the @task - # decorator are instances living in the celery.local, we have to check - # the wrapped function instead. + """Check if the object is defined in the expected module. + + Returns: + bool: True if the object is defined in the expected module, False otherwise. + + Note: + Normally checks if *self.object* is really defined in the module + given by *self.modname*. But since functions decorated with the @task + decorator are instances living in the celery.local, we have to check + the wrapped function instead. + """ wrapped = getattr(self.object, "__wrapped__", None) if wrapped and getattr(wrapped, "__module__") == self.modname: return True @@ -72,7 +97,14 @@ def check_module(self): class _ASyncFunctionDocumenter(_ASyncWrapperDocumenter, FunctionDocumenter): + """Documenter for ASyncFunction instances.""" + def format_args(self): + """Format the arguments of the wrapped function. + + Returns: + str: The formatted arguments. + """ wrapped = getattr(self.object, "__wrapped__", None) if wrapped is not None: sig = signature(wrapped) @@ -83,7 +115,14 @@ def format_args(self): class _ASyncMethodDocumenter(_ASyncWrapperDocumenter, MethodDocumenter): + """Documenter for ASyncMethod instances.""" + def format_args(self): + """Format the arguments of the wrapped method. + + Returns: + str: The formatted arguments. 
+ """ wrapped = getattr(self.object, "__wrapped__") if wrapped is not None: return str(signature(wrapped)) @@ -91,17 +130,29 @@ def format_args(self): class _ASyncDirective: + """Base class for ASync directives.""" + prefix_env: str def get_signature_prefix(self, sig): + """Get the signature prefix for the directive. + + Args: + sig: The signature to process. + + Returns: + list: A list of nodes representing the signature prefix. + """ return [nodes.Text(getattr(self.env.config, self.prefix_env))] class _ASyncFunctionDirective(_ASyncDirective, PyFunction): + """Directive for ASyncFunction instances.""" pass class _ASyncMethodDirective(_ASyncDirective, PyMethod): + """Directive for ASyncMethod instances.""" pass @@ -115,7 +166,7 @@ class ASyncFunctionDocumenter(_ASyncFunctionDocumenter): class ASyncFunctionSyncDocumenter(_ASyncFunctionDocumenter): - """Document ASyncFunction instance definitions.""" + """Document ASyncFunctionSyncDefault instance definitions.""" objtype = "a_sync_function_sync" typ = ASyncFunctionSyncDefault @@ -124,7 +175,7 @@ class ASyncFunctionSyncDocumenter(_ASyncFunctionDocumenter): class ASyncFunctionAsyncDocumenter(_ASyncFunctionDocumenter): - """Document ASyncFunction instance definitions.""" + """Document ASyncFunctionAsyncDefault instance definitions.""" objtype = "a_sync_function_async" typ = ASyncFunctionAsyncDefault @@ -133,14 +184,20 @@ class ASyncFunctionAsyncDocumenter(_ASyncFunctionDocumenter): class ASyncFunctionDirective(_ASyncFunctionDirective): + """Directive for ASyncFunction instances.""" + prefix_env = "a_sync_function_prefix" class ASyncFunctionSyncDirective(_ASyncFunctionDirective): + """Directive for ASyncFunctionSyncDefault instances.""" + prefix_env = "a_sync_function_sync_prefix" class ASyncFunctionAsyncDirective(_ASyncFunctionDirective): + """Directive for ASyncFunctionAsyncDefault instances.""" + prefix_env = "a_sync_function_async_prefix" @@ -153,13 +210,13 @@ class ASyncDescriptorDocumenter(_ASyncMethodDocumenter): 
class ASyncDescriptorDirective(_ASyncMethodDirective): - """Sphinx task directive.""" + """Directive for ASyncDescriptor instances.""" prefix_env = "a_sync_descriptor_prefix" class ASyncGeneratorFunctionDocumenter(_ASyncFunctionDocumenter): - """Document ASyncFunction instance definitions.""" + """Document ASyncGeneratorFunction instance definitions.""" objtype = "a_sync_generator_function" typ = ASyncGeneratorFunction @@ -167,13 +224,25 @@ class ASyncGeneratorFunctionDocumenter(_ASyncFunctionDocumenter): class ASyncGeneratorFunctionDirective(_ASyncFunctionDirective): - """Sphinx task directive.""" + """Directive for ASyncGeneratorFunction instances.""" prefix_env = "a_sync_generator_function_prefix" def autodoc_skip_member_handler(app, what, name, obj, skip, options): - """Handler for autodoc-skip-member event.""" + """Handler for autodoc-skip-member event. + + Args: + app: The Sphinx application object. + what: The type of the object being documented. + name: The name of the object. + obj: The object itself. + skip: Boolean indicating if the member should be skipped. + options: The options for the autodoc directive. + + Returns: + bool: True if the member should be skipped, False otherwise. + """ if isinstance( obj, (ASyncFunction, ASyncDescriptor, ASyncGeneratorFunction) ) and getattr(obj, "__wrapped__"): @@ -183,7 +252,14 @@ def autodoc_skip_member_handler(app, what, name, obj, skip, options): def setup(app): - """Setup Sphinx extension.""" + """Setup Sphinx extension. + + Args: + app: The Sphinx application object. + + Returns: + dict: A dictionary with metadata about the extension. 
+ """ app.setup_extension("sphinx.ext.autodoc") # function @@ -218,4 +294,4 @@ def setup(app): app.connect("autodoc-skip-member", autodoc_skip_member_handler) - return {"parallel_read_safe": True} + return {"parallel_read_safe": True} \ No newline at end of file diff --git a/a_sync/task.py b/a_sync/task.py index 51992239..8e91b91f 100644 --- a/a_sync/task.py +++ b/a_sync/task.py @@ -1,3 +1,12 @@ +""" +This module provides asynchronous task management utilities, specifically focused on creating and handling mappings of tasks. + +The main components include: +- TaskMapping: A class for managing and asynchronously generating tasks based on input iterables. +- TaskMappingKeys: A view to asynchronously iterate over the keys of a TaskMapping. +- TaskMappingValues: A view to asynchronously iterate over the values of a TaskMapping. +- TaskMappingItems: A view to asynchronously iterate over the items (key-value pairs) of a TaskMapping. +""" import asyncio import contextlib import functools @@ -96,7 +105,7 @@ def __init__( Initialize a TaskMapping instance. Args: - wrapped_func: A function that takes a key (and optional parameters) and returns an Awaitable. + wrapped_func: A callable that takes a key and additional parameters and returns an Awaitable. *iterables: Any number of iterables whose elements will be used as keys for task generation. name: An optional name for the tasks created by this mapping. concurrency: Maximum number of tasks to run concurrently. 
@@ -207,7 +216,7 @@ def __await__(self) -> Generator[Any, None, Dict[K, V]]: return self.gather(sync=False).__await__() async def __aiter__(self, pop: bool = False) -> AsyncIterator[Tuple[K, V]]: - """aiterate thru all key-task pairs, yielding the key-result pair as each task completes""" + """Asynchronously iterate through all key-task pairs, yielding the key-result pair as each task completes.""" self._if_pop_check_destroyed(pop) # if you inited the TaskMapping with some iterators, we will load those @@ -276,13 +285,13 @@ async def map( Asynchronously map iterables to tasks and yield their results. Args: - *iterables: Iterables to map over. - pop: Whether to remove tasks from the internal storage once they are completed. - yields: Whether to yield 'keys', 'values', or 'both' (key-value pairs). + *iterables: Iterables to map over. + pop: Whether to remove tasks from the internal storage once they are completed. + yields: Whether to yield 'keys', 'values', or 'both' (key-value pairs). Yields: - Depending on `yields`, either keys, values, - or tuples of key-value pairs representing the results of completed tasks. + Depending on `yields`, either keys, values, + or tuples of key-value pairs representing the results of completed tasks. """ self._if_pop_check_destroyed(pop) @@ -371,6 +380,7 @@ async def max(self, pop: bool = True) -> V: @ASyncMethodDescriptorSyncDefault async def min(self, pop: bool = True) -> V: + """Return the minimum result from the tasks in the mapping.""" min = None try: async for key, result in self.__aiter__(pop=pop): @@ -388,6 +398,7 @@ async def min(self, pop: bool = True) -> V: @ASyncMethodDescriptorSyncDefault async def sum(self, pop: bool = False) -> V: + """Return the sum of the results from the tasks in the mapping.""" retval = 0 try: async for key, result in self.__aiter__(pop=pop): @@ -438,21 +449,41 @@ async def gather( @overload def pop( - self, item: K, cancel: bool = False - ) -> "Union[asyncio.Task[V], asyncio.Future[V]]": ... 
+ self, item: K, *, cancel: bool = False + ) -> "Union[asyncio.Task[V], asyncio.Future[V]]": + """Pop a task from the TaskMapping. + + Args: + item: The key to pop. + cancel: Whether to cancel the task when popping it. + """ @overload def pop( - self, item: K, default: K, cancel: bool = False - ) -> "Union[asyncio.Task[V], asyncio.Future[V]]": ... + self, item: K, default: K, *, cancel: bool = False + ) -> "Union[asyncio.Task[V], asyncio.Future[V]]": + """Pop a task from the TaskMapping. + + Args: + item: The key to pop. + default: The default value to return if no matching key is found. + cancel: Whether to cancel the task when popping it. + """ def pop( self, *args: K, cancel: bool = False ) -> "Union[asyncio.Task[V], asyncio.Future[V]]": + """Pop a task from the TaskMapping. + + Args: + *args: One key to pop. + cancel: Whether to cancel the task when popping it. + """ fut_or_task = super().pop(*args) if cancel: fut_or_task.cancel() return fut_or_task def clear(self, cancel: bool = False) -> None: + """# TODO write docs for this """ if cancel and self._init_loader and not self._init_loader.done(): logger.debug("cancelling %s", self._init_loader) # temporary, remove later @@ -559,9 +590,11 @@ class _NoRunningLoop(Exception): ... @overload -def _yield(key: K, value: V, yields: Literal["keys"]) -> K: ... +def _yield(key: K, value: V, yields: Literal["keys"]) -> K: + # TODO write specific docs for this overload @overload -def _yield(key: K, value: V, yields: Literal["both"]) -> Tuple[K, V]: ... +def _yield(key: K, value: V, yields: Literal["both"]) -> Tuple[K, V]: + # TODO write specific docs for this overload def _yield(key: K, value: V, yields: Literal["keys", "both"]) -> Union[K, Tuple[K, V]]: """ Yield either the key, value, or both based on the 'yields' parameter. @@ -643,6 +676,9 @@ def _unwrap( class _TaskMappingView(ASyncGenericBase, Iterable[T], Generic[T, K, V]): + """ + Base class for TaskMapping views that provides common functionality. 
+ """ _get_from_item: Callable[[Tuple[K, V]], T] _pop: bool = False @@ -683,6 +719,9 @@ async def aiterbyvalues(self, reverse: bool = False) -> ASyncIterator[T]: class TaskMappingKeys(_TaskMappingView[K, K, V], Generic[K, V]): + """ + Asynchronous view to iterate over the keys of a TaskMapping. + """ _get_from_item = lambda self, item: _get_key(item) async def __aiter__(self) -> AsyncIterator[K]: @@ -740,6 +779,9 @@ async def __load_init_loader(self, yielded: Set[K]) -> AsyncIterator[K]: class TaskMappingItems(_TaskMappingView[Tuple[K, V], K, V], Generic[K, V]): + """ + Asynchronous view to iterate over the items (key-value pairs) of a TaskMapping. + """ _get_from_item = lambda self, item: item async def __aiter__(self) -> AsyncIterator[Tuple[K, V]]: @@ -755,6 +797,9 @@ async def __aiter__(self) -> AsyncIterator[Tuple[K, V]]: class TaskMappingValues(_TaskMappingView[V, K, V], Generic[K, V]): + """ + Asynchronous view to iterate over the values of a TaskMapping. + """ _get_from_item = lambda self, item: _get_value(item) async def __aiter__(self) -> AsyncIterator[V]: @@ -775,4 +820,4 @@ async def __aiter__(self) -> AsyncIterator[V]: "TaskMappingKeys", "TaskMappingValues", "TaskMappingItems", -] +] \ No newline at end of file diff --git a/a_sync/utils/__init__.py b/a_sync/utils/__init__.py index 8b07e0d4..a1c1fd01 100644 --- a/a_sync/utils/__init__.py +++ b/a_sync/utils/__init__.py @@ -1,3 +1,8 @@ +""" +This module initializes the utility functions for the a_sync library, including functions for handling asynchronous +iterators and implementing asynchronous versions of the built-in any and all functions. 
+""" + import asyncio from a_sync.utils.iterators import as_yielded, exhaust_iterator, exhaust_iterators diff --git a/a_sync/utils/iterators.py b/a_sync/utils/iterators.py index 93184310..8651812e 100644 --- a/a_sync/utils/iterators.py +++ b/a_sync/utils/iterators.py @@ -1,3 +1,9 @@ +""" +This module provides utility functions for handling and merging asynchronous iterators. It includes functions to +exhaust async iterators, merge multiple async iterators into a single async iterator, and manage the processing +flow of items in an asynchronous context. +""" + import asyncio import asyncio.futures import logging @@ -21,9 +27,6 @@ async def exhaust_iterator( Args: iterator (AsyncIterator[T]): The async iterator to exhaust. queue (Optional[asyncio.Queue]): An optional queue where iterated items will be placed. If None, items are simply consumed. - - Returns: - None """ async for thing in iterator: if queue: @@ -43,9 +46,6 @@ async def exhaust_iterators( iterators: A sequence of async iterators to be exhausted concurrently. queue (Optional[asyncio.Queue]): An optional queue where items from all iterators will be placed. If None, items are simply consumed. join (Optional[bool]): If a queue was provided and join is True, this coroutine will continue to run until all queue items have been processed. - - Returns: - None """ for x in await asyncio.gather( *[exhaust_iterator(iterator, queue=queue) for iterator in iterators], @@ -171,14 +171,13 @@ async def as_yielded(*iterators: AsyncIterator[T]) -> AsyncIterator[T]: # type: The merging process is facilitated by internally managing a queue where items from the source iterators are placed as they are fetched. This mechanism ensures that the merged stream of items is delivered in an order determined by the availability of items from the source iterators, rather than their original sequence. + The function handles exceptions and ensures robustness and reliability by using asyncio tasks and queues. 
It manages edge cases such as early termination and exception management. The `_Done` sentinel class is used internally to signal the completion of processing. + Args: *iterators: Variable length list of AsyncIterator objects to be merged. - Returns: - AsyncIterator[T]: An async iterator that yields items from the input async iterators as they become available. - Note: - This implementation leverages asyncio tasks and queues to efficiently manage the asynchronous iteration and merging process. It handles edge cases such as early termination and exception management, ensuring robustness and reliability. + This implementation leverages asyncio tasks and queues to efficiently manage the asynchronous iteration and merging process. It handles edge cases such as early termination and exception management, ensuring robustness and reliability. The `_Done` sentinel class is used internally to signal the completion of processing. """ # hypothesis idea: _Done should never be exposed to user, works for all desired input types queue: Queue[Union[T, _Done]] = Queue() @@ -231,11 +230,17 @@ class _Done: """ def __init__(self, exc: Optional[Exception] = None) -> None: + """Initializes the _Done sentinel. + + Args: + exc (Optional[Exception]): An optional exception to be associated with the completion. + """ self._exc = exc @property def _tb(self) -> TracebackType: + """Returns the traceback associated with the exception, if any.""" return self._exc.__traceback__ # type: ignore [union-attr] -__all__ = ["as_yielded", "exhaust_iterator", "exhaust_iterators"] +__all__ = ["as_yielded", "exhaust_iterator", "exhaust_iterators"] \ No newline at end of file diff --git a/tests/test_task.py b/tests/test_task.py index a35dfa49..6b9bd795 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -6,11 +6,25 @@ @pytest.mark.asyncio_cooperative async def test_create_task(): - await create_task(coro=asyncio.sleep(0), name="test") + """Test the creation of an asynchronous task. 
+ + Verifies that a task can be created using the `create_task` + function with a coroutine and a specified name. + """ + t = create_task(coro=asyncio.sleep(0), name="test") + assert t.get_name() == "test", t + await t @pytest.mark.asyncio_cooperative async def test_persistent_task(): + """Test the persistence of a task without a local reference. + + Checks if a task created without a local reference + completes successfully by setting a nonlocal variable. + The test ensures that the task completes by verifying + the change in the nonlocal variable. + """ check = False async def task(): @@ -26,6 +40,11 @@ async def task(): @pytest.mark.asyncio_cooperative async def test_pruning(): + """Test task creation and handling without errors. + + Ensures that tasks can be created without causing errors. + This test does not explicitly check for task pruning. + """ async def task(): return @@ -37,6 +56,12 @@ async def task(): @pytest.mark.asyncio_cooperative async def test_task_mapping_init(): + """Test initialization of TaskMapping. + + Verifies that the TaskMapping class initializes correctly + with the provided coroutine function and arguments. Checks + the handling of function arguments and the task name. + """ tasks = TaskMapping(_coro_fn) assert ( tasks._wrapped_func is _coro_fn @@ -50,6 +75,12 @@ async def test_task_mapping_init(): @pytest.mark.asyncio_cooperative async def test_task_mapping(): + """Test the functionality of TaskMapping. + + Checks the behavior of TaskMapping, including task + creation, retrieval, and execution. Verifies the ability + to await the mapping and checks the return values of tasks. + """ tasks = TaskMapping(_coro_fn) # does it return the correct type assert isinstance(tasks[0], asyncio.Task) @@ -74,6 +105,13 @@ async def test_task_mapping(): @pytest.mark.asyncio_cooperative async def test_task_mapping_map_with_sync_iter(): + """Test TaskMapping with a synchronous iterator. 
+ + Verifies that TaskMapping can map over a synchronous + iterator and correctly handle keys, values, and items. + Ensures that mapping in progress raises a RuntimeError + when attempted concurrently. + """ tasks = TaskMapping(_coro_fn) i = 0 async for k, v in tasks.map(range(5)): @@ -127,6 +165,13 @@ async def test_task_mapping_map_with_sync_iter(): @pytest.mark.asyncio_cooperative async def test_task_mapping_map_with_async_iter(): + """Test TaskMapping with an asynchronous iterator. + + Verifies that TaskMapping can map over an asynchronous + iterator and correctly handle keys, values, and items. + Ensures that mapping in progress raises a RuntimeError + when attempted concurrently. + """ async def async_iter(): for i in range(5): yield i @@ -212,6 +257,12 @@ async def async_iter(): def test_taskmapping_views_sync(): + """Test synchronous views of TaskMapping. + + Checks the synchronous access to keys, values, and items + in TaskMapping. Verifies the state of these views before + and after gathering tasks. + """ tasks = TaskMapping(_coro_fn, range(5)) # keys are currently empty until the loop has a chance to run @@ -243,5 +294,13 @@ def test_taskmapping_views_sync(): async def _coro_fn(i: int) -> str: + """Coroutine function for testing. + + Args: + i: An integer input. + + Returns: + A string representation of the incremented input. + """ i += 1 - return str(i) * i + return str(i) * i \ No newline at end of file