Skip to content

Commit

Permalink
Big sloppy commit - some feats, some fixes, some refactorings (#17)
Browse files Browse the repository at this point in the history
* feat: wrap sync and async bound methods on ASyncABC subclasses, allow modifier overriding

* feat: lru cache

* feat: env vars to set system wide defaults

* feat: improve type hint accuracy

* fix: read modifiers from class def

* fix: semaphore

* fix: missing async_lru dep

* fix: default as first arg for property and cached property

* chore: rename Modifiers -> ModifierManager and cleanup imports
  • Loading branch information
BobTheBuidler authored Mar 1, 2023
1 parent 9d81bbf commit 3b1c82e
Show file tree
Hide file tree
Showing 17 changed files with 500 additions and 259 deletions.
37 changes: 16 additions & 21 deletions a_sync/_bound.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@

import functools
from inspect import isawaitable
from typing import Awaitable, Callable, TypeVar, overload

from typing_extensions import ParamSpec # type: ignore [attr-defined]
from typing import Awaitable, Callable, overload

from a_sync import _helpers
from a_sync._typing import *
from a_sync.decorator import a_sync as unbound_a_sync
from a_sync.modifiers import Modifiers
from a_sync.property import (AsyncCachedPropertyDescriptor,
AsyncPropertyDescriptor)

P = ParamSpec("P")
T = TypeVar("T")


def _wrap_bound_method(coro_fn: Callable[P, T], **modifiers: Modifiers) -> Callable[P, T]: # type: ignore [misc]
def _wrap_bound_method(coro_fn: Callable[P, T], **modifiers: ModifierKwargs) -> Callable[P, T]: # type: ignore [misc]
from a_sync.abstract import ASyncABC

# First we wrap the coro_fn so overriding kwargs are handled automagically.
Expand All @@ -27,23 +22,23 @@ def bound_a_sync_wrap(self, *args: P.args, **kwargs: P.kwargs) -> T: # type: ig
raise RuntimeError(f"{self} must be an instance of a class that inherits from ASyncABC.")
# This could either be a coroutine or a return value from an awaited coroutine,
# depending on if an overriding kwarg was passed into the function call.
retval_or_coro = wrapped_coro_fn(self, *args, **kwargs)
if not isawaitable(retval_or_coro):
retval = coro = wrapped_coro_fn(self, *args, **kwargs)
if not isawaitable(retval):
# The coroutine was already awaited due to the use of an overriding kwarg.
# We can return the value.
return retval_or_coro
return retval
# The awaitable was not awaited, so now we need to check the flag as defined on 'self' and await if appropriate.
return _helpers._await_if_sync(retval_or_coro, self.__a_sync_should_await__(kwargs)) # type: ignore [call-overload]
return _helpers._await(coro) if self.__a_sync_should_await__(kwargs) else coro # type: ignore [call-overload]
return bound_a_sync_wrap


@overload
def _wrap_property(async_property: AsyncPropertyDescriptor, **modifiers: Modifiers) -> AsyncPropertyDescriptor:
def _wrap_property(async_property: AsyncPropertyDescriptor, **modifiers: ModifierKwargs) -> AsyncPropertyDescriptor:
...
@overload
def _wrap_property(async_property: AsyncCachedPropertyDescriptor, **modifiers: Modifiers) -> AsyncCachedPropertyDescriptor: # type: ignore [misc]
def _wrap_property(async_property: AsyncCachedPropertyDescriptor, **modifiers: ModifierKwargs) -> AsyncCachedPropertyDescriptor: # type: ignore [misc]
...
def _wrap_property(async_property, **modifiers: Modifiers) -> tuple:
def _wrap_property(async_property, **modifiers: ModifierKwargs) -> tuple:
if not isinstance(async_property, (AsyncPropertyDescriptor, AsyncCachedPropertyDescriptor)):
raise TypeError(f"{async_property} must be one of: AsyncPropertyDescriptor, AsyncCachedPropertyDescriptor")

Expand All @@ -52,19 +47,19 @@ def _wrap_property(async_property, **modifiers: Modifiers) -> tuple:
async_property.hidden_method_name = f"__{async_property.field_name}__"

@unbound_a_sync(**modifiers)
async def awaitable(instance: object) -> Awaitable[T]:
async def awaitable(instance: ASyncABC) -> Awaitable[T]:
return await async_property.__get__(instance, async_property)

@functools.wraps(async_property)
def a_sync_method(self, **kwargs):
def a_sync_method(self: ASyncABC, **kwargs) -> T:
if not isinstance(self, ASyncABC):
raise RuntimeError(f"{self} must be an instance of a class that inherits from ASyncABC.")
return _helpers._await_if_sync(awaitable(self), self.__a_sync_should_await__(kwargs))
return _helpers._await(awaitable(self)) if self.__a_sync_should_await__(kwargs) else awaitable(self)

@property # type: ignore [misc]
@functools.wraps(async_property)
def a_sync_property(self) -> T:
a_sync_method = getattr(self, async_property.hidden_method_name)
return _helpers._await_if_sync(a_sync_method(sync=False), self.__a_sync_should_await__({}))
def a_sync_property(self: ASyncABC) -> T:
coro = getattr(self, async_property.hidden_method_name)(sync=False)
return _helpers._await(coro) if self.__a_sync_should_await__({}) else coro

return a_sync_property, a_sync_method
21 changes: 3 additions & 18 deletions a_sync/_helpers.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@

import asyncio
from inspect import getfullargspec
from typing import Awaitable, Callable, Literal, TypeVar, Union, overload
from typing import Awaitable, Callable

from async_property.base import AsyncPropertyDescriptor
from async_property.cached import AsyncCachedPropertyDescriptor

from a_sync import _flags

T = TypeVar("T")
from a_sync._typing import T


def _validate_wrapped_fn(fn: Callable) -> None:
Expand All @@ -22,21 +21,7 @@ def _validate_wrapped_fn(fn: Callable) -> None:

running_event_loop_msg = f"You may want to make this an async function by setting one of the following kwargs: {_flags.VIABLE_FLAGS}"

@overload
def _await_if_sync(awaitable: Awaitable[T], sync: Literal[True]) -> T:...

@overload
def _await_if_sync(awaitable: Awaitable[T], sync: Literal[False]) -> Awaitable[T]:...

def _await_if_sync(awaitable: Awaitable[T], sync: bool) -> Union[T, Awaitable[T]]:
"""
If 'sync' is True, awaits the awaitable and returns the return value.
If 'sync' is False, simply returns the awaitable.
"""
return _sync(awaitable) if sync else awaitable


def _sync(awaitable: Awaitable[T]) -> T:
def _await(awaitable: Awaitable[T]) -> T:
try:
return asyncio.get_event_loop().run_until_complete(awaitable)
except RuntimeError as e:
Expand Down
40 changes: 21 additions & 19 deletions a_sync/_meta.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,37 @@

import threading
from abc import ABCMeta
from asyncio import iscoroutinefunction
from typing import Any, Dict, Tuple

from a_sync import _bound
from a_sync.property import (AsyncCachedPropertyDescriptor,
AsyncPropertyDescriptor)
from a_sync import _bound, modifiers
from a_sync.modified import ASyncFunction, Modified
from a_sync.property import PropertyDescriptor


class ASyncMeta(ABCMeta):
"""Any class with metaclass ASyncMeta will have its functions wrapped with a_sync upon class instantiation."""
def __new__(cls, name, bases, attrs):

# Wrap all methods with a_sync abilities
for attr_name, attr_value in list(attrs.items()):

# Read modifiers from class definition
fn_modifiers = cls.__a_sync_modifiers__ if hasattr(cls, '__a_sync_modifiers__') else {}

# Special handling for functions decorated with async_property and async_cached_property
if isinstance(attr_value, (AsyncPropertyDescriptor, AsyncCachedPropertyDescriptor)):
# Check for modifier overrides defined at the property decorator
fn_modifiers.update(attr_value.modifiers)
# Wrap property
attrs[attr_name], attrs[attr_value.hidden_method_name] = _bound._wrap_property(attr_value, **fn_modifiers)
elif iscoroutinefunction(attr_value):
# Read modifiers from class definition
# NOTE: Open uesion: what do we do when a parent class and subclass define the same modifier differently?
# Currently the parent value is used for functions defined on the parent,
# and the subclass value is used for functions defined on the subclass.
fn_modifiers = modifiers.get_modifiers_from(attrs)
# Special handling for functions decorated with a_sync decorators
if isinstance(attr_value, Modified):
# Check for modifier overrides defined on the Modified object
fn_modifiers.update(attr_value.modifiers._modifiers)
if isinstance(attr_value, PropertyDescriptor):
# Wrap property
attrs[attr_name], attrs[attr_value.hidden_method_name] = _bound._wrap_property(attr_value, **fn_modifiers)
elif isinstance(attr_value, ASyncFunction):
attrs[attr_name] = _bound._wrap_bound_method(attr_value, **fn_modifiers)
else:
raise NotImplementedError(attr_name, attr_value)

if callable(attr_value) and "__" not in attr_name and not attr_name.startswith("_"):
# NOTE We will need to improve this logic if somebody needs to use it with classmethods or staticmethods.
# TODO: update modifiers with override decorators (or maybe the main deco?)
# modifiers.update(overrides)
# Wrap bound method
attrs[attr_name] = _bound._wrap_bound_method(attr_value, **fn_modifiers)
return super(ASyncMeta, cls).__new__(cls, name, bases, attrs)

Expand Down
24 changes: 24 additions & 0 deletions a_sync/_typing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@

import asyncio
from concurrent.futures._base import Executor
from typing import (Awaitable, Callable, Generic, Literal, Optional, TypedDict,
TypeVar, Union, overload)

from typing_extensions import ParamSpec, Unpack

T = TypeVar("T")
P = ParamSpec("P")

class ModifierKwargs(TypedDict, total=False):
default: Literal['sync', 'async', None]
cache_type: Literal['memory', None]
cache_typed: bool
ram_cache_maxsize: Optional[int]
ram_cache_ttl: Optional[int]
runs_per_minute: int
semaphore: Union[int, asyncio.Semaphore]
# sync modifiers
executor: Executor

class Modified(Generic[T]):
pass
5 changes: 3 additions & 2 deletions a_sync/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from a_sync import _flags, _kwargs, exceptions, modifiers
from a_sync._meta import ASyncMeta
from a_sync._typing import *


class ASyncABC(metaclass=ASyncMeta):
Expand Down Expand Up @@ -43,9 +44,9 @@ def __a_sync_instance_will_be_sync__(cls, kwargs: dict) -> bool:
######################################

@property
def __a_sync_modifiers__(thing: Union[type, object]) -> modifiers.Modifiers:
def __a_sync_modifiers__(self: "ASyncABC") -> ModifierKwargs:
"""You should not override this."""
return modifiers.Modifiers({modifier: getattr(thing, modifier) for modifier in modifiers.valid_modifiers if hasattr(thing, modifier)})
return modifiers.get_modifiers_from(self)

####################
# Abstract Methods #
Expand Down
55 changes: 48 additions & 7 deletions a_sync/config.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,56 @@

import functools
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from concurrent.futures._base import Executor

from a_sync._typing import *
from a_sync.modifiers import ModifierManager

EXECUTOR_TYPE = os.environ.get("A_SYNC_EXECUTOR_TYPE", "threads")
EXECUTOR_VALUE = int(os.environ.get("A_SYNC_EXECUTOR_VALUE", 8))

default_sync_executor: Executor
if EXECUTOR_TYPE.lower().startswith('p'): # p, P, proc, Processes, etc
default_sync_executor = ProcessPoolExecutor(EXECUTOR_VALUE)
elif EXECUTOR_TYPE.lower().startswith('t'): # t, T, thread, THREADS, etc
default_sync_executor = ThreadPoolExecutor(EXECUTOR_VALUE)
else:
@functools.lru_cache(maxsize=1)
def get_default_executor() -> Executor:
if EXECUTOR_TYPE.lower().startswith('p'): # p, P, proc, Processes, etc
return ProcessPoolExecutor(EXECUTOR_VALUE)
elif EXECUTOR_TYPE.lower().startswith('t'): # t, T, thread, THREADS, etc
return ThreadPoolExecutor(EXECUTOR_VALUE)
raise ValueError("Invalid value for A_SYNC_EXECUTOR_TYPE. Please use 'threads' or 'processes'.")

default_sync_executor = get_default_executor()

null_modifiers = ModifierManager(
ModifierKwargs(
default=None,
cache_type=None,
cache_typed=False,
ram_cache_maxsize=-1,
ram_cache_ttl=None,
runs_per_minute=None,
semaphore=None,
executor=default_sync_executor,
)
)

DEFAULT_MODE = os.environ.get("A_SYNC_DEFAULT_MODE")
CACHE_TYPE = typ if (typ := os.environ.get("A_SYNC_CACHE_TYPE", "").lower()) else null_modifiers.cache_type
CACHE_TYPED = bool(os.environ.get("A_SYNC_CACHE_TYPED"))
RAM_CACHE_MAXSIZE = int(os.environ.get("A_SYNC_RAM_CACHE_MAXSIZE", -1))
RAM_CACHE_TTL = ttl if (ttl := float(os.environ.get("A_SYNC_RAM_CACHE_TTL", 0))) else null_modifiers.ram_cache_ttl

RUNS_PER_MINUTE = rpm if (rpm := int(os.environ.get("A_SYNC_RUNS_PER_MINUTE", 0))) else null_modifiers.runs_per_minute
SEMAPHORE = rpm if (rpm := int(os.environ.get("A_SYNC_SEMAPHORE", 0))) else null_modifiers.semaphore

default_modifiers = ModifierManager(
ModifierKwargs(
default=DEFAULT_MODE,
cache_type=CACHE_TYPE,
cache_typed=CACHE_TYPED,
ram_cache_maxsize=RAM_CACHE_MAXSIZE,
ram_cache_ttl=RAM_CACHE_TTL,
runs_per_minute=RUNS_PER_MINUTE,
semaphore=SEMAPHORE,
executor=default_sync_executor,
)
)
Loading

0 comments on commit 3b1c82e

Please sign in to comment.