-
Notifications
You must be signed in to change notification settings - Fork 198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
QueueIterator hangs forever when RabbitMQ disconnects #379
Comments
I see this too. When I set DEBUG:aio_pika.connection:Closing AMQP connection None
DEBUG:aiormq.connection:Reader task cancelled:
Traceback (most recent call last):
File "/home/BHDGSYSTEMATIC.COM/rblackbourn/dev/scratch/python/aio_pika-example1/.venv/lib/python3.8/site-packages/aiormq/connection.py", line 383, in __reader
return await self.close(
File "/home/BHDGSYSTEMATIC.COM/rblackbourn/dev/scratch/python/aio_pika-example1/.venv/lib/python3.8/site-packages/aiormq/base.py", line 149, in close
await self.loop.create_task(self.__closer(exc))
asyncio.exceptions.CancelledError As a workaround this error can be caught by adding a def close_callback(connection: aio_pika.Connection, error: Exception) -> None:
if isinstance(error, ConnectionClosed):
code, text = error.args
if code == 320: # "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
print("Handle shutdown here ...") |
I have this helper method: """Helpers for asyncio"""
import asyncio
from asyncio import Event, Future, Task
from typing import AsyncIterator, Set, TypeVar, cast
T = TypeVar('T')
async def cancel_task_and_await(task: Task) -> None:
"""Cancel a task and await it
:param task: The task
:type task: Task
"""
task.cancel()
try:
# If the task has remaining await statements it must be awaited.
await task
except asyncio.CancelledError:
pass
async def aiter_cancellable(
async_iterator: AsyncIterator[T],
cancellation_event: Event
) -> AsyncIterator[T]:
"""Create a cancellable async iterator.
:param async_iterator: The async iterator to wrap
:param cancellation_event: The asyncio Event to controll cancellation.
:return: The wrapped async iterator
"""
cancellation_task = asyncio.create_task(cancellation_event.wait())
result_iter = async_iterator.__aiter__()
pending: Set[Future] = {
cancellation_task
}
while not cancellation_event.is_set():
try:
anext_task = asyncio.create_task(result_iter.__anext__())
pending.add(anext_task)
done, pending = await asyncio.wait(
pending,
return_when=asyncio.FIRST_COMPLETED
)
if anext_task in done:
yield anext_task.result()
except StopAsyncIteration:
await cancel_task_and_await(cancellation_task)
return
for task in pending:
await cancel_task_and_await(cast(Task, task)) Then this code: async def main_async():
# Create an event that gets set when the connection fails.
cancellation_event = asyncio.Event()
def close_callback(_conn: aio_pika.Connection, error: Exception) -> None:
if isinstance(error, ConnectionClosed):
code, text = error.args
if code == 320: # "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
LOGGER.error("Connection failure (%s) %s", code, text)
cancellation_event.set()
async with await aio_pika.connect(
host='localhost',
port=5672,
virtualhost='/',
login='guest',
password='guest'
) as connection:
connection.add_close_callback(close_callback)
async with await connection.channel() as channel:
queue = await channel.declare_queue('some.queue', passive=True)
async with queue.iterator() as queue_iter:
# Use the cancellation event HERE!!!
async for message in aiter_cancellable(queue_iter, cancellation_event):
try:
async with message.process():
print(message.body.decode())
except:
print('Failed to process message') |
+1 There are several open tickets related to this, would love to see an official solution. |
Any update on this? |
Any update for this issue , still getting this error in version : 8.1.1 |
I believe this may be related to: #358 ... I've added a comment there with a work-around which solves both issues for us. |
Any official update for this issue? Still getting this error in version : 9.1.2 |
Does #615 resolve the problem? (That is the purose of the PR) |
What's the status here? Is there anyone reviewing #615? Is there anything I can do to help speed this up? |
If RabbitMQ disconnects then QueueIterator will hang forever or until the coroutine is cancelled.
The text was updated successfully, but these errors were encountered: