Skip to content

Commit

Permalink
Update semaphore.pyx
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler authored Dec 19, 2024
1 parent c9596f7 commit 10899be
Showing 1 changed file with 27 additions and 17 deletions.
44 changes: 27 additions & 17 deletions a_sync/primitives/locks/semaphore.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ This module provides various semaphore implementations, including a debug-enable
a dummy semaphore that does nothing, and a threadsafe semaphore for use in multi-threaded applications.
"""

import asyncio
import functools
from asyncio import CancelledError, Future, iscoroutinefunction, sleep
from collections import defaultdict, deque
from functools import wraps
from libc.string cimport strcpy
from libc.stdlib cimport malloc, free
from threading import Thread, current_thread
Expand Down Expand Up @@ -158,10 +158,10 @@ cdef class Semaphore(_DebugDaemonMixin):
async def limited():
return 1
"""
if not asyncio.iscoroutinefunction(fn):
if not iscoroutinefunction(fn):
raise TypeError(f"{fn} must be a coroutine function")
@functools.wraps(fn)
@wraps(fn)
async def semaphore_wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
async with self:
return await fn(*args, **kwargs)
Expand Down Expand Up @@ -218,7 +218,7 @@ cdef class Semaphore(_DebugDaemonMixin):
await fut
finally:
self._Semaphore__waiters.remove(fut)
except asyncio.exceptions.CancelledError:
except CancelledError:
if _is_not_cancelled(fut):
self._Semaphore__value += 1
self._c_wake_up_next()
Expand Down Expand Up @@ -260,7 +260,7 @@ cdef class Semaphore(_DebugDaemonMixin):
self._Semaphore__value = value
@property
def _waiters(self) -> List[asyncio.Future]:
def _waiters(self) -> List[Future]:
# required for subclass compatability
return self._Semaphore__waiters
Expand Down Expand Up @@ -304,20 +304,30 @@ cdef class Semaphore(_DebugDaemonMixin):
async def monitor():
await semaphore._debug_daemon()
"""
cdef set decorated = self._decorated
cdef object log = self.get_logger().debug
while self._Semaphore__waiters:
await asyncio.sleep(60)
self.get_logger().debug(
"%s has %s waiters for any of: %s",
self,
len(self),
self._decorated,
)


cdef inline bint _is_not_done(fut: asyncio.Future):
await sleep(60)
if len(decorated) == 1:
log(
"%s has %s waiters for %s",
self,
len(self),
next(iter(decorated)),
)
else:
log(
"%s has %s waiters for any of: %s",
self,
len(self),
decorated,
)


cdef inline bint _is_not_done(fut: Future):
return <str>fut._state == "PENDING"

cdef inline bint _is_not_cancelled(fut: asyncio.Future):
cdef inline bint _is_not_cancelled(fut: Future):
return <str>fut._state != "CANCELLED"


Expand Down

0 comments on commit 10899be

Please sign in to comment.