Skip to content

Commit

Permalink
feat: type stubs for primitives converted to cython
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler committed Nov 22, 2024
1 parent 07ac20c commit dba037b
Show file tree
Hide file tree
Showing 3 changed files with 330 additions and 0 deletions.
135 changes: 135 additions & 0 deletions a_sync/primitives/locks/counter.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
from _typeshed import Incomplete
from a_sync.primitives._debug import _DebugDaemonMixin
from a_sync.primitives.locks.event import Event as Event
from typing import Iterable, Optional

class CounterLock(_DebugDaemonMixin):
"""
An async primitive that uses an internal counter to manage task synchronization.
A coroutine can `await counter.wait_for(3)` and it will wait until the internal counter >= 3.
If some other task executes `counter.value = 5` or `counter.set(5)`, the first coroutine will proceed as 5 >= 3.
The internal counter can only be set to a value greater than the current value.
See Also:
:class:`CounterLockCluster` for managing multiple :class:`CounterLock` instances.
"""
is_ready: Incomplete
def __init__(self, start_value: int = 0, name: Optional[str] = None) -> None:
'''
Initializes the :class:`CounterLock` with a starting value and an optional name.
Args:
start_value: The initial value of the counter.
name: An optional name for the counter, used in debug logs.
Examples:
>>> counter = CounterLock(start_value=0, name="example_counter")
>>> counter.value
0
'''
async def wait_for(self, value: int) -> bool:
"""
Waits until the counter reaches or exceeds the specified value.
This method will ensure the debug daemon is running if the counter is not ready.
Args:
value: The value to wait for.
Examples:
>>> counter = CounterLock(start_value=0)
>>> await counter.wait_for(5) # This will block until counter.value >= 5
See Also:
:meth:`CounterLock.set` to set the counter value.
"""
def set(self, value: int) -> None:
"""
Sets the counter to the specified value.
This method internally uses the `value` property to enforce that the new value must be strictly greater than the current value.
Args:
value: The value to set the counter to. Must be strictly greater than the current value.
Raises:
ValueError: If the new value is less than or equal to the current value.
Examples:
>>> counter = CounterLock(start_value=0)
>>> counter.set(5)
>>> counter.value
5
See Also:
:meth:`CounterLock.value` for direct value assignment.
"""
@property
def value(self) -> int:
"""
Gets the current value of the counter.
Examples:
>>> counter = CounterLock(start_value=0)
>>> counter.value
0
"""
@value.setter
def value(self, value: int) -> None:
"""
Sets the counter to a new value, waking up any waiters if the value increases beyond the value they are awaiting.
Args:
value: The new value of the counter.
Raises:
ValueError: If the new value is less than the current value.
Examples:
>>> counter = CounterLock(start_value=0)
>>> counter.value = 5
>>> counter.value
5
>>> counter.value = 3
Traceback (most recent call last):
...
ValueError: You cannot decrease the value.
"""

class CounterLockCluster:
"""
An asyncio primitive that represents a collection of :class:`CounterLock` objects.
`wait_for(i)` will wait until the value of all :class:`CounterLock` objects is >= i.
See Also:
:class:`CounterLock` for managing individual counters.
"""
locks: Incomplete
def __init__(self, counter_locks: Iterable[CounterLock]) -> None:
"""
Initializes the :class:`CounterLockCluster` with a collection of :class:`CounterLock` objects.
Args:
counter_locks: The :class:`CounterLock` objects to manage.
Examples:
>>> lock1 = CounterLock(start_value=0)
>>> lock2 = CounterLock(start_value=0)
>>> cluster = CounterLockCluster([lock1, lock2])
"""
async def wait_for(self, value: int) -> bool:
"""
Waits until the value of all :class:`CounterLock` objects in the cluster reaches or exceeds the specified value.
Args:
value: The value to wait for.
Examples:
>>> lock1 = CounterLock(start_value=0)
>>> lock2 = CounterLock(start_value=0)
>>> cluster = CounterLockCluster([lock1, lock2])
>>> await cluster.wait_for(5) # This will block until all locks have value >= 5
"""
28 changes: 28 additions & 0 deletions a_sync/primitives/locks/event.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from a_sync._typing import *
import asyncio
from a_sync.primitives._debug import _DebugDaemonMixin

