Skip to content

Commit

Permalink
fix: PrioritySemaphore name-mangling issue (#447)
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler authored Nov 26, 2024
1 parent 1db391b commit b70bc75
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 70 deletions.
5 changes: 0 additions & 5 deletions a_sync/primitives/locks/counter.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,6 @@ cdef class CounterLock(_DebugDaemonMixin):
)
await asyncio.sleep(300)
def __dealloc__(self):
# Free the memory allocated for __name
if self.__name is not NULL:
free(self.__name)
class CounterLockCluster:
"""
Expand Down
1 change: 0 additions & 1 deletion a_sync/primitives/locks/prio_semaphore.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
cdef list _potential_lost_waiters
cdef object _top_priority
cdef object _context_manager_class
cdef list[_AbstractPrioritySemaphoreContextManager] __waiters
cdef object c_getitem(self, object priority)
cdef dict[object, int] _count_waiters(self)

Expand Down
63 changes: 36 additions & 27 deletions a_sync/primitives/locks/prio_semaphore.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
self._context_managers = {}
"""A dictionary mapping priorities to their context managers."""

self.__waiters = []
self._Semaphore__waiters = []
"""A heap queue of context managers, sorted by priority."""

# NOTE: This should (hopefully) be temporary
Expand Down Expand Up @@ -144,7 +144,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
context_manager = self._context_manager_class(
self, priority, name=self.name
)
heappush(self.__waiters, context_manager) # type: ignore [misc]
heappush(self._Semaphore__waiters, context_manager) # type: ignore [misc]
self._context_managers[priority] = context_manager
return self._context_managers[priority]

Expand All @@ -163,14 +163,17 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
cdef bint c_locked(self):
cdef _AbstractPrioritySemaphoreContextManager cm
cdef object w
return self._Semaphore__value == 0 or (
any(
cm._waiters and any(not w.cancelled() for w in cm._waiters)
for cm in (self._context_managers.values() or ())
)
if self._Semaphore__value == 0:
return True
cdef dict[object, _AbstractPrioritySemaphoreContextManager] cms = self._context_managers
if not cms:
return False
return any(
cm._Semaphore__waiters and any(not w.cancelled() for w in cm._Semaphore__waiters)
for cm in cms.values()
)

cdef dict[object, int] _count_waiters(self):
cdef dict[object, Py_ssize_t] _count_waiters(self):
"""Counts the number of waiters for each priority.
Returns:
Expand All @@ -181,10 +184,9 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
>>> semaphore._count_waiters()
"""
cdef _AbstractPrioritySemaphoreContextManager manager
return {
manager._priority: len(manager._waiters)
for manager in sorted(self.__waiters, key=lambda m: m._priority)
}
cdef list[_AbstractPrioritySemaphoreContextManager] waiters = self._Semaphore__waiters
return {manager._priority: len(manager._Semaphore__waiters) for manager in sorted(waiters)}


def _wake_up_next(self) -> None:
"""Wakes up the next waiter in line.
Expand All @@ -199,10 +201,16 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
>>> semaphore = _AbstractPrioritySemaphore(5)
>>> semaphore._wake_up_next()
"""
self._c_wake_up_next()

cdef void _c_wake_up_next(self):
cdef _AbstractPrioritySemaphoreContextManager manager
cdef Py_ssize_t start_len, end_len
cdef bint woke_up

cdef bint debug_logs = c_logger.isEnabledFor(DEBUG)
while self.__waiters:
manager = heappop(self.__waiters)
while <list>self._Semaphore__waiters:
manager = heappop(<list>self._Semaphore__waiters)
if len(manager) == 0:
# There are no more waiters, get rid of the empty manager
if debug_logs:
Expand All @@ -219,11 +227,11 @@ cdef class _AbstractPrioritySemaphore(Semaphore):

if debug_logs:
c_logger._log(DEBUG, "waking up next for %s", (manager._c_repr_no_parent_(), ))
if not manager._waiters:
c_logger._log(DEBUG, "not manager._waiters")
if not manager._Semaphore__waiters:
c_logger._log(DEBUG, "not manager._Semaphore__waiters")

while manager._waiters:
waiter = manager._waiters.popleft()
while manager._Semaphore__waiters:
waiter = manager._Semaphore__waiters.popleft()
self._potential_lost_waiters.remove(waiter)
if not waiter.done():
waiter.set_result(None)
Expand All @@ -242,7 +250,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore):

if end_len:
# There are still waiters, put the manager back
heappush(self.__waiters, manager) # type: ignore [misc]
heappush(<list>self._Semaphore__waiters, manager) # type: ignore [misc]
else:
# There are no more waiters, get rid of the empty manager
self._context_managers.pop(manager._priority)
Expand All @@ -255,6 +263,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
if not waiter.done():
waiter.set_result(None)
return
return

while self._potential_lost_waiters:
waiter = self._potential_lost_waiters.pop(0)
Expand Down Expand Up @@ -293,14 +302,14 @@ cdef class _AbstractPrioritySemaphoreContextManager(Semaphore):
>>> context_manager = _AbstractPrioritySemaphoreContextManager(parent_semaphore, priority=1)
"""

