Skip to content

Commit

Permalink
Prevent deadlock in RobustChannel.reopen()
Browse files Browse the repository at this point in the history
  • Loading branch information
Darsstar committed Mar 16, 2024
1 parent a3ef44b commit 5b2f6c8
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 6 deletions.
4 changes: 2 additions & 2 deletions aio_pika/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,12 @@ def __str__(self) -> str:
return "{}".format(self.number or "Not initialized channel")

async def _open(self) -> None:
await self._connection.ready()

transport = self._connection.transport
if transport is None:
raise ChannelInvalidStateError("No active transport in channel")

await transport.ready()

channel = await UnderlayChannel.create(
transport.connection,
self._on_close,
Expand Down
4 changes: 2 additions & 2 deletions aio_pika/robust_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ async def __close_callback(self, _: Any, exc: BaseException) -> None:

await self.restore()

async def _open(self) -> None:
await super()._open()
async def reopen(self) -> None:
await super().reopen()
await self.reopen_callbacks()

async def _on_open(self) -> None:
Expand Down
6 changes: 4 additions & 2 deletions tests/test_amqp_robust_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import aio_pika
from aio_pika.abc import AbstractRobustChannel
from aio_pika.exceptions import QueueEmpty
from aio_pika.exceptions import QueueEmpty, CONNECTION_EXCEPTIONS
from aio_pika.message import Message
from aio_pika.robust_channel import RobustChannel
from aio_pika.robust_connection import RobustConnection
Expand Down Expand Up @@ -565,6 +565,7 @@ async def test_channel_reconnect_after_5kb(

assert messages

assert on_reconnect.is_set()
await connection.close()
await direct_connection.close()

Expand Down Expand Up @@ -666,7 +667,7 @@ async def test_channel_reconnect_stairway(
try:
await channel.set_qos(prefetch_count=1)
break
except aiormq.ChannelInvalidStateError:
except CONNECTION_EXCEPTIONS:
await asyncio.sleep(0.1)
continue

Expand All @@ -689,5 +690,6 @@ async def test_channel_reconnect_stairway(

assert messages

assert on_reconnect.is_set()
await connection.close()
await direct_connection.close()

0 comments on commit 5b2f6c8

Please sign in to comment.