Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QueueIterator hangs forever when RabbitMQ disconnects #379

Open
Phibonacci opened this issue Feb 23, 2021 · 10 comments
Open

QueueIterator hangs forever when RabbitMQ disconnects #379

Phibonacci opened this issue Feb 23, 2021 · 10 comments

Comments

@Phibonacci
Copy link

Phibonacci commented Feb 23, 2021

If RabbitMQ disconnects then QueueIterator will hang forever or until the coroutine is cancelled.

        queue = await channel.declare_queue(queue_name, durable=True)
        try:
            async for message in queue:
                pass
@rob-blackbourn
Copy link

I see this too. When I set logging.basicConfig(level=logging.DEBUG) the following error is reported:

DEBUG:aio_pika.connection:Closing AMQP connection None
DEBUG:aiormq.connection:Reader task cancelled:
Traceback (most recent call last):
  File "/home/BHDGSYSTEMATIC.COM/rblackbourn/dev/scratch/python/aio_pika-example1/.venv/lib/python3.8/site-packages/aiormq/connection.py", line 383, in __reader
    return await self.close(
  File "/home/BHDGSYSTEMATIC.COM/rblackbourn/dev/scratch/python/aio_pika-example1/.venv/lib/python3.8/site-packages/aiormq/base.py", line 149, in close
    await self.loop.create_task(self.__closer(exc))
asyncio.exceptions.CancelledError

As a workaround this error can be caught by adding a connection.add_close_callback which does detect the failure:

def close_callback(connection: aio_pika.Connection, error: Exception) -> None:
    if isinstance(error, ConnectionClosed):
        code, text = error.args
        if code == 320:  # "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
            print("Handle shutdown here ...")

@rob-blackbourn
Copy link

I have this helper method:

"""Helpers for asyncio"""

import asyncio
from asyncio import Event, Future, Task
from typing import AsyncIterator, Set, TypeVar, cast

T = TypeVar('T')

async def cancel_task_and_await(task: Task) -> None:
    """Cancel a task and await it

    :param task: The task
    :type task: Task
    """
    task.cancel()
    try:
        # If the task has remaining await statements it must be awaited.
        await task
    except asyncio.CancelledError:
        pass


async def aiter_cancellable(
        async_iterator: AsyncIterator[T],
        cancellation_event: Event
) -> AsyncIterator[T]:
    """Create a cancellable async iterator.

    :param async_iterator: The async iterator to wrap
    :param cancellation_event: The asyncio Event to controll cancellation.
    :return: The wrapped async iterator
    """
    cancellation_task = asyncio.create_task(cancellation_event.wait())
    result_iter = async_iterator.__aiter__()
    pending: Set[Future] = {
        cancellation_task
    }
    while not cancellation_event.is_set():
        try:
            anext_task = asyncio.create_task(result_iter.__anext__())
            pending.add(anext_task)
            done, pending = await asyncio.wait(
                pending,
                return_when=asyncio.FIRST_COMPLETED
            )
            if anext_task in done:
                yield anext_task.result()
        except StopAsyncIteration:
            await cancel_task_and_await(cancellation_task)
            return

    for task in pending:
        await cancel_task_and_await(cast(Task, task))

Then this code:

async def main_async():

    # Create an event that gets set when the connection fails.
    cancellation_event = asyncio.Event()

    def close_callback(_conn: aio_pika.Connection, error: Exception) -> None:
        if isinstance(error, ConnectionClosed):
            code, text = error.args
            if code == 320:  # "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
                LOGGER.error("Connection failure (%s) %s", code, text)
                cancellation_event.set()

    async with await aio_pika.connect(
            host='localhost',
            port=5672,
            virtualhost='/',
            login='guest',
            password='guest'
    ) as connection:

        connection.add_close_callback(close_callback)

        async with await connection.channel() as channel:
            queue = await channel.declare_queue('some.queue', passive=True)

            async with queue.iterator() as queue_iter:
                # Use the cancellation event HERE!!!
                async for message in aiter_cancellable(queue_iter, cancellation_event):
                    try:
                        async with message.process():
                            print(message.body.decode())
                    except:
                        print('Failed to process message')

@edmondburnett
Copy link

edmondburnett commented Apr 25, 2021

+1

There are several open tickets related to this, would love to see an official solution.

@jgayfer
Copy link

jgayfer commented Dec 29, 2021

Any update on this?

@mknithin
Copy link

Any update for this issue , still getting this error in version : 8.1.1

@wallyhall
Copy link

I believe this may be related to: #358 ... I've added a comment there with a work-around which solves both issues for us.

@WaFeeAL
Copy link

WaFeeAL commented Jun 6, 2023

Any official update for this issue? Still getting this error in version : 9.1.2

@Darsstar
Copy link
Contributor

Does #615 resolve the problem? (That is the purose of the PR)

@lukaszdudek-trueenergy
Copy link

What's the status here? Is there anyone reviewing #615? Is there anything I can do to help speed this up?
We've encountered this issue in production & it's not fun to see your workers silently hang indefinitely.

@Darsstar
Copy link
Contributor

Darsstar commented Nov 21, 2024

What's the status here? Is there anyone reviewing #615? Is there anything I can do to help speed this up? We've encountered this issue in production & it's not fun to see your workers silently hang indefinitely.

It got merged an hour ago 😄
It is available as 9.5.0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

10 participants