diff --git a/a_sync/primitives/_debug.pxd b/a_sync/primitives/_debug.pxd index 164ee3a4..cf77d705 100644 --- a/a_sync/primitives/_debug.pxd +++ b/a_sync/primitives/_debug.pxd @@ -7,3 +7,4 @@ cdef class _LoopBoundMixin(_LoggerMixin): cdef class _DebugDaemonMixin(_LoopBoundMixin): cdef object _daemon + cdef object _c_ensure_debug_daemon(self, tuple[object] args, dict[str, object] kwargs) diff --git a/a_sync/primitives/_debug.pyx b/a_sync/primitives/_debug.pyx index 7a00cb25..b94bfd98 100644 --- a/a_sync/primitives/_debug.pyx +++ b/a_sync/primitives/_debug.pyx @@ -153,6 +153,9 @@ cdef class _DebugDaemonMixin(_LoopBoundMixin): See Also: :meth:`_start_debug_daemon` for starting the daemon. """ + return self._c_ensure_debug_daemon(args, kwargs) + + cdef object _c_ensure_debug_daemon(self, tuple[object] args, dict[str, object] kwargs): cdef object daemon = self._daemon if daemon is None: if self.check_debug_logs_enabled(): diff --git a/a_sync/primitives/locks/__init__.pxd b/a_sync/primitives/locks/__init__.pxd index 045c7d34..45c78254 100644 --- a/a_sync/primitives/locks/__init__.pxd +++ b/a_sync/primitives/locks/__init__.pxd @@ -1,2 +1,8 @@ -from a_sync.primitives.locks.counter import CounterLock -from a_sync.primitives.locks.event cimport CythonEvent as Event \ No newline at end of file +from a_sync.primitives.locks.counter cimport CounterLock +from a_sync.primitives.locks.event cimport CythonEvent as Event +from a_sync.primitives.locks.prio_semaphore cimport PrioritySemaphore +from a_sync.primitives.locks.semaphore cimport ( + DummySemaphore, + Semaphore, + ThreadsafeSemaphore, +) diff --git a/a_sync/primitives/locks/__init__.py b/a_sync/primitives/locks/__init__.py index 1cbec00e..d51dfe89 100644 --- a/a_sync/primitives/locks/__init__.py +++ b/a_sync/primitives/locks/__init__.py @@ -1,8 +1,8 @@ from a_sync.primitives.locks.counter import CounterLock from a_sync.primitives.locks.event import CythonEvent as Event +from a_sync.primitives.locks.prio_semaphore import PrioritySemaphore from a_sync.primitives.locks.semaphore import ( DummySemaphore, Semaphore, ThreadsafeSemaphore, ) -from a_sync.primitives.locks.prio_semaphore import PrioritySemaphore diff --git a/a_sync/primitives/locks/counter.pyx b/a_sync/primitives/locks/counter.pyx index 6e21ed71..b8c25b63 100644 --- a/a_sync/primitives/locks/counter.pyx +++ b/a_sync/primitives/locks/counter.pyx @@ -102,7 +102,7 @@ cdef class CounterLock(_DebugDaemonMixin): :meth:`CounterLock.set` to set the counter value. """ if not self.is_ready(value): - self._ensure_debug_daemon() + self._c_ensure_debug_daemon((),{}) await self._events[value].c_wait() return True diff --git a/a_sync/primitives/locks/event.pyx b/a_sync/primitives/locks/event.pyx index 43b50f73..4179b699 100644 --- a/a_sync/primitives/locks/event.pyx +++ b/a_sync/primitives/locks/event.pyx @@ -145,7 +145,7 @@ cdef class CythonEvent(_DebugDaemonMixin): if self._value: return _return_true() - self._ensure_debug_daemon() + self._c_ensure_debug_daemon((),{}) cdef object fut = self._c_get_loop().create_future() self._waiters.append(fut) diff --git a/a_sync/primitives/locks/prio_semaphore.pxd b/a_sync/primitives/locks/prio_semaphore.pxd new file mode 100644 index 00000000..5b1cd10c --- /dev/null +++ b/a_sync/primitives/locks/prio_semaphore.pxd @@ -0,0 +1,24 @@ +from a_sync.primitives.locks.semaphore cimport Semaphore + +cdef class _AbstractPrioritySemaphore(Semaphore): + cdef dict[object, _AbstractPrioritySemaphoreContextManager] _context_managers + cdef Py_ssize_t _capacity + cdef list _potential_lost_waiters + cdef object _top_priority + cdef object _context_manager_class + cdef list[_AbstractPrioritySemaphoreContextManager] __waiters + cdef object c_getitem(self, object priority) + cdef dict[object, int] _count_waiters(self) + +cdef class _AbstractPrioritySemaphoreContextManager(Semaphore): + cdef _AbstractPrioritySemaphore _parent + cdef object _priority + cdef str _priority_name + cpdef str _repr_no_parent_(self) + cdef str _c_repr_no_parent_(self) + +cdef class _PrioritySemaphoreContextManager(_AbstractPrioritySemaphoreContextManager): + pass + +cdef class PrioritySemaphore(_AbstractPrioritySemaphore): + pass diff --git a/a_sync/primitives/locks/prio_semaphore.pyx b/a_sync/primitives/locks/prio_semaphore.pyx index 4976647d..9bf7e44c 100644 --- a/a_sync/primitives/locks/prio_semaphore.pyx +++ b/a_sync/primitives/locks/prio_semaphore.pyx @@ -40,31 +40,6 @@ cdef class _AbstractPrioritySemaphore(Semaphore): :class:`PrioritySemaphore` for an implementation using numeric priorities. """ - name: Optional[str] - cdef dict[object, _AbstractPrioritySemaphoreContextManager] _context_managers - cdef Py_ssize_t _capacity - cdef list _potential_lost_waiters - cdef object _top_priority - cdef object _context_manager_class - cdef list[_AbstractPrioritySemaphoreContextManager] __waiters - - #@property - #def _context_manager_class( - # self, - #) -> Type["_AbstractPrioritySemaphoreContextManager[PT]"]: - # raise NotImplementedError - - #@property - #def _top_priority(self) -> PT: - # """Defines the top priority for the semaphore. - # - # Subclasses must implement this property to specify the default top priority. - # - # Raises: - # NotImplementedError: If not implemented in a subclass. - # """ - # raise NotImplementedError - def __cinit__(self) -> None: self._context_managers = {} """A dictionary mapping priorities to their context managers.""" @@ -277,12 +252,6 @@ cdef class _AbstractPrioritySemaphoreContextManager(Semaphore): the acquisition and release of the semaphore for waiters with that priority. """ - _loop: asyncio.AbstractEventLoop - __waiters: Deque[asyncio.Future] # type: ignore [assignment] - cdef _AbstractPrioritySemaphore _parent - cdef object _priority - cdef str _priority_name - def __init__( self, parent: _AbstractPrioritySemaphore, @@ -358,12 +327,12 @@ cdef class _AbstractPrioritySemaphoreContextManager(Semaphore): >>> await context_manager.acquire() """ if self._parent._Semaphore__value <= 0: - self._ensure_debug_daemon() + self._c_ensure_debug_daemon((),{}) return self.__acquire() cdef object c_acquire(self): if self._parent._Semaphore__value <= 0: - self._ensure_debug_daemon() + self._c_ensure_debug_daemon((),{}) return self.__acquire() async def __acquire(self) -> Literal[True]: diff --git a/a_sync/primitives/locks/semaphore.pxd b/a_sync/primitives/locks/semaphore.pxd index 49693a7d..6ac023cc 100644 --- a/a_sync/primitives/locks/semaphore.pxd +++ b/a_sync/primitives/locks/semaphore.pxd @@ -14,4 +14,11 @@ cdef class Semaphore(_DebugDaemonMixin): cdef void c_release(self) cpdef void _wake_up_next(self) cdef void _c_wake_up_next(self) - \ No newline at end of file + +cdef class DummySemaphore(Semaphore): + pass + +cdef class ThreadsafeSemaphore(Semaphore): + cdef object semaphores, dummy + cdef bint use_dummy + cdef Semaphore c_get_semaphore(self) diff --git a/a_sync/primitives/locks/semaphore.pyx b/a_sync/primitives/locks/semaphore.pyx index 48c93bad..f84a6f0a 100644 --- a/a_sync/primitives/locks/semaphore.pyx +++ b/a_sync/primitives/locks/semaphore.pyx @@ -166,7 +166,7 @@ cdef class Semaphore(_DebugDaemonMixin): True when the semaphore is successfully acquired. """ if self.__value <= 0: - self._ensure_debug_daemon() + self._c_ensure_debug_daemon((),{}) if not self.c_locked(): self.__value -= 1 @@ -189,11 +189,11 @@ cdef class Semaphore(_DebugDaemonMixin): except asyncio.exceptions.CancelledError: if not fut.cancelled(): self.__value += 1 - self._wake_up_next() + self._c_wake_up_next() raise if self.__value > 0: - self._wake_up_next() + self._c_wake_up_next() return True @@ -204,11 +204,11 @@ cdef class Semaphore(_DebugDaemonMixin): become larger than zero again, wake up that coroutine. """ self.__value += 1 - self._wake_up_next() + self._c_wake_up_next() cdef void c_release(self): self.__value += 1 - self._wake_up_next() + self._c_wake_up_next() @property def name(self) -> str: @@ -350,9 +350,6 @@ cdef class ThreadsafeSemaphore(Semaphore): :class:`Semaphore` for the base class implementation. """ - cdef object semaphores, dummy - cdef bint use_dummy - def __init__(self, value: Optional[int], name: Optional[str] = None) -> None: """ Initialize the threadsafe semaphore with a given value and optional name.