diff --git a/a_sync/primitives/queue.py b/a_sync/primitives/queue.py index 7b710801..fff0b1c5 100644 --- a/a_sync/primitives/queue.py +++ b/a_sync/primitives/queue.py @@ -167,8 +167,12 @@ def _validate_args(i: int, can_return_less: bool) -> None: raise ValueError(f"`i` must be an integer greater than 1. You passed {i}") +_Args = Tuple[Any] +_Kwargs = Tuple[Tuple[str, Any]] +_Key = Tuple[_Args, _Kwargs] + class SmartFuture(asyncio.Future, Generic[T]): - def __init__(self, queue: "SmartProcessingQueue", key: "_Key", *, loop: Optional[asyncio.AbstractEventLoop] = None) -> None: + def __init__(self, queue: "SmartProcessingQueue", key: _Key, *, loop: Optional[asyncio.AbstractEventLoop] = None) -> None: super().__init__(loop=loop) self._queue = queue self._key = key @@ -206,6 +210,7 @@ def _get(self, heappop=heapq.heappop): return heappop(self._queue) class PriorityProcessingQueue(_PriorityQueueMixin[T], ProcessingQueue[T, V]): + # NOTE: WIP async def put(self, priority: Any, *args: P.args, **kwargs: P.kwargs) -> "asyncio.Future[V]": self._ensure_workers() fut = asyncio.get_event_loop().create_future() @@ -227,15 +232,14 @@ def _get(self, heapify=heapq.heapify, heappop=heapq.heappop): heapify(self._queue) # take the job with the most waiters return heappop(self._queue) - def _get_key(self, *args, **kwargs) -> "_Key": + def _get_key(self, *args, **kwargs) -> _Key: return (args, tuple((kwarg, kwargs[kwarg]) for kwarg in sorted(kwargs))) - def _create_future(self, key: "_Key") -> "asyncio.Future[V]": + def _create_future(self, key: _Key) -> "asyncio.Future[V]": return SmartFuture(self, key, loop=asyncio.get_event_loop()) class VariablePriorityQueue(_VariablePriorityQueueMixin[T], asyncio.PriorityQueue): """A PriorityQueue subclass that allows priorities to be updated (or computed) on the fly""" - -_Key = Tuple[Tuple[Any], Tuple[Tuple[str, Any]]] + # NOTE: WIP class SmartProcessingQueue(_VariablePriorityQueueMixin[T], ProcessingQueue[Concatenate[T, P], V]): """A PriorityProcessingQueue subclass that will execute jobs with the most waiters first"""