super().__init__(0, name=name)

self._parent = parent
"""The parent semaphore."""

self._priority = priority
"""The priority associated with this context manager."""

super().__init__(0, name=name)

def __repr__(self) -> str:
"""Returns a string representation of the context manager."""
return f"<{self.__class__.__name__} parent={self._parent} {self._priority_name}={self._priority} waiters={len(self)}>"
Expand Down Expand Up @@ -361,10 +370,10 @@ cdef class _AbstractPrioritySemaphoreContextManager(Semaphore):
async def __acquire(self) -> Literal[True]:
cdef object loop, fut
while self._parent._Semaphore__value <= 0:
if self._AbstractPrioritySemaphoreContextManager__waiters is None:
self._AbstractPrioritySemaphoreContextManager__waiters = deque()
if self._Semaphore__waiters is None:
self._Semaphore__waiters = deque()
fut = self._c_get_loop().create_future()
self._AbstractPrioritySemaphoreContextManager__waiters.append(fut)
self._Semaphore__waiters.append(fut)
self._parent._potential_lost_waiters.append(fut)
try:
await fut
Expand Down Expand Up @@ -406,7 +415,7 @@ cdef class _PrioritySemaphoreContextManager(_AbstractPrioritySemaphoreContextMan
def __cinit__(self):
self._priority_name = "priority"
# Semaphore.__cinit__(self)
self._AbstractPrioritySemaphoreContextManager__waiters = deque()
self._Semaphore__waiters = deque()
self._decorated: Set[str] = set()

def __lt__(self, _PrioritySemaphoreContextManager other) -> bool:
Expand Down Expand Up @@ -460,7 +469,7 @@ cdef class PrioritySemaphore(_AbstractPrioritySemaphore): # type: ignore [type-
self._context_managers = {}
"""A dictionary mapping priorities to their context managers."""

self.__waiters = []
self._Semaphore__waiters = []
"""A heap queue of context managers, sorted by priority."""

# NOTE: This should (hopefully) be temporary
Expand Down Expand Up @@ -503,7 +512,7 @@ cdef class PrioritySemaphore(_AbstractPrioritySemaphore): # type: ignore [type-
if <int>priority not in context_managers:
context_manager = _PrioritySemaphoreContextManager(self, <int>priority, name=self.name)
heappush(
<list[_PrioritySemaphoreContextManager]>self.__waiters,
<list[_PrioritySemaphoreContextManager]>self._Semaphore__waiters,
context_manager,
) # type: ignore [misc]
context_managers[<int>priority] = context_manager
Expand Down
5 changes: 3 additions & 2 deletions a_sync/primitives/locks/semaphore.pxd
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from a_sync.primitives._debug cimport _DebugDaemonMixin

cdef class Semaphore(_DebugDaemonMixin):
cdef str _name
cdef unsigned long long __value
cdef object _waiters
cdef object __waiters
cdef char* _name
cdef str decode_name(self)
cdef set _decorated
cdef dict __dict__
cpdef bint locked(self)
Expand Down
Loading

0 comments on commit b70bc75

Please sign in to comment.