Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: cleanup Semaphore debug daemon #502

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions a_sync/primitives/locks/semaphore.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ This module provides various semaphore implementations, including a debug-enable
a dummy semaphore that does nothing, and a threadsafe semaphore for use in multi-threaded applications.
"""

import asyncio
import functools
from asyncio import CancelledError, Future, iscoroutinefunction, sleep
from collections import defaultdict, deque
from functools import wraps
from libc.string cimport strcpy
from libc.stdlib cimport malloc, free
from threading import Thread, current_thread
Expand Down Expand Up @@ -158,10 +158,10 @@ cdef class Semaphore(_DebugDaemonMixin):
async def limited():
return 1
"""
if not asyncio.iscoroutinefunction(fn):
if not iscoroutinefunction(fn):
raise TypeError(f"{fn} must be a coroutine function")

@functools.wraps(fn)
@wraps(fn)
async def semaphore_wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
async with self:
return await fn(*args, **kwargs)
Expand Down Expand Up @@ -218,7 +218,7 @@ cdef class Semaphore(_DebugDaemonMixin):
await fut
finally:
self._Semaphore__waiters.remove(fut)
except asyncio.exceptions.CancelledError:
except CancelledError:
if _is_not_cancelled(fut):
self._Semaphore__value += 1
self._c_wake_up_next()
Expand Down Expand Up @@ -260,7 +260,7 @@ cdef class Semaphore(_DebugDaemonMixin):
self._Semaphore__value = value

@property
def _waiters(self) -> List[asyncio.Future]:
def _waiters(self) -> List[Future]:
# required for subclass compatability
return self._Semaphore__waiters

Expand Down Expand Up @@ -304,20 +304,30 @@ cdef class Semaphore(_DebugDaemonMixin):
async def monitor():
await semaphore._debug_daemon()
"""
cdef set decorated = self._decorated
cdef object log = self.get_logger().debug
while self._Semaphore__waiters:
await asyncio.sleep(60)
self.get_logger().debug(
"%s has %s waiters for any of: %s",
self,
len(self),
self._decorated,
)


cdef inline bint _is_not_done(fut: asyncio.Future):
await sleep(60)
if len(decorated) == 1:
log(
"%s has %s waiters for %s",
self,
len(self),
next(iter(decorated)),
)
else:
log(
"%s has %s waiters for any of: %s",
self,
len(self),
decorated,
)


cdef inline bint _is_not_done(fut: Future):
return <str>fut._state == "PENDING"

cdef inline bint _is_not_cancelled(fut: asyncio.Future):
cdef inline bint _is_not_cancelled(fut: Future):
return <str>fut._state != "CANCELLED"


Expand Down
Loading