From 3b1c82eb7d3a637652c01fcabfb0f19dd1d5b2e2 Mon Sep 17 00:00:00 2001 From: BobTheBuidler <70677534+BobTheBuidler@users.noreply.github.com> Date: Wed, 1 Mar 2023 02:42:39 -0500 Subject: [PATCH] Big sloppy commit - some feats, some fixes, some refactorings (#17) * feat: wrap sync and async bound methods on ASyncABC subclasses, allow modifier overriding * feat: lru cache * feat: env vars to set system wide defaults * feat: improve type hint accuracy * fix: read modifiers from class def * fix: semaphore * fix: missing async_lru dep * fix: default as first arg for property and cached property * chore: rename Modifiers -> ModifierManager and cleanup imports --- a_sync/_bound.py | 37 ++-- a_sync/_helpers.py | 21 +-- a_sync/_meta.py | 40 ++-- a_sync/_typing.py | 24 +++ a_sync/abstract.py | 5 +- a_sync/config.py | 55 +++++- a_sync/decorator.py | 172 ++++-------------- a_sync/modified.py | 75 ++++++++ a_sync/modifiers.py | 11 -- a_sync/modifiers/__init__.py | 19 ++ a_sync/modifiers/cache/__init__.py | 62 +++++++ a_sync/modifiers/cache/memory.py | 67 +++++++ .../limiter.py} | 20 +- a_sync/modifiers/manager.py | 72 ++++++++ a_sync/property.py | 67 ++++--- a_sync/semaphores.py | 11 +- requirements.txt | 1 + 17 files changed, 500 insertions(+), 259 deletions(-) create mode 100644 a_sync/_typing.py create mode 100644 a_sync/modified.py delete mode 100644 a_sync/modifiers.py create mode 100644 a_sync/modifiers/__init__.py create mode 100644 a_sync/modifiers/cache/__init__.py create mode 100644 a_sync/modifiers/cache/memory.py rename a_sync/{rate_limiting.py => modifiers/limiter.py} (78%) create mode 100644 a_sync/modifiers/manager.py diff --git a/a_sync/_bound.py b/a_sync/_bound.py index 1cfed591..c9096c4a 100644 --- a/a_sync/_bound.py +++ b/a_sync/_bound.py @@ -1,21 +1,16 @@ import functools from inspect import isawaitable -from typing import Awaitable, Callable, TypeVar, overload - -from typing_extensions import ParamSpec # type: ignore [attr-defined] +from typing import Awaitable, Callable, overload from a_sync import _helpers +from a_sync._typing import * from a_sync.decorator import a_sync as unbound_a_sync -from a_sync.modifiers import Modifiers from a_sync.property import (AsyncCachedPropertyDescriptor, AsyncPropertyDescriptor) -P = ParamSpec("P") -T = TypeVar("T") - -def _wrap_bound_method(coro_fn: Callable[P, T], **modifiers: Modifiers) -> Callable[P, T]: # type: ignore [misc] +def _wrap_bound_method(coro_fn: Callable[P, T], **modifiers: ModifierKwargs) -> Callable[P, T]: # type: ignore [misc] from a_sync.abstract import ASyncABC # First we wrap the coro_fn so overriding kwargs are handled automagically. @@ -27,23 +22,23 @@ def bound_a_sync_wrap(self, *args: P.args, **kwargs: P.kwargs) -> T: # type: ig raise RuntimeError(f"{self} must be an instance of a class that inherits from ASyncABC.") # This could either be a coroutine or a return value from an awaited coroutine, # depending on if an overriding kwarg was passed into the function call. - retval_or_coro = wrapped_coro_fn(self, *args, **kwargs) - if not isawaitable(retval_or_coro): + retval = coro = wrapped_coro_fn(self, *args, **kwargs) + if not isawaitable(retval): # The coroutine was already awaited due to the use of an overriding kwarg. # We can return the value. - return retval_or_coro + return retval # The awaitable was not awaited, so now we need to check the flag as defined on 'self' and await if appropriate. - return _helpers._await_if_sync(retval_or_coro, self.__a_sync_should_await__(kwargs)) # type: ignore [call-overload] + return _helpers._await(coro) if self.__a_sync_should_await__(kwargs) else coro # type: ignore [call-overload] return bound_a_sync_wrap @overload -def _wrap_property(async_property: AsyncPropertyDescriptor, **modifiers: Modifiers) -> AsyncPropertyDescriptor: +def _wrap_property(async_property: AsyncPropertyDescriptor, **modifiers: ModifierKwargs) -> AsyncPropertyDescriptor: ... @overload -def _wrap_property(async_property: AsyncCachedPropertyDescriptor, **modifiers: Modifiers) -> AsyncCachedPropertyDescriptor: # type: ignore [misc] +def _wrap_property(async_property: AsyncCachedPropertyDescriptor, **modifiers: ModifierKwargs) -> AsyncCachedPropertyDescriptor: # type: ignore [misc] ... -def _wrap_property(async_property, **modifiers: Modifiers) -> tuple: +def _wrap_property(async_property, **modifiers: ModifierKwargs) -> tuple: if not isinstance(async_property, (AsyncPropertyDescriptor, AsyncCachedPropertyDescriptor)): raise TypeError(f"{async_property} must be one of: AsyncPropertyDescriptor, AsyncCachedPropertyDescriptor") @@ -52,19 +47,19 @@ def _wrap_property(async_property, **modifiers: Modifiers) -> tuple: async_property.hidden_method_name = f"__{async_property.field_name}__" @unbound_a_sync(**modifiers) - async def awaitable(instance: object) -> Awaitable[T]: + async def awaitable(instance: ASyncABC) -> Awaitable[T]: return await async_property.__get__(instance, async_property) @functools.wraps(async_property) - def a_sync_method(self, **kwargs): + def a_sync_method(self: ASyncABC, **kwargs) -> T: if not isinstance(self, ASyncABC): raise RuntimeError(f"{self} must be an instance of a class that inherits from ASyncABC.") - return _helpers._await_if_sync(awaitable(self), self.__a_sync_should_await__(kwargs)) + return _helpers._await(awaitable(self)) if self.__a_sync_should_await__(kwargs) else awaitable(self) @property # type: ignore [misc] @functools.wraps(async_property) - def a_sync_property(self) -> T: - a_sync_method = getattr(self, async_property.hidden_method_name) - return _helpers._await_if_sync(a_sync_method(sync=False), self.__a_sync_should_await__({})) + def a_sync_property(self: ASyncABC) -> T: + coro = getattr(self, async_property.hidden_method_name)(sync=False) + return _helpers._await(coro) if self.__a_sync_should_await__({}) else coro return a_sync_property, a_sync_method diff --git a/a_sync/_helpers.py b/a_sync/_helpers.py index bfd6aac8..dc208078 100644 --- a/a_sync/_helpers.py +++ b/a_sync/_helpers.py @@ -1,14 +1,13 @@ import asyncio from inspect import getfullargspec -from typing import Awaitable, Callable, Literal, TypeVar, Union, overload +from typing import Awaitable, Callable from async_property.base import AsyncPropertyDescriptor from async_property.cached import AsyncCachedPropertyDescriptor from a_sync import _flags - -T = TypeVar("T") +from a_sync._typing import T def _validate_wrapped_fn(fn: Callable) -> None: @@ -22,21 +21,7 @@ def _validate_wrapped_fn(fn: Callable) -> None: running_event_loop_msg = f"You may want to make this an async function by setting one of the following kwargs: {_flags.VIABLE_FLAGS}" -@overload -def _await_if_sync(awaitable: Awaitable[T], sync: Literal[True]) -> T:... - -@overload -def _await_if_sync(awaitable: Awaitable[T], sync: Literal[False]) -> Awaitable[T]:... - -def _await_if_sync(awaitable: Awaitable[T], sync: bool) -> Union[T, Awaitable[T]]: - """ - If 'sync' is True, awaits the awaitable and returns the return value. - If 'sync' is False, simply returns the awaitable. - """ - return _sync(awaitable) if sync else awaitable - - -def _sync(awaitable: Awaitable[T]) -> T: +def _await(awaitable: Awaitable[T]) -> T: try: return asyncio.get_event_loop().run_until_complete(awaitable) except RuntimeError as e: diff --git a/a_sync/_meta.py b/a_sync/_meta.py index 1c26997f..47035a00 100644 --- a/a_sync/_meta.py +++ b/a_sync/_meta.py @@ -1,35 +1,37 @@ import threading from abc import ABCMeta -from asyncio import iscoroutinefunction from typing import Any, Dict, Tuple -from a_sync import _bound -from a_sync.property import (AsyncCachedPropertyDescriptor, - AsyncPropertyDescriptor) +from a_sync import _bound, modifiers +from a_sync.modified import ASyncFunction, Modified +from a_sync.property import PropertyDescriptor class ASyncMeta(ABCMeta): """Any class with metaclass ASyncMeta will have its functions wrapped with a_sync upon class instantiation.""" def __new__(cls, name, bases, attrs): - # Wrap all methods with a_sync abilities for attr_name, attr_value in list(attrs.items()): - - # Read modifiers from class definition - fn_modifiers = cls.__a_sync_modifiers__ if hasattr(cls, '__a_sync_modifiers__') else {} - - # Special handling for functions decorated with async_property and async_cached_property - if isinstance(attr_value, (AsyncPropertyDescriptor, AsyncCachedPropertyDescriptor)): - # Check for modifier overrides defined at the property decorator - fn_modifiers.update(attr_value.modifiers) - # Wrap property - attrs[attr_name], attrs[attr_value.hidden_method_name] = _bound._wrap_property(attr_value, **fn_modifiers) - elif iscoroutinefunction(attr_value): + # Read modifiers from class definition + # NOTE: Open uesion: what do we do when a parent class and subclass define the same modifier differently? + # Currently the parent value is used for functions defined on the parent, + # and the subclass value is used for functions defined on the subclass. + fn_modifiers = modifiers.get_modifiers_from(attrs) + # Special handling for functions decorated with a_sync decorators + if isinstance(attr_value, Modified): + # Check for modifier overrides defined on the Modified object + fn_modifiers.update(attr_value.modifiers._modifiers) + if isinstance(attr_value, PropertyDescriptor): + # Wrap property + attrs[attr_name], attrs[attr_value.hidden_method_name] = _bound._wrap_property(attr_value, **fn_modifiers) + elif isinstance(attr_value, ASyncFunction): + attrs[attr_name] = _bound._wrap_bound_method(attr_value, **fn_modifiers) + else: + raise NotImplementedError(attr_name, attr_value) + + if callable(attr_value) and "__" not in attr_name and not attr_name.startswith("_"): # NOTE We will need to improve this logic if somebody needs to use it with classmethods or staticmethods. - # TODO: update modifiers with override decorators (or maybe the main deco?) - # modifiers.update(overrides) - # Wrap bound method attrs[attr_name] = _bound._wrap_bound_method(attr_value, **fn_modifiers) return super(ASyncMeta, cls).__new__(cls, name, bases, attrs) diff --git a/a_sync/_typing.py b/a_sync/_typing.py new file mode 100644 index 00000000..2fed6857 --- /dev/null +++ b/a_sync/_typing.py @@ -0,0 +1,24 @@ + +import asyncio +from concurrent.futures._base import Executor +from typing import (Awaitable, Callable, Generic, Literal, Optional, TypedDict, + TypeVar, Union, overload) + +from typing_extensions import ParamSpec, Unpack + +T = TypeVar("T") +P = ParamSpec("P") + +class ModifierKwargs(TypedDict, total=False): + default: Literal['sync', 'async', None] + cache_type: Literal['memory', None] + cache_typed: bool + ram_cache_maxsize: Optional[int] + ram_cache_ttl: Optional[int] + runs_per_minute: int + semaphore: Union[int, asyncio.Semaphore] + # sync modifiers + executor: Executor + +class Modified(Generic[T]): + pass diff --git a/a_sync/abstract.py b/a_sync/abstract.py index 68821822..187ffd76 100644 --- a/a_sync/abstract.py +++ b/a_sync/abstract.py @@ -4,6 +4,7 @@ from a_sync import _flags, _kwargs, exceptions, modifiers from a_sync._meta import ASyncMeta +from a_sync._typing import * class ASyncABC(metaclass=ASyncMeta): @@ -43,9 +44,9 @@ def __a_sync_instance_will_be_sync__(cls, kwargs: dict) -> bool: ###################################### @property - def __a_sync_modifiers__(thing: Union[type, object]) -> modifiers.Modifiers: + def __a_sync_modifiers__(self: "ASyncABC") -> ModifierKwargs: """You should not override this.""" - return modifiers.Modifiers({modifier: getattr(thing, modifier) for modifier in modifiers.valid_modifiers if hasattr(thing, modifier)}) + return modifiers.get_modifiers_from(self) #################### # Abstract Methods # diff --git a/a_sync/config.py b/a_sync/config.py index 2791946a..e5b5ef93 100644 --- a/a_sync/config.py +++ b/a_sync/config.py @@ -1,15 +1,56 @@ +import functools import os -from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from concurrent.futures._base import Executor +from a_sync._typing import * +from a_sync.modifiers import ModifierManager + EXECUTOR_TYPE = os.environ.get("A_SYNC_EXECUTOR_TYPE", "threads") EXECUTOR_VALUE = int(os.environ.get("A_SYNC_EXECUTOR_VALUE", 8)) -default_sync_executor: Executor -if EXECUTOR_TYPE.lower().startswith('p'): # p, P, proc, Processes, etc - default_sync_executor = ProcessPoolExecutor(EXECUTOR_VALUE) -elif EXECUTOR_TYPE.lower().startswith('t'): # t, T, thread, THREADS, etc - default_sync_executor = ThreadPoolExecutor(EXECUTOR_VALUE) -else: +@functools.lru_cache(maxsize=1) +def get_default_executor() -> Executor: + if EXECUTOR_TYPE.lower().startswith('p'): # p, P, proc, Processes, etc + return ProcessPoolExecutor(EXECUTOR_VALUE) + elif EXECUTOR_TYPE.lower().startswith('t'): # t, T, thread, THREADS, etc + return ThreadPoolExecutor(EXECUTOR_VALUE) raise ValueError("Invalid value for A_SYNC_EXECUTOR_TYPE. Please use 'threads' or 'processes'.") + +default_sync_executor = get_default_executor() + +null_modifiers = ModifierManager( + ModifierKwargs( + default=None, + cache_type=None, + cache_typed=False, + ram_cache_maxsize=-1, + ram_cache_ttl=None, + runs_per_minute=None, + semaphore=None, + executor=default_sync_executor, + ) +) + +DEFAULT_MODE = os.environ.get("A_SYNC_DEFAULT_MODE") +CACHE_TYPE = typ if (typ := os.environ.get("A_SYNC_CACHE_TYPE", "").lower()) else null_modifiers.cache_type +CACHE_TYPED = bool(os.environ.get("A_SYNC_CACHE_TYPED")) +RAM_CACHE_MAXSIZE = int(os.environ.get("A_SYNC_RAM_CACHE_MAXSIZE", -1)) +RAM_CACHE_TTL = ttl if (ttl := float(os.environ.get("A_SYNC_RAM_CACHE_TTL", 0))) else null_modifiers.ram_cache_ttl + +RUNS_PER_MINUTE = rpm if (rpm := int(os.environ.get("A_SYNC_RUNS_PER_MINUTE", 0))) else null_modifiers.runs_per_minute +SEMAPHORE = rpm if (rpm := int(os.environ.get("A_SYNC_SEMAPHORE", 0))) else null_modifiers.semaphore + +default_modifiers = ModifierManager( + ModifierKwargs( + default=DEFAULT_MODE, + cache_type=CACHE_TYPE, + cache_typed=CACHE_TYPED, + ram_cache_maxsize=RAM_CACHE_MAXSIZE, + ram_cache_ttl=RAM_CACHE_TTL, + runs_per_minute=RUNS_PER_MINUTE, + semaphore=SEMAPHORE, + executor=default_sync_executor, + ) +) diff --git a/a_sync/decorator.py b/a_sync/decorator.py index 8e09e19b..2b92df98 100644 --- a/a_sync/decorator.py +++ b/a_sync/decorator.py @@ -1,17 +1,12 @@ -import asyncio -import functools -from concurrent.futures._base import Executor -from typing import (Awaitable, Callable, Literal, Optional, TypeVar, Union, - overload) +from typing import Awaitable, Callable, Literal, Optional, Union, overload -from typing_extensions import ParamSpec # type: ignore [attr-defined] +from typing_extensions import Unpack # type: ignore [attr-defined] -from a_sync import (_flags, _helpers, _kwargs, config, exceptions, - rate_limiting, semaphores) - -P = ParamSpec("P") -T = TypeVar("T") +from a_sync import _flags, config +from a_sync._typing import ModifierKwargs, P, T +from a_sync.modified import ASyncDecorator +from a_sync.modifiers import ModifierManager ######################## # The a_sync decorator # @@ -29,22 +24,14 @@ def a_sync( coro_fn: Callable[P, Awaitable[T]] = None, # type: ignore [misc] default: Literal[None] = None, - # async settings - runs_per_minute: Optional[int] = None, - semaphore: semaphores.SemaphoreSpec = semaphores.dummy_semaphore, - # sync settings - executor: Executor = config.default_sync_executor, + **modifiers: Unpack[ModifierKwargs], ) -> Callable[P, Awaitable[T]]:... # type: ignore [misc] @overload # sync def none default def a_sync( # type: ignore [misc] coro_fn: Callable[P, T] = None, # type: ignore [misc] default: Literal[None] = None, - # async settings - runs_per_minute: Optional[int] = None, - semaphore: semaphores.SemaphoreSpec = semaphores.dummy_semaphore, - # sync settings - executor: Executor = config.default_sync_executor, + **modifiers: Unpack[ModifierKwargs], ) -> Callable[P, T]:... # type: ignore [misc] # @a_sync(default='async') @@ -61,22 +48,14 @@ def a_sync( # type: ignore [misc] def a_sync( coro_fn: Literal[None] = None, default: Literal['async'] = None, - # async settings - runs_per_minute: Optional[int] = None, - semaphore: semaphores.SemaphoreSpec = semaphores.dummy_semaphore, - # sync settings - executor: Executor = config.default_sync_executor, + **modifiers: Unpack[ModifierKwargs], ) -> Callable[[Union[Callable[P, Awaitable[T]], Callable[P, T]]], Callable[P, Awaitable[T]]]:... # type: ignore [misc] @overload # if you try to use default as the only arg def a_sync( coro_fn: Literal['async'] = None, default: Literal[None] = None, - # async settings - runs_per_minute: Optional[int] = None, - semaphore: semaphores.SemaphoreSpec = semaphores.dummy_semaphore, - # sync settings - executor: Executor = config.default_sync_executor, + **modifiers: Unpack[ModifierKwargs], ) -> Callable[[Union[Callable[P, Awaitable[T]], Callable[P, T]]], Callable[P, Awaitable[T]]]:... # type: ignore [misc] # a_sync(some_fn, default='async') @@ -85,22 +64,14 @@ def a_sync( def a_sync( coro_fn: Callable[P, Awaitable[T]] = None, # type: ignore [misc] default: Literal['async'] = None, - # async settings - runs_per_minute: Optional[int] = None, - semaphore: semaphores.SemaphoreSpec = semaphores.dummy_semaphore, - # sync settings - executor: Executor = config.default_sync_executor, + **modifiers: Unpack[ModifierKwargs], ) -> Callable[P, Awaitable[T]]:... # type: ignore [misc] @overload # sync def async default def a_sync( # type: ignore [misc] coro_fn: Callable[P, T] = None, # type: ignore [misc] default: Literal['async'] = None, - # async settings - runs_per_minute: Optional[int] = None, - semaphore: semaphores.SemaphoreSpec = semaphores.dummy_semaphore, - # sync settings - executor: Executor = config.default_sync_executor, + **modifiers: Unpack[ModifierKwargs], ) -> Callable[P, Awaitable[T]]:... # type: ignore [misc] # a_sync(some_fn, default='sync') @@ -109,22 +80,14 @@ def a_sync( # type: ignore [misc] def a_sync( coro_fn: Callable[P, Awaitable[T]] = None, # type: ignore [misc] default: Literal['sync'] = None, - # async settings - runs_per_minute: Optional[int] = None, - semaphore: semaphores.SemaphoreSpec = semaphores.dummy_semaphore, - # sync settings - executor: Executor = config.default_sync_executor, + **modifiers: Unpack[ModifierKwargs], ) -> Callable[P, T]:... # type: ignore [misc] @overload # sync def async default def a_sync( # type: ignore [misc] coro_fn: Callable[P, T] = None, # type: ignore [misc] default: Literal['sync'] = None, - # async settings - runs_per_minute: Optional[int] = None, - semaphore: semaphores.SemaphoreSpec = semaphores.dummy_semaphore, - # sync settings - executor: Executor = config.default_sync_executor, + **modifiers: Unpack[ModifierKwargs], ) -> Callable[P, T]:... # type: ignore [misc] # @a_sync(default='sync') @@ -141,33 +104,34 @@ def a_sync( # type: ignore [misc] def a_sync( # type: ignore [misc] coro_fn: Literal[None] = None, default: Literal['sync'] = None, - # async settings - runs_per_minute: Optional[int] = None, - semaphore: semaphores.SemaphoreSpec = semaphores.dummy_semaphore, - # sync settings - executor: Executor = config.default_sync_executor, + **modifiers: Unpack[ModifierKwargs], ) -> Callable[[Union[Callable[P, Awaitable[T]], Callable[P, T]]], Callable[P, T]]:... # type: ignore [misc] @overload # if you try to use default as the only arg def a_sync( coro_fn: Literal['sync'] = None, default: Literal[None] = None, - # async settings - runs_per_minute: Optional[int] = None, - semaphore: semaphores.SemaphoreSpec = semaphores.dummy_semaphore, - # sync settings - executor: Executor = config.default_sync_executor, + **modifiers: Unpack[ModifierKwargs], ) -> Callable[[Union[Callable[P, Awaitable[T]], Callable[P, T]]], Callable[P, T]]:... # type: ignore [misc] +''' +lib defaults: +async settings + cache_type: Optional[Literal['memory']] = None, + cache_typed: bool = False, + ram_cache_maxsize: Optional[int] = -1, + ram_cache_ttl: Optional[int] = None, + runs_per_minute: Optional[int] = None, + semaphore: semaphores.SemaphoreSpec = semaphores.dummy_semaphore, +sync settings + executor: Executor = config.default_sync_executor +''' + # catchall def a_sync( coro_fn: Optional[Union[Callable[P, Awaitable[T]], Callable[P, T]]] = None, # type: ignore [misc] - default: Literal['sync', 'async', None] = None, - # async settings - runs_per_minute: Optional[int] = None, - semaphore: semaphores.SemaphoreSpec = semaphores.dummy_semaphore, - # sync settings - executor: Executor = config.default_sync_executor, + default: Literal['sync', 'async', None] = config.DEFAULT_MODE, + **modifiers: Unpack[ModifierKwargs], # default values are set by passing these kwargs into a ModifierManager object. ) -> Union[ # type: ignore [misc] # sync coro_fn, default=None Callable[P, T], @@ -254,6 +218,7 @@ def some_fn(): some_fn() == True await some_fn(sync=False) == True """ + modifiers: ModifierManager = ModifierManager(modifiers) # If the dev tried passing a default as an arg instead of a kwarg, ie: @a_sync('sync')... if coro_fn in ['async', 'sync']: @@ -261,78 +226,9 @@ def some_fn(): coro_fn = None if default not in ['async', 'sync', None]: - raise ValueError(f"'default' must be either 'sync', 'async', or None. You passed {default}.") - - # Modifiers - Additional functionality will be added by stacking decorators here. - - def apply_async_modifiers(coro_fn: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]: - @functools.wraps(coro_fn) - @semaphores.apply_semaphore(semaphore) - @rate_limiting.apply_rate_limit(runs_per_minute) - async def async_modifier_wrap(*args: P.args, **kwargs: P.kwargs) -> T: - return await coro_fn(*args, **kwargs) - return async_modifier_wrap - - def apply_sync_modifiers(function: Callable[P, T]) -> Callable[P, T]: - @functools.wraps(function) - # NOTE There are no sync modifiers at this time but they will be added here for my convenience. - def sync_modifier_wrap(*args: P.args, **kwargs: P.kwargs) -> T: - return function(*args, **kwargs) - return sync_modifier_wrap - + raise # Decorator - def a_sync_deco(function: Callable[P, Awaitable[T]]) -> Union[Callable[P, Awaitable[T]], Callable[P, T]]: # type: ignore [misc] - - # First, are we using this decorator correctly? - _helpers._validate_wrapped_fn(function) - - # What kind of function are we decorating? - if asyncio.iscoroutinefunction(function): - # NOTE: The following code applies to async functions defined with 'async def' - - @functools.wraps(function) - def async_wrap(*args: P.args, **kwargs: P.kwargs) -> Union[Awaitable[T], T]: # type: ignore [name-defined] - should_await = _run_sync(kwargs, default or 'async') # Must take place before coro is created. - modified_function = apply_async_modifiers(function) - coro = modified_function(*args, **kwargs) - return apply_sync_modifiers(_helpers._sync)(coro) if should_await else coro - return async_wrap - - elif callable(function): - # NOTE: The following code applies to sync functions defined with 'def' - modified_sync_function = apply_sync_modifiers(function) - - @apply_async_modifiers - @functools.wraps(function) - async def create_awaitable(*args: P.args, **kwargs: P.kwargs) -> T: - return await apply_async_modifiers(asyncio.get_event_loop().run_in_executor)( - executor, modified_sync_function, *args, **kwargs - ) - - @functools.wraps(function) - def sync_wrap(*args: P.args, **kwargs: P.kwargs) -> Union[Awaitable[T], T]: # type: ignore [name-defined] - if _run_sync(kwargs, default = default or 'sync'): - return modified_sync_function(*args, **kwargs) - return create_awaitable(*args, **kwargs) - - return sync_wrap - - raise RuntimeError(f"a_sync's first arg must be callable. You passed {function}.") - + a_sync_deco = ASyncDecorator(modifiers, default) return a_sync_deco if coro_fn is None else a_sync_deco(coro_fn) - - -def _run_sync(kwargs: dict, default: Literal['sync', 'async']): - # If a flag was specified in the kwargs, we will defer to it. - try: - return _kwargs.is_sync(kwargs, pop_flag=True) - except exceptions.NoFlagsFound: - # No flag specified in the kwargs, we will defer to 'default'. - if default == 'sync': - return True - elif default == 'async': - return False - else: - raise NotImplementedError(default) diff --git a/a_sync/modified.py b/a_sync/modified.py new file mode 100644 index 00000000..dcfe757a --- /dev/null +++ b/a_sync/modified.py @@ -0,0 +1,75 @@ + +import functools + +from a_sync import _helpers, _kwargs, exceptions +from a_sync._typing import * +from a_sync.modifiers import ModifierManager + + +class ASyncDecorator(Modified[T]): + def __init__(self, modifiers: ModifierManager, default=None) -> None: + self.default = default + self.modifiers = modifiers + self.validate_inputs() + def __call__(self, func): + return ASyncFunction(func, self.modifiers, self.default) + def validate_inputs(self) -> None: + if self.default not in ['sync', 'async', None]: + raise ValueError(f"'default' must be either 'sync', 'async', or None. You passed {self.default}.") + if not isinstance(self.modifiers, ModifierManager): + raise TypeError(f"'modifiers should be of type 'ModifierManager'. You passed {self.modifiers}") + + +class ASyncFunction(Modified[T]): + def __init__(self, fn: Callable[P, T], modifiers: ModifierManager, default=None) -> None: + self.default = default + self.modifiers = modifiers + self.async_def = asyncio.iscoroutinefunction(fn) + self.decorated_fn = self.__decorate(fn) + + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Union[Awaitable[T], T]: + return self.decorated_fn(*args, **kwargs) + + def __decorate(self, func): + _helpers._validate_wrapped_fn(func) + if asyncio.iscoroutinefunction(func): + return self.__decorate_async(func) + elif callable(func): + return self.__decorate_sync(func) + raise NotImplementedError(f'Unable to decorate {func}') + + @property + def _sync_default(self) -> bool: + """If user did not specify a default, we defer to the function. 'def' vs 'async def'""" + return True if self.default == 'sync' else False if self.default == 'async' else not self.async_def + + def __run_sync(self, kwargs: dict): + try: + # If a flag was specified in the kwargs, we will defer to it. + return _kwargs.is_sync(kwargs, pop_flag=True) + except exceptions.NoFlagsFound: + # No flag specified in the kwargs, we will defer to 'default'. + return self._sync_default + + def __decorate_async(self, func): + modified_async_function = self.modifiers.apply_async_modifiers(func) + _await = self.modifiers.apply_sync_modifiers(_helpers._await) + @functools.wraps(func) + def async_wrap(*args: P.args, **kwargs: P.kwargs) -> Union[Awaitable[T], T]: # type: ignore [name-defined] + should_await = self.__run_sync(kwargs) # Must take place before coro is created, we're popping a kwarg. + coro = modified_async_function(*args, **kwargs) + return _await(coro) if should_await else coro + return async_wrap + + def __decorate_sync(self, func): + modified_sync_function = self.modifiers.apply_sync_modifiers(func) + @functools.wraps(func) + @self.modifiers.apply_async_modifiers + async def create_awaitable(*args: P.args, **kwargs: P.kwargs) -> T: + return await asyncio.get_event_loop().run_in_executor(self.modifiers.executor, modified_sync_function, *args, **kwargs) + @functools.wraps(func) + def sync_wrap(*args: P.args, **kwargs: P.kwargs) -> Union[Awaitable[T], T]: # type: ignore [name-defined] + if self.__run_sync(kwargs): + return modified_sync_function(*args, **kwargs) + return create_awaitable(*args, **kwargs) + return sync_wrap diff --git a/a_sync/modifiers.py b/a_sync/modifiers.py deleted file mode 100644 index 3e487f02..00000000 --- a/a_sync/modifiers.py +++ /dev/null @@ -1,11 +0,0 @@ - -import asyncio -from concurrent.futures._base import Executor -from typing import TypedDict, Union - -class Modifiers(TypedDict, total=False): - runs_per_minute: int - executor: Executor - semaphore: Union[int, asyncio.Semaphore] - -valid_modifiers = [key for key in Modifiers.__annotations__ if not key.startswith('_') and not key.endswith('_')] diff --git a/a_sync/modifiers/__init__.py b/a_sync/modifiers/__init__.py new file mode 100644 index 00000000..d724d638 --- /dev/null +++ b/a_sync/modifiers/__init__.py @@ -0,0 +1,19 @@ +from aiolimiter import AsyncLimiter + +from a_sync._typing import * +from a_sync.modifiers.manager import ModifierManager, valid_modifiers +from a_sync.semaphores import ThreadsafeSemaphore + + +def get_modifiers_from(thing: Union[dict, type, object]) -> ModifierKwargs: + if isinstance(thing, dict): + apply_class_defined_modifiers(thing) + return ModifierKwargs({modifier: thing[modifier] for modifier in valid_modifiers if modifier in thing}) + return ModifierKwargs({modifier: getattr(thing, modifier) for modifier in valid_modifiers if hasattr(thing, modifier)}) + +def apply_class_defined_modifiers(attrs_from_metaclass: dict): + if 'semaphore' in attrs_from_metaclass and isinstance(val := attrs_from_metaclass['semaphore'], int): + attrs_from_metaclass['semaphore'] = ThreadsafeSemaphore(val) + if "runs_per_minute" in attrs_from_metaclass and isinstance(val := attrs_from_metaclass['runs_per_minute'], int): + attrs_from_metaclass['runs_per_minute'] = AsyncLimiter(val) + \ No newline at end of file diff --git a/a_sync/modifiers/cache/__init__.py b/a_sync/modifiers/cache/__init__.py new file mode 100644 index 00000000..a86f0bce --- /dev/null +++ b/a_sync/modifiers/cache/__init__.py @@ -0,0 +1,62 @@ + +import asyncio + +from a_sync import exceptions +from a_sync._typing import * +from a_sync.modifiers.cache.memory import apply_async_memory_cache + + +class CacheArgs(TypedDict): + cache_type: Literal['memory', None] + cache_typed: bool + ram_cache_maxsize: Optional[int] + ram_cache_ttl: Optional[int] + +@overload +def apply_async_cache( + coro_fn: Literal[None] = None, + **modifiers: Unpack[CacheArgs], +) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]]:... + +@overload +def apply_async_cache( + coro_fn: int = None, + **modifiers: Unpack[CacheArgs], +) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]]:... + +@overload +def apply_async_cache( + coro_fn: Callable[P, Awaitable[T]] = None, + **modifiers: Unpack[CacheArgs], +) -> Callable[P, Awaitable[T]]:... + +def apply_async_cache( + coro_fn: Union[Callable[P, Awaitable[T]], Literal['memory']] = None, + cache_type: Literal['memory'] = 'memory', + cache_typed: bool = False, + ram_cache_maxsize: Optional[int] = None, + ram_cache_ttl: Optional[int] = None, +) -> Union[ + Callable[P, Awaitable[T]], + Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]], +]: + + # Parse Inputs + if isinstance(coro_fn, int): + assert ram_cache_maxsize is None + ram_cache_maxsize = coro_fn + coro_fn = None + + # Validate + elif coro_fn is None: + if ram_cache_maxsize is not None and not isinstance(ram_cache_maxsize, int): + raise TypeError("'lru_cache_maxsize' must be an integer or None.", ram_cache_maxsize) + elif not asyncio.iscoroutinefunction(coro_fn): + raise exceptions.FunctionNotAsync(coro_fn) + + if cache_type == 'memory': + cache_decorator = apply_async_memory_cache(maxsize=ram_cache_maxsize, ttl=ram_cache_ttl, typed=cache_typed) + return cache_decorator if coro_fn is None else cache_decorator(coro_fn) + elif cache_type == 'disk': + pass + raise NotImplementedError(f"cache_type: {cache_type}") \ No newline at end of file diff --git a/a_sync/modifiers/cache/memory.py b/a_sync/modifiers/cache/memory.py new file mode 100644 index 00000000..e96202d6 --- /dev/null +++ b/a_sync/modifiers/cache/memory.py @@ -0,0 +1,67 @@ + + +import asyncio +from typing import Awaitable, Callable, Literal, Optional, Union, overload + +from async_lru import alru_cache + +from a_sync import exceptions +from a_sync._typing import P, T + + +@overload +def apply_async_memory_cache( + coro_fn: Literal[None] = None, + maxsize: int = None, + ttl: Optional[int] = None, + typed: bool = False, +) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]]:... + +@overload +def apply_async_memory_cache( + coro_fn: Literal[None] = None, + maxsize: Literal[None] = None, + ttl: int = None, + typed: bool = False, +) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]]:... + +@overload +def apply_async_memory_cache( + coro_fn: int = None, + maxsize: Literal[None] = None, + ttl: Optional[int] = None, + typed: bool = False, +) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]]:... + +@overload +def apply_async_memory_cache( + coro_fn: Callable[P, Awaitable[T]] = None, + maxsize: int = None, + ttl: Optional[int] = None, + typed: bool = False, +) -> Callable[P, Awaitable[T]]:... + +def apply_async_memory_cache( + coro_fn: Optional[Union[Callable[P, Awaitable[T]], int]] = None, + maxsize: Optional[int] = None, + ttl: Optional[int] = None, + typed: bool = False, +) -> Union[ + Callable[P, Awaitable[T]], + Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]], +]: + # Parse Inputs + if isinstance(coro_fn, int): + assert maxsize is None + maxsize = coro_fn + coro_fn = None + + # Validate + elif coro_fn is None: + if maxsize is not None and not isinstance(maxsize, int): + raise TypeError("'lru_cache_maxsize' must be an integer or None.", maxsize) + elif not asyncio.iscoroutinefunction(coro_fn): + raise exceptions.FunctionNotAsync(coro_fn) + + cache_decorator = alru_cache(maxsize=maxsize, ttl=ttl, typed=typed) + return cache_decorator if coro_fn is None else cache_decorator(coro_fn) diff --git a/a_sync/rate_limiting.py b/a_sync/modifiers/limiter.py similarity index 78% rename from a_sync/rate_limiting.py rename to a_sync/modifiers/limiter.py index 83913fab..50a8ef30 100644 --- a/a_sync/rate_limiting.py +++ b/a_sync/modifiers/limiter.py @@ -1,17 +1,13 @@ import asyncio -from typing import (Awaitable, Callable, Literal, Optional, TypeVar, Union, - overload) +from typing import Awaitable, Callable, Literal, Optional, Union, overload from aiolimiter import AsyncLimiter -from typing_extensions import ParamSpec, TypeVar -from a_sync import exceptions, aliases +from a_sync import aliases, exceptions +from a_sync._typing import P, T + -T = TypeVar('T') -P = ParamSpec('P') - - @overload def apply_rate_limit( coro_fn: Literal[None] = None, @@ -30,6 +26,12 @@ def apply_rate_limit( runs_per_minute: int = None, ) -> Callable[P, Awaitable[T]]:... +@overload +def apply_rate_limit( + coro_fn: Callable[P, Awaitable[T]] = None, + runs_per_minute: AsyncLimiter = None, +) -> Callable[P, Awaitable[T]]:... + def apply_rate_limit( coro_fn: Optional[Union[Callable[P, Awaitable[T]], int]] = None, runs_per_minute: Optional[int] = None, @@ -51,7 +53,7 @@ def apply_rate_limit( raise exceptions.FunctionNotAsync(coro_fn) def rate_limit_decorator(coro_fn: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]: - limiter = AsyncLimiter(runs_per_minute) if runs_per_minute else aliases.dummy + limiter = runs_per_minute if isinstance(runs_per_minute, AsyncLimiter) else AsyncLimiter(runs_per_minute) if runs_per_minute else aliases.dummy async def rate_limit_wrap(*args: P.args, **kwargs: P.kwargs) -> T: async with limiter: return await coro_fn(*args, **kwargs) diff --git a/a_sync/modifiers/manager.py b/a_sync/modifiers/manager.py new file mode 100644 index 00000000..9b29c0f3 --- /dev/null +++ b/a_sync/modifiers/manager.py @@ -0,0 +1,72 @@ + +import functools +from typing import Any, Awaitable, Callable + +from a_sync import config, semaphores +from a_sync._typing import * +from a_sync.modifiers import cache, limiter + +valid_modifiers = [key for key in ModifierKwargs.__annotations__ if not key.startswith('_') and not key.endswith('_')] + +class ModifierManager: + def __init__(self, modifiers: ModifierKwargs = None) -> None: + if modifiers: + for key in modifiers.keys(): + if key not in valid_modifiers: + raise ValueError(f"'{key}' is not a supported modifier.") + self._modifiers = modifiers + else: + self._modifiers = ModifierKwargs() + def __repr__(self) -> str: + return str(self._modifiers) + def __getitem__(self, modifier_key: str): + return self._modifiers[modifier_key] + def __getattribute__(self, modifier_key: str) -> Any: + if modifier_key == '__dict__' or modifier_key not in valid_modifiers: + return super().__getattribute__(modifier_key) + try: + return self[modifier_key] + except: + return config.default_modifiers[modifier_key] + + @property + def use_limiter(self) -> bool: + return self.runs_per_minute != config.null_modifiers.runs_per_minute + @property + def use_semaphore(self) -> bool: + return self.semaphore != config.null_modifiers.semaphore + @property + def use_cache(self) -> bool: + return any([ + self.cache_type != config.null_modifiers.cache_type, + self.ram_cache_maxsize != config.null_modifiers.ram_cache_maxsize, + self.ram_cache_ttl != config.null_modifiers.ram_cache_ttl, + self.cache_typed != config.null_modifiers.cache_typed, + ]) + + def apply_async_modifiers(self, coro_fn: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]: + # NOTE: THESE STACK IN REVERSE ORDER + if self.use_limiter is not None: + coro_fn = limiter.apply_rate_limit(coro_fn, self.runs_per_minute) + if self.use_semaphore: + coro_fn = semaphores.apply_semaphore(coro_fn, self.semaphore) + if self.use_cache: + coro_fn = cache.apply_async_cache( + coro_fn, + cache_type=self.cache_type or 'memory', + cache_typed=self.cache_typed, + ram_cache_maxsize=self.ram_cache_maxsize, + ram_cache_ttl=self.ram_cache_ttl + ) + return coro_fn + + def apply_sync_modifiers(self, function: Callable[P, T]) -> Callable[P, T]: + @functools.wraps(function) + def sync_modifier_wrap(*args: P.args, **kwargs: P.kwargs) -> T: + return function(*args, **kwargs) + # NOTE There are no sync modifiers at this time but they will be added here for my convenience. + return sync_modifier_wrap + + + + diff --git a/a_sync/property.py b/a_sync/property.py index 2220275b..da51be2e 100644 --- a/a_sync/property.py +++ b/a_sync/property.py @@ -1,66 +1,77 @@ import asyncio -from typing import Callable, Literal, Optional, TypeVar, Union, overload import async_property as ap -from a_sync import modifiers -T = TypeVar('T') +from a_sync import config +from a_sync._typing import * +from a_sync.modified import Modified +from a_sync.modifiers import ModifierManager -class AsyncPropertyDescriptor(ap.base.AsyncPropertyDescriptor): - def __init__(self, _fget, field_name=None, **modifiers: modifiers.Modifiers): + +class PropertyDescriptor(Modified[T]): + def __init__(self, _fget, field_name=None, modifiers: ModifierManager = ModifierManager()): self.modifiers = modifiers super().__init__(_fget, field_name=field_name) + +class AsyncPropertyDescriptor(PropertyDescriptor[T], ap.base.AsyncPropertyDescriptor): + pass -class AsyncCachedPropertyDescriptor(ap.cached.AsyncCachedPropertyDescriptor): - def __init__(self, _fget, field_name=None, **modifiers: modifiers.Modifiers): - self.modifiers = modifiers - super().__init__(_fget, field_name=field_name) +class AsyncCachedPropertyDescriptor(PropertyDescriptor[T], ap.cached.AsyncCachedPropertyDescriptor): + pass @overload def a_sync_property( func: Literal[None] = None, - **modifiers: modifiers.Modifiers, -) -> Callable[[Callable[..., T]], AsyncPropertyDescriptor]:... + default: Literal['sync', 'async', None] = config.DEFAULT_MODE, + **modifiers: ModifierKwargs, +) -> Callable[[Callable[..., T]], AsyncPropertyDescriptor[T]]:... @overload def a_sync_property( func: Callable[..., T] = None, - **modifiers: modifiers.Modifiers, -) -> AsyncPropertyDescriptor:... + default: Literal['sync', 'async', None] = config.DEFAULT_MODE, + **modifiers: ModifierKwargs, +) -> AsyncPropertyDescriptor[T]:... def a_sync_property( func: Optional[Callable[..., T]] = None, - **modifiers: modifiers.Modifiers, + default: Literal['sync', 'async', None] = config.DEFAULT_MODE, + **modifiers: ModifierKwargs, ) -> Union[ - AsyncPropertyDescriptor, - Callable[[Callable[..., T]], AsyncPropertyDescriptor], + AsyncPropertyDescriptor[T], + Callable[[Callable[..., T]], AsyncPropertyDescriptor[T]], ]: - def modifier_wrap(func) -> AsyncPropertyDescriptor: - assert asyncio.iscoroutinefunction(func), 'Can only use with async def' - return AsyncPropertyDescriptor(func, **modifiers) + if func in ['sync', 'async']: + modifiers['default'] = func + func = None + def modifier_wrap(func) -> AsyncPropertyDescriptor[T]: + assert asyncio.iscoroutinefunction(func), func #'Can only use with async def' + return AsyncPropertyDescriptor(func, modifiers=ModifierManager(modifiers)) return modifier_wrap if func is None else modifier_wrap(func) @overload def a_sync_cached_property( func: Literal[None] = None, - **modifiers: modifiers.Modifiers, -) -> Callable[[Callable[..., T]], AsyncCachedPropertyDescriptor]:... + default: Literal['sync', 'async', None] = config.DEFAULT_MODE, + **modifiers: ModifierKwargs, +) -> Callable[[Callable[..., T]], AsyncCachedPropertyDescriptor[T]]:... @overload def a_sync_cached_property( func: Callable[..., T] = None, - **modifiers: modifiers.Modifiers, -) -> AsyncCachedPropertyDescriptor:... + default: Literal['sync', 'async', None] = config.DEFAULT_MODE, + **modifiers: ModifierKwargs, +) -> AsyncCachedPropertyDescriptor[T]:... def a_sync_cached_property( func: Optional[Callable[..., T]] = None, - **modifiers: modifiers.Modifiers, + **modifiers: ModifierKwargs, ) -> Union[ - AsyncCachedPropertyDescriptor, - Callable[[Callable[..., T]], AsyncCachedPropertyDescriptor], + AsyncCachedPropertyDescriptor[T], + Callable[[Callable[..., T]], AsyncCachedPropertyDescriptor[T]], ]: - def modifier_wrap(func) -> AsyncCachedPropertyDescriptor: + def modifier_wrap(func) -> AsyncCachedPropertyDescriptor[T]: assert asyncio.iscoroutinefunction(func), 'Can only use with async def' - return AsyncCachedPropertyDescriptor(func, **modifiers) + return AsyncCachedPropertyDescriptor(func, modifiers=ModifierManager(modifiers)) return modifier_wrap if func is None else modifier_wrap(func) diff --git a/a_sync/semaphores.py b/a_sync/semaphores.py index 67750624..be7add0e 100644 --- a/a_sync/semaphores.py +++ b/a_sync/semaphores.py @@ -2,14 +2,12 @@ import functools from collections import defaultdict from threading import Thread, current_thread -from typing import (Awaitable, Callable, DefaultDict, Literal, Optional, - TypeVar, Union, overload) +from typing import (Awaitable, Callable, DefaultDict, Literal, Optional, Union, + overload) -from typing_extensions import ParamSpec from a_sync import exceptions +from a_sync._typing import P, T -T = TypeVar('T') -P = ParamSpec('P') class ThreadsafeSemaphore(asyncio.Semaphore): """ @@ -21,13 +19,14 @@ class ThreadsafeSemaphore(asyncio.Semaphore): """ def __init__(self, value: Optional[int]) -> None: + assert isinstance(value, int), f"{value} should be an integer." self._value = value self.semaphores: DefaultDict[Thread, asyncio.Semaphore] = defaultdict(lambda: asyncio.Semaphore(value)) self.dummy = DummySemaphore() @property def use_dummy(self) -> bool: - return self.value is None + return self._value is None @property def semaphore(self) -> asyncio.Semaphore: diff --git a/requirements.txt b/requirements.txt index d5290591..20efcdf6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ aiolimiter==1.0.0 +async_lru==2.0.2 async_property==0.2.1 typing_extensions>=0.4.0 \ No newline at end of file