Skip to content

Commit

Permalink
feat: use heap in CounterLock (#428)
Browse files Browse the repository at this point in the history
* feat: use heap in CounterLock

* feat: use c_set instead of set
  • Loading branch information
BobTheBuidler authored Nov 22, 2024
1 parent 4d46782 commit c8e7b4d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
3 changes: 2 additions & 1 deletion a_sync/primitives/locks/counter.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ from a_sync.primitives.locks.event cimport CythonEvent as Event
cdef class CounterLock(_DebugDaemonMixin):
cdef char* __name
cdef long long _value
cdef list _heap
cdef dict[long long, Event] _events
cpdef bint is_ready(self, long long v)
cdef bint c_is_ready(self, long long v)
cpdef void set(self, long long value)
cdef void c_set(self, long long value)
cdef void c_set(self, long long value)
22 changes: 13 additions & 9 deletions a_sync/primitives/locks/counter.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ These primitives manage synchronization of tasks that must wait for an internal
"""

import asyncio
from collections import defaultdict
import heapq
from libc.string cimport strcpy
from libc.stdlib cimport malloc, free
from libc.time cimport time
from typing import DefaultDict, Iterable
from typing import Iterable

from a_sync.primitives._debug cimport _DebugDaemonMixin
from a_sync.primitives.locks.event cimport CythonEvent as Event
Expand All @@ -35,6 +35,8 @@ cdef class CounterLock(_DebugDaemonMixin):
self._events = {}
"""A defaultdict that maps each awaited value to an :class:`Event` that manages the waiters for that value."""

self._heap = []

def __init__(self, start_value: int = 0, str name = ""):
"""
Initializes the :class:`CounterLock` with a starting value and an optional name.
Expand Down Expand Up @@ -111,6 +113,7 @@ cdef class CounterLock(_DebugDaemonMixin):
if event is None:
event = Event()
self._events[value] = event
heapq.heappush(self._heap, value)
self._c_ensure_debug_daemon((),{})
await (<Event>event).c_wait()
return True
Expand Down Expand Up @@ -174,15 +177,16 @@ cdef class CounterLock(_DebugDaemonMixin):
self.c_set(value)
cdef void c_set(self, long long value):
cdef long long key
if value > self._value:
self._value = value
ready = [
self._events.pop(key)
for key in list(self._events.keys())
if key <= self._value
]
for event in ready:
event.set()
while self._heap:
key = heapq.heappop(self._heap)
if key <= self._value:
(<Event>self._events.pop(key)).c_set()
else:
heapq.heappush(self._heap, key)
return
elif value < self._value:
raise ValueError("You cannot decrease the value.")
Expand Down

0 comments on commit c8e7b4d

Please sign in to comment.