From d7508828b1dda091006efcfafbca352c96ed2bb3 Mon Sep 17 00:00:00 2001 From: Bono de Visser Date: Fri, 28 May 2021 16:31:22 +0200 Subject: [PATCH 1/2] Fix all ordering issues for newer pika methods Newer pika versions changed the ordering of some parameters in the method signature. This is kind of hard to test so we keep running into different issues. I've now just made sure that any pika function call has the correct kwarg provided so ordering won't matter anymore. --- amqpconsumer/events.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/amqpconsumer/events.py b/amqpconsumer/events.py index 01320a7..67cce82 100644 --- a/amqpconsumer/events.py +++ b/amqpconsumer/events.py @@ -84,8 +84,8 @@ def connect(self): :rtype: pika.SelectConnection """ logger.debug('Connecting to %s', self._url) - return pika.SelectConnection(pika.URLParameters(self._url), - self.on_connection_open) + return pika.SelectConnection(parameters=pika.URLParameters(self._url), + on_open_callback=self.on_connection_open) def on_connection_open(self, _): """Called by pika once the connection to RabbitMQ has been established. @@ -104,7 +104,7 @@ def add_on_connection_close_callback(self): RabbitMQ closes the connection to the consumer unexpectedly. """ logger.debug('Adding connection close callback') - self._connection.add_on_close_callback(self.on_connection_closed) + self._connection.add_on_close_callback(callback=self.on_connection_closed) def on_connection_closed(self, _, error): """Called by pika when the connection to RabbitMQ is closed @@ -120,7 +120,7 @@ def on_connection_closed(self, _, error): self._connection.ioloop.stop() else: logger.warning('Connection closed, reopening in 5 seconds: {}'.format(error)) - self._connection.ioloop.call_later(5, self.reconnect) + self._connection.ioloop.call_later(delay=5, callback=self.reconnect) def reconnect(self): """Will be invoked by the IOLoop timer if the connection is closed. @@ -200,7 +200,8 @@ def on_queue_declareok(self, _): """ logger.debug("Binding %s to %s with %s", self._exchange, self._queue, self._routing_key) if self._exchange: - self._channel.queue_bind(self.on_bindok, self._queue, self._exchange, self._routing_key) + self._channel.queue_bind(queue=self._queue, exchange=self._exchange, routing_key=self._routing_key, + callback=self.on_bindok) else: self.start_consuming() @@ -219,7 +220,7 @@ def add_on_channel_close_callback(self): unexpectedly closes the channel. """ logger.debug('Adding channel close callback') - self._channel.add_on_close_callback(self.on_channel_closed) + self._channel.add_on_close_callback(callback=self.on_channel_closed) def on_channel_closed(self, channel, closing_reason): """Called by pika when RabbitMQ unexpectedly closes the channel. @@ -249,7 +250,7 @@ def start_consuming(self): when a message is fully received. """ self.add_on_cancel_callback() - self._consumer_tag = self._channel.basic_consume(self._queue, self.on_message) + self._consumer_tag = self._channel.basic_consume(queue=self._queue, on_message_callback=self.on_message) logger.info('Listening') def add_on_cancel_callback(self): @@ -260,7 +261,7 @@ def add_on_cancel_callback(self): invoked by pika. """ logger.debug('Adding consumer cancellation callback') - self._channel.add_on_cancel_callback(self.on_consumer_cancelled) + self._channel.add_on_cancel_callback(callback=self.on_consumer_cancelled) def on_consumer_cancelled(self, method_frame): """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer @@ -306,7 +307,7 @@ def acknowledge_message(self, delivery_tag): :param int delivery_tag: The delivery tag from the Basic.Deliver frame """ logger.debug('Acknowledging message %s', delivery_tag) - self._channel.basic_ack(delivery_tag) + self._channel.basic_ack(delivery_tag=delivery_tag) def open_channel(self): """Open a new channel with RabbitMQ. @@ -347,7 +348,7 @@ def stop_consuming(self): """Tell RabbitMQ that we would like to stop consuming.""" if self._channel: logger.debug('Sending a Basic.Cancel RPC command to RabbitMQ') - self._channel.basic_cancel(self.on_cancelok, self._consumer_tag) + self._channel.basic_cancel(consumer_tag=self._consumer_tag, callback=self.on_cancelok) def on_cancelok(self, _): """Called by pika when RabbitMQ acknowledges the cancellation of a From 3b49dac915a8096d90bb1f54f67b69dae51fd015 Mon Sep 17 00:00:00 2001 From: Bono de Visser Date: Fri, 28 May 2021 16:33:49 +0200 Subject: [PATCH 2/2] Up version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 9ba0b6c..d51129e 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ setup( name='amqpconsumer', - version='1.7.3', + version='1.7.4', description='AMQP event listener', url='https://github.com/ByteInternet/amqpconsumer', author='Byte B.V.',