Skip to content

Commit

Permalink
Merge pull request #15 from kerrermanisNL/fix_all_arg_ordering_issues
Browse files Browse the repository at this point in the history
Fix all ordering issues for newer pika methods
  • Loading branch information
Bono de Visser authored May 28, 2021
2 parents 33fa48e + 3b49dac commit 179861d
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 11 deletions.
21 changes: 11 additions & 10 deletions amqpconsumer/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ def connect(self):
:rtype: pika.SelectConnection
"""
logger.debug('Connecting to %s', self._url)
return pika.SelectConnection(pika.URLParameters(self._url),
self.on_connection_open)
return pika.SelectConnection(parameters=pika.URLParameters(self._url),
on_open_callback=self.on_connection_open)

def on_connection_open(self, _):
"""Called by pika once the connection to RabbitMQ has been established.
Expand All @@ -104,7 +104,7 @@ def add_on_connection_close_callback(self):
RabbitMQ closes the connection to the consumer unexpectedly.
"""
logger.debug('Adding connection close callback')
self._connection.add_on_close_callback(self.on_connection_closed)
self._connection.add_on_close_callback(callback=self.on_connection_closed)

def on_connection_closed(self, _, error):
"""Called by pika when the connection to RabbitMQ is closed
Expand All @@ -120,7 +120,7 @@ def on_connection_closed(self, _, error):
self._connection.ioloop.stop()
else:
logger.warning('Connection closed, reopening in 5 seconds: {}'.format(error))
self._connection.ioloop.call_later(5, self.reconnect)
self._connection.ioloop.call_later(delay=5, callback=self.reconnect)

def reconnect(self):
"""Will be invoked by the IOLoop timer if the connection is closed.
Expand Down Expand Up @@ -200,7 +200,8 @@ def on_queue_declareok(self, _):
"""
logger.debug("Binding %s to %s with %s", self._exchange, self._queue, self._routing_key)
if self._exchange:
self._channel.queue_bind(self.on_bindok, self._queue, self._exchange, self._routing_key)
self._channel.queue_bind(queue=self._queue, exchange=self._exchange, routing_key=self._routing_key,
callback=self.on_bindok)
else:
self.start_consuming()

Expand All @@ -219,7 +220,7 @@ def add_on_channel_close_callback(self):
unexpectedly closes the channel.
"""
logger.debug('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)
self._channel.add_on_close_callback(callback=self.on_channel_closed)

def on_channel_closed(self, channel, closing_reason):
"""Called by pika when RabbitMQ unexpectedly closes the channel.
Expand Down Expand Up @@ -249,7 +250,7 @@ def start_consuming(self):
when a message is fully received.
"""
self.add_on_cancel_callback()
self._consumer_tag = self._channel.basic_consume(self._queue, self.on_message)
self._consumer_tag = self._channel.basic_consume(queue=self._queue, on_message_callback=self.on_message)
logger.info('Listening')

def add_on_cancel_callback(self):
Expand All @@ -260,7 +261,7 @@ def add_on_cancel_callback(self):
invoked by pika.
"""
logger.debug('Adding consumer cancellation callback')
self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
self._channel.add_on_cancel_callback(callback=self.on_consumer_cancelled)

def on_consumer_cancelled(self, method_frame):
"""Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
Expand Down Expand Up @@ -306,7 +307,7 @@ def acknowledge_message(self, delivery_tag):
:param int delivery_tag: The delivery tag from the Basic.Deliver frame
"""
logger.debug('Acknowledging message %s', delivery_tag)
self._channel.basic_ack(delivery_tag)
self._channel.basic_ack(delivery_tag=delivery_tag)

def open_channel(self):
"""Open a new channel with RabbitMQ.
Expand Down Expand Up @@ -347,7 +348,7 @@ def stop_consuming(self):
"""Tell RabbitMQ that we would like to stop consuming."""
if self._channel:
logger.debug('Sending a Basic.Cancel RPC command to RabbitMQ')
self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
self._channel.basic_cancel(consumer_tag=self._consumer_tag, callback=self.on_cancelok)

def on_cancelok(self, _):
"""Called by pika when RabbitMQ acknowledges the cancellation of a
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

setup(
name='amqpconsumer',
version='1.7.3',
version='1.7.4',
description='AMQP event listener',
url='https://github.com/ByteInternet/amqpconsumer',
author='Byte B.V.',
Expand Down

0 comments on commit 179861d

Please sign in to comment.