class Event(asyncio.Event, _DebugDaemonMixin):
"""
An asyncio.Event with additional debug logging to help detect deadlocks.
This event class extends asyncio.Event by adding debug logging capabilities. It logs
detailed information about the event state and waiters, which can be useful for
diagnosing and debugging potential deadlocks.
"""
def __init__(self, name: str = '', debug_daemon_interval: int = 300, *, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
"""
Initializes the Event.
Args:
name (str): An optional name for the event, used in debug logs.
debug_daemon_interval (int): The interval in seconds for the debug daemon to log information.
loop (Optional[asyncio.AbstractEventLoop]): The event loop to use.
"""
async def wait(self) -> Literal[True]:
"""
Wait until the event is set.
Returns:
True when the event is set.
"""
167 changes: 167 additions & 0 deletions a_sync/primitives/locks/semaphore.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
from a_sync._typing import *
import asyncio
import functools
from _typeshed import Incomplete
from a_sync.primitives._debug import _DebugDaemonMixin
from threading import Thread as Thread

logger: Incomplete

class Semaphore(asyncio.Semaphore, _DebugDaemonMixin):
"""
A semaphore with additional debugging capabilities inherited from :class:`_DebugDaemonMixin`.
This semaphore includes debug logging capabilities that are activated when the semaphore has waiters.
It allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.
Example:
You can write this pattern:
```
semaphore = Semaphore(5)
async def limited():
async with semaphore:
return 1
```
like this:
```
semaphore = Semaphore(5)
@semaphore
async def limited():
return 1
```
See Also:
:class:`_DebugDaemonMixin` for more details on debugging capabilities.
"""
name: Incomplete
def __init__(self, value: int, name: Incomplete | None = None, **kwargs) -> None:
"""
Initialize the semaphore with a given value and optional name for debugging.
Args:
value: The initial value for the semaphore.
name (optional): An optional name used only to provide useful context in debug logs.
"""
def __call__(self, fn: CoroFn[P, T]) -> CoroFn[P, T]:
"""
Decorator method to wrap coroutine functions with the semaphore.
This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.
Example:
semaphore = Semaphore(5)
@semaphore
async def limited():
return 1
"""
def __len__(self) -> int: ...
def decorate(self, fn: CoroFn[P, T]) -> CoroFn[P, T]:
"""
Wrap a coroutine function to ensure it runs with the semaphore.
Example:
semaphore = Semaphore(5)
@semaphore
async def limited():
return 1
"""
async def acquire(self) -> Literal[True]:
"""
Acquire the semaphore, ensuring that debug logging is enabled if there are waiters.
If the semaphore value is zero or less, the debug daemon is started to log the state of the semaphore.
Returns:
True when the semaphore is successfully acquired.
"""

class DummySemaphore(asyncio.Semaphore):
"""
A dummy semaphore that implements the standard :class:`asyncio.Semaphore` API but does nothing.
This class is useful for scenarios where a semaphore interface is required but no actual synchronization is needed.
Example:
dummy_semaphore = DummySemaphore()
async def no_op():
async with dummy_semaphore:
return 1
"""
name: Incomplete
def __init__(self, name: Optional[str] = None) -> None:
"""
Initialize the dummy semaphore with an optional name.
Args:
name (optional): An optional name for the dummy semaphore.
"""
async def acquire(self) -> Literal[True]:
"""Acquire the dummy semaphore, which is a no-op."""
def release(self) -> None:
"""No-op release method."""
async def __aenter__(self):
"""No-op context manager entry."""
async def __aexit__(self, *args) -> None:
"""No-op context manager exit."""

class ThreadsafeSemaphore(Semaphore):
"""
A semaphore that works in a multi-threaded environment.
This semaphore ensures that the program functions correctly even when used with multiple event loops.
It provides a workaround for edge cases involving multiple threads and event loops by using a separate semaphore
for each thread.
Example:
semaphore = ThreadsafeSemaphore(5)
async def limited():
async with semaphore:
return 1
See Also:
:class:`Semaphore` for the base class implementation.
"""
semaphores: Incomplete
dummy: Incomplete
def __init__(self, value: Optional[int], name: Optional[str] = None) -> None:
"""
Initialize the threadsafe semaphore with a given value and optional name.
Args:
value: The initial value for the semaphore, should be an integer.
name (optional): An optional name for the semaphore.
"""
def __len__(self) -> int: ...
@functools.cached_property
def use_dummy(self) -> bool:
"""
Determine whether to use a dummy semaphore.
Returns:
True if the semaphore value is None, indicating the use of a dummy semaphore.
"""
@property
def semaphore(self) -> Semaphore:
"""
Returns the appropriate semaphore for the current thread.
NOTE: We can't cache this property because we need to check the current thread every time we access it.
Example:
semaphore = ThreadsafeSemaphore(5)
async def limited():
async with semaphore.semaphore:
return 1
"""
async def __aenter__(self) -> None: ...
async def __aexit__(self, *args) -> None: ...

0 comments on commit dba037b

Please sign in to comment.