From 1793c382d37d9d635262fd7b93cab89846167ad0 Mon Sep 17 00:00:00 2001 From: gluker Date: Sun, 21 Apr 2024 22:59:43 +0300 Subject: [PATCH 1/2] QueueIterator: speedup via __anext_impl w/wo timeout in __init__ --- aio_pika/queue.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/aio_pika/queue.py b/aio_pika/queue.py index 2f0f9d45..6a09904c 100644 --- a/aio_pika/queue.py +++ b/aio_pika/queue.py @@ -484,6 +484,11 @@ def __init__(self, queue: Queue, **kwargs: Any): self._queue = asyncio.Queue() self._consume_kwargs = kwargs + if kwargs.get("timeout"): + self.__anext_impl = self.__anext_with_timeout + else: + self.__anext_impl = self.__anext_without_timeout + self._amqp_queue.close_callbacks.add(self.close) async def on_message(self, message: AbstractIncomingMessage) -> None: @@ -511,6 +516,9 @@ async def __aexit__( await self.close() async def __anext__(self) -> IncomingMessage: + return await self.__anext_impl() + + async def __anext_with_timeout(self) -> IncomingMessage: if not hasattr(self, "_consumer_tag"): await self.consume() try: @@ -530,5 +538,22 @@ async def __anext__(self) -> IncomingMessage: await asyncio.wait_for(self.close(), timeout=timeout) raise + async def __anext_without_timeout(self) -> IncomingMessage: + if not hasattr(self, "_consumer_tag"): + await self.consume() + try: + return await self._queue.get() + except asyncio.CancelledError: + timeout = self._consume_kwargs.get( + "timeout", + self.DEFAULT_CLOSE_TIMEOUT, + ) + log.info( + "%r closing with timeout %d seconds", + self, timeout, + ) + await asyncio.wait_for(self.close(), timeout=timeout) + raise + __all__ = ("Queue", "QueueIterator", "ConsumerTag") From 4a89ff06e7989eb1aeba307cdf5cde88e4a3402c Mon Sep 17 00:00:00 2001 From: gluker Date: Thu, 25 Apr 2024 17:18:23 +0300 Subject: [PATCH 2/2] QueueIterator: __anext_without_timeout: except optimized --- aio_pika/queue.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/aio_pika/queue.py b/aio_pika/queue.py index 6a09904c..939237ee 100644 --- a/aio_pika/queue.py +++ b/aio_pika/queue.py @@ -544,15 +544,7 @@ async def __anext_without_timeout(self) -> IncomingMessage: try: return await self._queue.get() except asyncio.CancelledError: - timeout = self._consume_kwargs.get( - "timeout", - self.DEFAULT_CLOSE_TIMEOUT, - ) - log.info( - "%r closing with timeout %d seconds", - self, timeout, - ) - await asyncio.wait_for(self.close(), timeout=timeout) + await self.close() raise