diff --git a/aio_pika/channel.py b/aio_pika/channel.py index c7175786..9ab2f4ed 100644 --- a/aio_pika/channel.py +++ b/aio_pika/channel.py @@ -16,6 +16,7 @@ from .exchange import Exchange, ExchangeType from .message import IncomingMessage from .queue import Queue +from .tools import OPERATION_TIMEOUT from .transaction import Transaction from .types import ReturnCallbackType, CloseCallbackType, TimeoutType @@ -107,6 +108,12 @@ def channel(self) -> aiormq.Channel: def number(self): return self.channel.number if self._channel else None + def _get_operation_timeout(self, timeout: TimeoutType): + return ( + self._connection.operation_timeout if timeout is OPERATION_TIMEOUT + else timeout + ) + def __str__(self): return "{0}".format( self.number or "Not initialized channel" @@ -159,12 +166,14 @@ async def _create_channel(self) -> aiormq.Channel: channel_number=self._channel_number, ) - async def initialize(self, timeout: TimeoutType = None) -> None: + async def initialize(self, + timeout: TimeoutType = OPERATION_TIMEOUT) -> None: if self._channel is not None: raise RuntimeError("Can't initialize channel") self._channel = await asyncio.wait_for( - self._create_channel(), timeout=timeout + self._create_channel(), + timeout=self._get_operation_timeout(timeout) ) self._delivery_tag = 0 @@ -190,7 +199,7 @@ async def declare_exchange( self, name: str, type: Union[ExchangeType, str] = ExchangeType.DIRECT, durable: bool = None, auto_delete: bool = False, internal: bool = False, passive: bool = False, arguments: dict = None, - timeout: TimeoutType = None + timeout: TimeoutType = OPERATION_TIMEOUT ) -> Exchange: """ Declare an exchange. @@ -228,7 +237,7 @@ async def declare_queue( self, name: str = None, *, durable: bool = None, exclusive: bool = False, passive: bool = False, auto_delete: bool = False, arguments: dict = None, - timeout: TimeoutType = None + timeout: TimeoutType = OPERATION_TIMEOUT ) -> Queue: """ @@ -248,7 +257,7 @@ async def declare_queue( """ queue = self.QUEUE_CLASS( - connection=self, + connection=self._connection, channel=self.channel, name=name, durable=durable, @@ -264,7 +273,7 @@ async def declare_queue( async def set_qos( self, prefetch_count: int = 0, prefetch_size: int = 0, - global_: bool = False, timeout: TimeoutType = None, + global_: bool = False, timeout: TimeoutType = OPERATION_TIMEOUT, all_channels: bool = None ) -> aiormq.spec.Basic.QosOk: if all_channels is not None: @@ -277,11 +286,11 @@ async def set_qos( prefetch_size=prefetch_size, global_=global_ ), - timeout=timeout + timeout=self._get_operation_timeout(timeout) ) async def queue_delete( - self, queue_name: str, timeout: TimeoutType = None, + self, queue_name: str, timeout: TimeoutType = OPERATION_TIMEOUT, if_unused: bool = False, if_empty: bool = False, nowait: bool = False ) -> aiormq.spec.Queue.DeleteOk: @@ -292,11 +301,11 @@ async def queue_delete( if_empty=if_empty, nowait=nowait, ), - timeout=timeout + timeout=self._get_operation_timeout(timeout) ) async def exchange_delete( - self, exchange_name: str, timeout: TimeoutType = None, + self, exchange_name: str, timeout: TimeoutType = OPERATION_TIMEOUT, if_unused: bool = False, nowait: bool = False ) -> aiormq.spec.Exchange.DeleteOk: @@ -306,7 +315,7 @@ async def exchange_delete( if_unused=if_unused, nowait=nowait, ), - timeout=timeout + timeout=self._get_operation_timeout(timeout) ) def transaction(self) -> Transaction: @@ -314,7 +323,7 @@ def transaction(self) -> Transaction: raise RuntimeError("Cannot create transaction when publisher " "confirms are enabled") - return Transaction(self._channel) + return Transaction(connection=self._connection, channel=self._channel) async def flow(self, active: bool = True) -> aiormq.spec.Channel.FlowOk: return await self.channel.flow(active=active) diff --git a/aio_pika/connection.py b/aio_pika/connection.py index 8e80c33d..3712619c 100644 --- a/aio_pika/connection.py +++ b/aio_pika/connection.py @@ -46,9 +46,11 @@ def _parse_kwargs(cls, kwargs): result[key] = parser(kwargs.get(key, default)) return result - def __init__(self, url, loop=None, **kwargs): + def __init__(self, url, operation_timeout: TimeoutType = None, loop=None, + **kwargs): self.loop = loop or asyncio.get_event_loop() self.url = URL(url) + self.operation_timeout = operation_timeout self.kwargs = self._parse_kwargs(kwargs or self.url.query) @@ -219,6 +221,7 @@ async def connect( login: str = 'guest', password: str = 'guest', virtualhost: str = '/', ssl: bool = False, loop: asyncio.AbstractEventLoop = None, ssl_options: dict = None, timeout: TimeoutType = None, + operation_timeout: TimeoutType = None, connection_class: Type[ConnectionType] = Connection, client_properties: dict = None, **kwargs ) -> ConnectionType: @@ -291,6 +294,7 @@ async def main(): :param ssl: use SSL for connection. Should be used with addition kwargs. :param ssl_options: A dict of values for the SSL connection. :param timeout: connection timeout in seconds + :param operation_timeout: execution timeout in seconds :param loop: Event loop (:func:`asyncio.get_event_loop()` when :class:`None`) :param connection_class: Factory of a new connection @@ -318,7 +322,11 @@ async def main(): query=kw ) - connection = connection_class(url, loop=loop) + connection = connection_class( + url, + operation_timeout=operation_timeout, + loop=loop + ) await connection.connect( timeout=timeout, client_properties=client_properties, loop=loop diff --git a/aio_pika/exchange.py b/aio_pika/exchange.py index fdc526e6..28a554bd 100644 --- a/aio_pika/exchange.py +++ b/aio_pika/exchange.py @@ -5,6 +5,7 @@ import aiormq from .message import Message +from .tools import OPERATION_TIMEOUT from .types import ExchangeType as ExchangeType_, TimeoutType @@ -36,6 +37,7 @@ def __init__(self, connection, channel: aiormq.Channel, name: str, if not arguments: arguments = {} + self._connection = connection self._channel = channel self.__type = type.value if isinstance(type, ExchangeType) else type self.name = name @@ -52,6 +54,12 @@ def channel(self) -> aiormq.Channel: return self._channel + def _get_operation_timeout(self, timeout: TimeoutType): + return ( + self._connection.operation_timeout if timeout is OPERATION_TIMEOUT + else timeout + ) + def __str__(self): return self.name @@ -61,7 +69,7 @@ def __repr__(self): ) async def declare( - self, timeout: TimeoutType = None + self, timeout: TimeoutType = OPERATION_TIMEOUT ) -> aiormq.spec.Exchange.DeclareOk: return await asyncio.wait_for(self.channel.exchange_declare( self.name, @@ -71,7 +79,7 @@ async def declare( internal=self.internal, passive=self.passive, arguments=self.arguments, - ), timeout=timeout) + ), timeout=self._get_operation_timeout(timeout)) @staticmethod def _get_exchange_name(exchange: ExchangeType_): @@ -85,7 +93,7 @@ def _get_exchange_name(exchange: ExchangeType_): async def bind( self, exchange: ExchangeType_, routing_key: str = '', *, - arguments: dict = None, timeout: TimeoutType = None + arguments: dict = None, timeout: TimeoutType = OPERATION_TIMEOUT ) -> aiormq.spec.Exchange.BindOk: """ A binding can also be a relationship between two exchanges. @@ -133,12 +141,12 @@ async def bind( destination=self.name, routing_key=routing_key, source=self._get_exchange_name(exchange), - ), timeout=timeout + ), timeout=self._get_operation_timeout(timeout) ) async def unbind( self, exchange: ExchangeType_, routing_key: str = '', - arguments: dict = None, timeout: TimeoutType = None + arguments: dict = None, timeout: TimeoutType = OPERATION_TIMEOUT ) -> aiormq.spec.Exchange.UnbindOk: """ Remove exchange-to-exchange binding for this @@ -163,12 +171,12 @@ async def unbind( destination=self.name, routing_key=routing_key, source=self._get_exchange_name(exchange), - ), timeout=timeout + ), timeout=self._get_operation_timeout(timeout) ) async def publish( self, message: Message, routing_key, *, mandatory: bool = True, - immediate: bool = False, timeout: TimeoutType = None + immediate: bool = False, timeout: TimeoutType = OPERATION_TIMEOUT ) -> Optional[aiormq.types.ConfirmationFrameType]: """ Publish the message to the queue. `aio-pika` uses @@ -197,11 +205,11 @@ async def publish( properties=message.properties, mandatory=mandatory, immediate=immediate - ), timeout=timeout + ), timeout=self._get_operation_timeout(timeout) ) async def delete( - self, if_unused: bool = False, timeout: TimeoutType = None + self, if_unused: bool = False, timeout: TimeoutType = OPERATION_TIMEOUT ) -> aiormq.spec.Exchange.DeleteOk: """ Delete the queue @@ -213,7 +221,7 @@ async def delete( log.info("Deleting %r", self) return await asyncio.wait_for( self.channel.exchange_delete(self.name, if_unused=if_unused), - timeout=timeout + timeout=self._get_operation_timeout(timeout) ) diff --git a/aio_pika/queue.py b/aio_pika/queue.py index ec2a947d..2d586045 100644 --- a/aio_pika/queue.py +++ b/aio_pika/queue.py @@ -9,9 +9,9 @@ from .exceptions import QueueEmpty from .exchange import Exchange -from aio_pika.types import ExchangeType as ExchangeType_ +from .types import ExchangeType as ExchangeType_, TimeoutType from .message import IncomingMessage -from .tools import create_task, shield +from .tools import OPERATION_TIMEOUT, create_task, shield log = getLogger(__name__) @@ -36,6 +36,7 @@ def __init__(self, connection, channel: aiormq.Channel, name, self.loop = connection.loop + self._connection = connection self._channel = channel self.name = name or '' self.durable = durable @@ -52,6 +53,12 @@ def channel(self) -> aiormq.Channel: raise RuntimeError("Channel not opened") return self._channel + def _get_operation_timeout(self, timeout: TimeoutType): + return ( + self._connection.operation_timeout if timeout is OPERATION_TIMEOUT + else timeout + ) + def __str__(self): return "%s" % self.name @@ -70,7 +77,9 @@ def __repr__(self): self.arguments, ) - async def declare(self, timeout: int=None) -> aiormq.spec.Queue.DeclareOk: + async def declare( + self, timeout: TimeoutType = OPERATION_TIMEOUT + ) -> aiormq.spec.Queue.DeclareOk: """ Declare queue. :param timeout: execution timeout @@ -84,7 +93,7 @@ async def declare(self, timeout: int=None) -> aiormq.spec.Queue.DeclareOk: queue=self.name, durable=self.durable, exclusive=self.exclusive, auto_delete=self.auto_delete, arguments=self.arguments, passive=self.passive, - ), timeout=timeout + ), timeout=self._get_operation_timeout(timeout) ) # type: aiormq.spec.Queue.DeclareOk self.name = self.declaration_result.queue @@ -92,7 +101,7 @@ async def declare(self, timeout: int=None) -> aiormq.spec.Queue.DeclareOk: async def bind( self, exchange: ExchangeType_, routing_key: str=None, *, - arguments=None, timeout: int=None + arguments=None, timeout: TimeoutType = OPERATION_TIMEOUT ) -> aiormq.spec.Queue.BindOk: """ A binding is a relationship between an exchange and a queue. @@ -126,12 +135,12 @@ async def bind( exchange=Exchange._get_exchange_name(exchange), routing_key=routing_key, arguments=arguments - ), timeout=timeout + ), timeout=self._get_operation_timeout(timeout) ) async def unbind( self, exchange: ExchangeType_, routing_key: str=None, - arguments: dict=None, timeout: int=None + arguments: dict=None, timeout: TimeoutType = OPERATION_TIMEOUT ) -> aiormq.spec.Queue.UnbindOk: """ Remove binding from exchange for this :class:`Queue` instance @@ -159,13 +168,13 @@ async def unbind( exchange=Exchange._get_exchange_name(exchange), routing_key=routing_key, arguments=arguments - ), timeout=timeout + ), timeout=self._get_operation_timeout(timeout) ) async def consume( self, callback: Callable[[IncomingMessage], Any], no_ack: bool = False, exclusive: bool = False, arguments: dict = None, - consumer_tag=None, timeout=None + consumer_tag=None, timeout: TimeoutType = OPERATION_TIMEOUT ) -> ConsumerTag: """ Start to consuming the :class:`Queue`. @@ -203,11 +212,13 @@ async def consume( arguments=arguments, consumer_tag=consumer_tag, ), - timeout=timeout + timeout=self._get_operation_timeout(timeout) )).consumer_tag - async def cancel(self, consumer_tag: ConsumerTag, timeout=None, - nowait: bool=False) -> aiormq.spec.Basic.CancelOk: + async def cancel( + self, consumer_tag: ConsumerTag, + timeout: TimeoutType = OPERATION_TIMEOUT, nowait: bool=False + ) -> aiormq.spec.Basic.CancelOk: """ This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number @@ -230,11 +241,12 @@ async def cancel(self, consumer_tag: ConsumerTag, timeout=None, consumer_tag=consumer_tag, nowait=nowait ), - timeout=timeout + timeout=self._get_operation_timeout(timeout) ) async def get( - self, *, no_ack=False, fail=True, timeout=5 + self, *, no_ack=False, fail=True, + timeout: TimeoutType = OPERATION_TIMEOUT ) -> Optional[IncomingMessage]: """ Get message from the queue. @@ -249,7 +261,7 @@ async def get( msg = await asyncio.wait_for(self.channel.basic_get( self.name, no_ack=no_ack - ), timeout=timeout + ), timeout=self._get_operation_timeout(timeout) ) # type: Optional[DeliveredMessage] if msg is None: @@ -260,7 +272,7 @@ async def get( return IncomingMessage(msg, no_ack=no_ack) async def purge( - self, no_wait=False, timeout=None + self, no_wait=False, timeout: TimeoutType = OPERATION_TIMEOUT ) -> aiormq.spec.Queue.PurgeOk: """ Purge all messages from the queue. @@ -275,11 +287,13 @@ async def purge( self.channel.queue_purge( self.name, nowait=no_wait, - ), timeout=timeout + ), timeout=self._get_operation_timeout(timeout) ) - async def delete(self, *, if_unused=True, if_empty=True, - timeout=None) -> aiormq.spec.Queue.DeclareOk: + async def delete( + self, *, if_unused=True, if_empty=True, + timeout: TimeoutType = OPERATION_TIMEOUT + ) -> aiormq.spec.Queue.DeclareOk: """ Delete the queue. @@ -294,7 +308,7 @@ async def delete(self, *, if_unused=True, if_empty=True, return await asyncio.wait_for( self.channel.queue_delete( self.name, if_unused=if_unused, if_empty=if_empty - ), timeout=timeout + ), timeout=self._get_operation_timeout(timeout) ) def __aiter__(self) -> 'QueueIterator': diff --git a/aio_pika/robust_channel.py b/aio_pika/robust_channel.py index be29136a..91019827 100644 --- a/aio_pika/robust_channel.py +++ b/aio_pika/robust_channel.py @@ -12,6 +12,7 @@ from .exchange import Exchange, ExchangeType from .queue import Queue +from .tools import OPERATION_TIMEOUT from .types import TimeoutType from .channel import Channel from .robust_queue import RobustQueue @@ -72,7 +73,7 @@ def _on_channel_close(self, exc: Exception): else: log.debug("Robust channel %r has been closed.", self) - async def initialize(self, timeout: TimeoutType = None): + async def initialize(self, timeout: TimeoutType = OPERATION_TIMEOUT): result = await super().initialize() self.add_close_callback(self._on_channel_close) @@ -87,7 +88,8 @@ async def initialize(self, timeout: TimeoutType = None): return result async def set_qos(self, prefetch_count: int = 0, prefetch_size: int = 0, - global_: bool = False, timeout: TimeoutType = None, + global_: bool = False, + timeout: TimeoutType = OPERATION_TIMEOUT, all_channels: bool = None): if all_channels is not None: warn('Use "global_" instead of "all_channels"', DeprecationWarning) @@ -106,7 +108,7 @@ async def declare_exchange( self, name: str, type: Union[ExchangeType, str] = ExchangeType.DIRECT, durable: bool = None, auto_delete: bool = False, internal: bool = False, passive: bool = False, arguments: dict = None, - timeout: TimeoutType = None, robust: bool = True + timeout: TimeoutType = OPERATION_TIMEOUT, robust: bool = True ) -> Exchange: exchange = await super().declare_exchange( name=name, type=type, durable=durable, auto_delete=auto_delete, @@ -120,7 +122,8 @@ async def declare_exchange( return exchange async def exchange_delete(self, exchange_name: str, - timeout: TimeoutType = None, if_unused=False, + timeout: TimeoutType = OPERATION_TIMEOUT, + if_unused=False, nowait=False) -> aiormq.spec.Exchange.DeleteOk: result = await super().exchange_delete( @@ -135,7 +138,7 @@ async def exchange_delete(self, exchange_name: str, async def declare_queue(self, name: str = None, *, durable: bool = None, exclusive: bool = False, passive: bool = False, auto_delete: bool = False, arguments: dict = None, - timeout: TimeoutType = None, + timeout: TimeoutType = OPERATION_TIMEOUT, robust: bool = True) -> Queue: queue = await super().declare_queue( @@ -149,7 +152,8 @@ async def declare_queue(self, name: str = None, *, durable: bool = None, return queue - async def queue_delete(self, queue_name: str, timeout: TimeoutType = None, + async def queue_delete(self, queue_name: str, + timeout: TimeoutType = OPERATION_TIMEOUT, if_unused: bool = False, if_empty: bool = False, nowait: bool = False): result = await super().queue_delete( diff --git a/aio_pika/robust_connection.py b/aio_pika/robust_connection.py index da2e4e2b..9f10b297 100644 --- a/aio_pika/robust_connection.py +++ b/aio_pika/robust_connection.py @@ -186,6 +186,7 @@ async def connect_robust( login: str = 'guest', password: str = 'guest', virtualhost: str = '/', ssl: bool = False, loop: asyncio.AbstractEventLoop = None, ssl_options: dict = None, timeout: TimeoutType = None, + operation_timeout: TimeoutType = None, connection_class: Type[ConnectionType] = RobustConnection, client_properties: dict = None, **kwargs ) -> ConnectionType: @@ -241,6 +242,7 @@ async def main(): :param ssl: use SSL for connection. Should be used with addition kwargs. :param ssl_options: A dict of values for the SSL connection. :param timeout: connection timeout in seconds + :param operation_timeout: execution timeout in seconds :param loop: Event loop (:func:`asyncio.get_event_loop()` when :class:`None`) :param connection_class: Factory of a new connection @@ -257,6 +259,7 @@ async def main(): password=password, virtualhost=virtualhost, ssl=ssl, loop=loop, connection_class=connection_class, ssl_options=ssl_options, timeout=timeout, + operation_timeout=operation_timeout, client_properties=client_properties, **kwargs ) ) diff --git a/aio_pika/robust_exchange.py b/aio_pika/robust_exchange.py index f87e7e1c..ebf0535b 100644 --- a/aio_pika/robust_exchange.py +++ b/aio_pika/robust_exchange.py @@ -5,6 +5,8 @@ from .exchange import Exchange, ExchangeType from .channel import Channel +from .tools import OPERATION_TIMEOUT +from .types import TimeoutType log = getLogger(__name__) @@ -42,7 +44,8 @@ async def on_reconnect(self, channel: Channel): await self.bind(exchange, **kwargs) async def bind(self, exchange, routing_key: str='', *, - arguments=None, timeout: int=None, robust: bool = True): + arguments=None, timeout: TimeoutType = OPERATION_TIMEOUT, + robust: bool = True): result = await super().bind( exchange, routing_key=routing_key, arguments=arguments, timeout=timeout @@ -56,7 +59,8 @@ async def bind(self, exchange, routing_key: str='', *, return result async def unbind(self, exchange, routing_key: str = '', - arguments: dict=None, timeout: int=None): + arguments: dict=None, + timeout: TimeoutType = OPERATION_TIMEOUT): result = await super().unbind(exchange, routing_key, arguments=arguments, timeout=timeout) diff --git a/aio_pika/robust_queue.py b/aio_pika/robust_queue.py index 555045e0..f60681ef 100644 --- a/aio_pika/robust_queue.py +++ b/aio_pika/robust_queue.py @@ -7,7 +7,8 @@ import aiormq from .channel import Channel -from aio_pika.types import ExchangeType as ExchangeType_ +from .tools import OPERATION_TIMEOUT +from .types import ExchangeType as ExchangeType_, TimeoutType from .queue import Queue, ConsumerTag log = getLogger(__name__) @@ -56,7 +57,8 @@ async def on_reconnect(self, channel: Channel): await self.consume(consumer_tag=consumer_tag, **kwargs) async def bind(self, exchange: ExchangeType_, routing_key: str=None, *, - arguments=None, timeout: int=None, robust: bool = True): + arguments=None, timeout: TimeoutType = OPERATION_TIMEOUT, + robust: bool = True): if routing_key is None: routing_key = self.name @@ -75,7 +77,8 @@ async def bind(self, exchange: ExchangeType_, routing_key: str=None, *, return result async def unbind(self, exchange: ExchangeType_, routing_key: str=None, - arguments: dict=None, timeout: int=None): + arguments: dict=None, + timeout: TimeoutType = OPERATION_TIMEOUT): if routing_key is None: routing_key = self.name @@ -89,7 +92,8 @@ async def unbind(self, exchange: ExchangeType_, routing_key: str=None, async def consume(self, callback: FunctionType, no_ack: bool=False, exclusive: bool=False, arguments: dict=None, - consumer_tag=None, timeout=None, + consumer_tag=None, + timeout: TimeoutType = OPERATION_TIMEOUT, robust: bool = True) -> ConsumerTag: kwargs = dict( @@ -108,7 +112,8 @@ async def consume(self, callback: FunctionType, no_ack: bool=False, return consumer_tag - async def cancel(self, consumer_tag: ConsumerTag, timeout=None, + async def cancel(self, consumer_tag: ConsumerTag, + timeout: TimeoutType = OPERATION_TIMEOUT, nowait: bool = False): result = await super().cancel(consumer_tag, timeout, nowait) diff --git a/aio_pika/tools.py b/aio_pika/tools.py index 6e590e0e..e48ab23b 100644 --- a/aio_pika/tools.py +++ b/aio_pika/tools.py @@ -10,6 +10,8 @@ log = logging.getLogger(__name__) +OPERATION_TIMEOUT = object() + def iscoroutinepartial(fn): """ diff --git a/aio_pika/transaction.py b/aio_pika/transaction.py index 50c6fba0..09af7e3d 100644 --- a/aio_pika/transaction.py +++ b/aio_pika/transaction.py @@ -3,6 +3,9 @@ import aiormq +from .tools import OPERATION_TIMEOUT +from .types import TimeoutType + class TransactionStates(Enum): created = 'created' @@ -15,8 +18,9 @@ class Transaction: def __str__(self): return self.state.value - def __init__(self, channel): - self.loop = channel.loop + def __init__(self, connection, channel): + self.loop = connection.loop + self._connection = connection self._channel = channel self.state = TransactionStates.created # type: TransactionStates @@ -30,24 +34,35 @@ def channel(self) -> aiormq.Channel: return self._channel - async def select(self, timeout=None) -> aiormq.spec.Tx.SelectOk: + def _get_operation_timeout(self, timeout: TimeoutType): + return ( + self._connection.operation_timeout if timeout is OPERATION_TIMEOUT + else timeout + ) + + async def select( + self, timeout: TimeoutType = OPERATION_TIMEOUT + ) -> aiormq.spec.Tx.SelectOk: result = await asyncio.wait_for( - self.channel.tx_select(), timeout=timeout + self.channel.tx_select(), + timeout=self._get_operation_timeout(timeout) ) self.state = TransactionStates.started return result - async def rollback(self, timeout=None): + async def rollback(self, timeout: TimeoutType = OPERATION_TIMEOUT): result = await asyncio.wait_for( - self.channel.tx_rollback(), timeout=timeout + self.channel.tx_rollback(), + timeout=self._get_operation_timeout(timeout) ) self.state = TransactionStates.rolled_back return result - async def commit(self, timeout=None): + async def commit(self, timeout: TimeoutType = OPERATION_TIMEOUT): result = await asyncio.wait_for( - self.channel.tx_commit(), timeout=timeout + self.channel.tx_commit(), + timeout=self._get_operation_timeout(timeout) ) self.state = TransactionStates.commited diff --git a/aio_pika/types.py b/aio_pika/types.py index 1edc0964..8f75043b 100644 --- a/aio_pika/types.py +++ b/aio_pika/types.py @@ -4,5 +4,5 @@ ReturnCallbackType = Callable[[aiormq.types.DeliveredMessage], Any] CloseCallbackType = Callable[[Exception], Any] -TimeoutType = Union[int, float] +TimeoutType = Union[int, float, None] ExchangeType = Union['Exchange', str]