Skip to content

Commit

Permalink
chore: add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler committed Apr 22, 2024
1 parent 5d1ec30 commit 8899ada
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions a_sync/primitives/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,12 @@ def _validate_args(i: int, can_return_less: bool) -> None:
raise ValueError(f"`i` must be an integer greater than 1. You passed {i}")


_Args = Tuple[Any]
_Kwargs = Tuple[Tuple[str, Any]]
_Key = Tuple[_Args, _Kwargs]

class SmartFuture(asyncio.Future, Generic[T]):
def __init__(self, queue: "SmartProcessingQueue", key: "_Key", *, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
def __init__(self, queue: "SmartProcessingQueue", key: _Key, *, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
super().__init__(loop=loop)
self._queue = queue
self._key = key
Expand Down Expand Up @@ -206,6 +210,7 @@ def _get(self, heappop=heapq.heappop):
return heappop(self._queue)

class PriorityProcessingQueue(_PriorityQueueMixin[T], ProcessingQueue[T, V]):
# NOTE: WIP
async def put(self, priority: Any, *args: P.args, **kwargs: P.kwargs) -> "asyncio.Future[V]":
self._ensure_workers()
fut = asyncio.get_event_loop().create_future()
Expand All @@ -227,15 +232,14 @@ def _get(self, heapify=heapq.heapify, heappop=heapq.heappop):
heapify(self._queue)
# take the job with the most waiters
return heappop(self._queue)
def _get_key(self, *args, **kwargs) -> "_Key":
def _get_key(self, *args, **kwargs) -> _Key:
return (args, tuple((kwarg, kwargs[kwarg]) for kwarg in sorted(kwargs)))
def _create_future(self, key: "_Key") -> "asyncio.Future[V]":
def _create_future(self, key: _Key) -> "asyncio.Future[V]":
return SmartFuture(self, key, loop=asyncio.get_event_loop())

class VariablePriorityQueue(_VariablePriorityQueueMixin[T], asyncio.PriorityQueue):
"""A PriorityQueue subclass that allows priorities to be updated (or computed) on the fly"""

_Key = Tuple[Tuple[Any], Tuple[Tuple[str, Any]]]
# NOTE: WIP

class SmartProcessingQueue(_VariablePriorityQueueMixin[T], ProcessingQueue[Concatenate[T, P], V]):
"""A PriorityProcessingQueue subclass that will execute jobs with the most waiters first"""
Expand Down

0 comments on commit 8899ada

Please sign in to comment.