diff --git a/a_sync/primitives/locks/counter.pyi b/a_sync/primitives/locks/counter.pyi new file mode 100644 index 00000000..82a86bfb --- /dev/null +++ b/a_sync/primitives/locks/counter.pyi @@ -0,0 +1,135 @@ +from _typeshed import Incomplete +from a_sync.primitives._debug import _DebugDaemonMixin +from a_sync.primitives.locks.event import Event as Event +from typing import Iterable, Optional + +class CounterLock(_DebugDaemonMixin): + """ + An async primitive that uses an internal counter to manage task synchronization. + + A coroutine can `await counter.wait_for(3)` and it will wait until the internal counter >= 3. + If some other task executes `counter.value = 5` or `counter.set(5)`, the first coroutine will proceed as 5 >= 3. + + The internal counter can only be set to a value greater than the current value. + + See Also: + :class:`CounterLockCluster` for managing multiple :class:`CounterLock` instances. + """ + is_ready: Incomplete + def __init__(self, start_value: int = 0, name: Optional[str] = None) -> None: + ''' + Initializes the :class:`CounterLock` with a starting value and an optional name. + + Args: + start_value: The initial value of the counter. + name: An optional name for the counter, used in debug logs. + + Examples: + >>> counter = CounterLock(start_value=0, name="example_counter") + >>> counter.value + 0 + ''' + async def wait_for(self, value: int) -> bool: + """ + Waits until the counter reaches or exceeds the specified value. + + This method will ensure the debug daemon is running if the counter is not ready. + + Args: + value: The value to wait for. + + Examples: + >>> counter = CounterLock(start_value=0) + >>> await counter.wait_for(5) # This will block until counter.value >= 5 + + See Also: + :meth:`CounterLock.set` to set the counter value. + """ + def set(self, value: int) -> None: + """ + Sets the counter to the specified value. + + This method internally uses the `value` property to enforce that the new value must be strictly greater than the current value. + + Args: + value: The value to set the counter to. Must be strictly greater than the current value. + + Raises: + ValueError: If the new value is less than or equal to the current value. + + Examples: + >>> counter = CounterLock(start_value=0) + >>> counter.set(5) + >>> counter.value + 5 + + See Also: + :meth:`CounterLock.value` for direct value assignment. + """ + @property + def value(self) -> int: + """ + Gets the current value of the counter. + + Examples: + >>> counter = CounterLock(start_value=0) + >>> counter.value + 0 + """ + @value.setter + def value(self, value: int) -> None: + """ + Sets the counter to a new value, waking up any waiters if the value increases beyond the value they are awaiting. + + Args: + value: The new value of the counter. + + Raises: + ValueError: If the new value is less than the current value. + + Examples: + >>> counter = CounterLock(start_value=0) + >>> counter.value = 5 + >>> counter.value + 5 + >>> counter.value = 3 + Traceback (most recent call last): + ... + ValueError: You cannot decrease the value. + """ + +class CounterLockCluster: + """ + An asyncio primitive that represents a collection of :class:`CounterLock` objects. + + `wait_for(i)` will wait until the value of all :class:`CounterLock` objects is >= i. + + See Also: + :class:`CounterLock` for managing individual counters. + """ + locks: Incomplete + def __init__(self, counter_locks: Iterable[CounterLock]) -> None: + """ + Initializes the :class:`CounterLockCluster` with a collection of :class:`CounterLock` objects. + + Args: + counter_locks: The :class:`CounterLock` objects to manage. + + Examples: + >>> lock1 = CounterLock(start_value=0) + >>> lock2 = CounterLock(start_value=0) + >>> cluster = CounterLockCluster([lock1, lock2]) + """ + async def wait_for(self, value: int) -> bool: + """ + Waits until the value of all :class:`CounterLock` objects in the cluster reaches or exceeds the specified value. + + Args: + value: The value to wait for. + + Examples: + >>> lock1 = CounterLock(start_value=0) + >>> lock2 = CounterLock(start_value=0) + >>> cluster = CounterLockCluster([lock1, lock2]) + >>> await cluster.wait_for(5) # This will block until all locks have value >= 5 + """ diff --git a/a_sync/primitives/locks/event.pyi b/a_sync/primitives/locks/event.pyi new file mode 100644 index 00000000..651e645d --- /dev/null +++ b/a_sync/primitives/locks/event.pyi @@ -0,0 +1,28 @@ +from a_sync._typing import * +import asyncio +from a_sync.primitives._debug import _DebugDaemonMixin + +class Event(asyncio.Event, _DebugDaemonMixin): + """ + An asyncio.Event with additional debug logging to help detect deadlocks. + + This event class extends asyncio.Event by adding debug logging capabilities. It logs + detailed information about the event state and waiters, which can be useful for + diagnosing and debugging potential deadlocks. + """ + def __init__(self, name: str = '', debug_daemon_interval: int = 300, *, loop: Optional[asyncio.AbstractEventLoop] = None) -> None: + """ + Initializes the Event. + + Args: + name (str): An optional name for the event, used in debug logs. + debug_daemon_interval (int): The interval in seconds for the debug daemon to log information. + loop (Optional[asyncio.AbstractEventLoop]): The event loop to use. + """ + async def wait(self) -> Literal[True]: + """ + Wait until the event is set. + + Returns: + True when the event is set. + """ diff --git a/a_sync/primitives/locks/semaphore.pyi b/a_sync/primitives/locks/semaphore.pyi new file mode 100644 index 00000000..290a42c6 --- /dev/null +++ b/a_sync/primitives/locks/semaphore.pyi @@ -0,0 +1,167 @@ +from a_sync._typing import * +import asyncio +import functools +from _typeshed import Incomplete +from a_sync.primitives._debug import _DebugDaemonMixin +from threading import Thread as Thread + +logger: Incomplete + +class Semaphore(asyncio.Semaphore, _DebugDaemonMixin): + """ + A semaphore with additional debugging capabilities inherited from :class:`_DebugDaemonMixin`. + + This semaphore includes debug logging capabilities that are activated when the semaphore has waiters. + It allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator. + + Example: + You can write this pattern: + + ``` + semaphore = Semaphore(5) + + async def limited(): + async with semaphore: + return 1 + ``` + + like this: + + ``` + semaphore = Semaphore(5) + + @semaphore + async def limited(): + return 1 + ``` + + See Also: + :class:`_DebugDaemonMixin` for more details on debugging capabilities. + """ + name: Incomplete + def __init__(self, value: int, name: Incomplete | None = None, **kwargs) -> None: + """ + Initialize the semaphore with a given value and optional name for debugging. + + Args: + value: The initial value for the semaphore. + name (optional): An optional name used only to provide useful context in debug logs. + """ + def __call__(self, fn: CoroFn[P, T]) -> CoroFn[P, T]: + """ + Decorator method to wrap coroutine functions with the semaphore. + + This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator. + + Example: + semaphore = Semaphore(5) + + @semaphore + async def limited(): + return 1 + """ + def __len__(self) -> int: ... + def decorate(self, fn: CoroFn[P, T]) -> CoroFn[P, T]: + """ + Wrap a coroutine function to ensure it runs with the semaphore. + + Example: + semaphore = Semaphore(5) + + @semaphore + async def limited(): + return 1 + """ + async def acquire(self) -> Literal[True]: + """ + Acquire the semaphore, ensuring that debug logging is enabled if there are waiters. + + If the semaphore value is zero or less, the debug daemon is started to log the state of the semaphore. + + Returns: + True when the semaphore is successfully acquired. + """ + +class DummySemaphore(asyncio.Semaphore): + """ + A dummy semaphore that implements the standard :class:`asyncio.Semaphore` API but does nothing. + + This class is useful for scenarios where a semaphore interface is required but no actual synchronization is needed. + + Example: + dummy_semaphore = DummySemaphore() + + async def no_op(): + async with dummy_semaphore: + return 1 + """ + name: Incomplete + def __init__(self, name: Optional[str] = None) -> None: + """ + Initialize the dummy semaphore with an optional name. + + Args: + name (optional): An optional name for the dummy semaphore. + """ + async def acquire(self) -> Literal[True]: + """Acquire the dummy semaphore, which is a no-op.""" + def release(self) -> None: + """No-op release method.""" + async def __aenter__(self): + """No-op context manager entry.""" + async def __aexit__(self, *args) -> None: + """No-op context manager exit.""" + +class ThreadsafeSemaphore(Semaphore): + """ + A semaphore that works in a multi-threaded environment. + + This semaphore ensures that the program functions correctly even when used with multiple event loops. + It provides a workaround for edge cases involving multiple threads and event loops by using a separate semaphore + for each thread. + + Example: + semaphore = ThreadsafeSemaphore(5) + + async def limited(): + async with semaphore: + return 1 + + See Also: + :class:`Semaphore` for the base class implementation. + """ + semaphores: Incomplete + dummy: Incomplete + def __init__(self, value: Optional[int], name: Optional[str] = None) -> None: + """ + Initialize the threadsafe semaphore with a given value and optional name. + + Args: + value: The initial value for the semaphore, should be an integer. + name (optional): An optional name for the semaphore. + """ + def __len__(self) -> int: ... + @functools.cached_property + def use_dummy(self) -> bool: + """ + Determine whether to use a dummy semaphore. + + Returns: + True if the semaphore value is None, indicating the use of a dummy semaphore. + """ + @property + def semaphore(self) -> Semaphore: + """ + Returns the appropriate semaphore for the current thread. + + NOTE: We can't cache this property because we need to check the current thread every time we access it. + + Example: + semaphore = ThreadsafeSemaphore(5) + + async def limited(): + async with semaphore.semaphore: + return 1 + """ + async def __aenter__(self) -> None: ... + async def __aexit__(self, *args) -> None: ...