Skip to content

Commit

Permalink
Add error handling to gracefully shutdown Server when unable to decla…
Browse files Browse the repository at this point in the history
…re exclusive lock on a queue
  • Loading branch information
nathandf committed Jan 8, 2024
1 parent d1a79d6 commit fc37bc5
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions src/engine/src/core/Server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import pika

from pika.exceptions import AMQPError
from pika.exceptions import AMQPError, ChannelClosedByBroker
from pika.exchange_type import ExchangeType

from conf.constants import (
Expand Down Expand Up @@ -80,29 +80,29 @@ def __call__(self):

# Inbound exchange and queue handles workflow submissions or resubmissions
channel.exchange_declare(INBOUND_EXCHANGE, exchange_type=ExchangeType.fanout)
inbound_queue = channel.queue_declare(queue=INBOUND_QUEUE, exclusive=True)
inbound_queue = self._declare_queue(channel, INBOUND_QUEUE, exclusive=True)
channel.queue_bind(exchange=INBOUND_EXCHANGE, queue=inbound_queue.method.queue)

# TODO Future implementation
# Deferred exchange and queue stores workflow submissions that await execution
# of workflows with the same idempotency key
channel.exchange_declare(DEFERRED_EXCHANGE, exchange_type=ExchangeType.fanout)
deferred_queue = channel.queue_declare(queue=DEFERRED_QUEUE, exclusive=True)
deferred_queue = self._declare_queue(channel, DEFERRED_QUEUE, exclusive=True)
channel.queue_bind(exchange=DEFERRED_EXCHANGE, queue=deferred_queue.method.queue)

# TODO Future Implementation
# Retry exchange and queue is a temporary hold for messages that havent been
# processed yet in the event that there are no workers available, or the workflow
# executor has an ApplicationError
channel.exchange_declare(RETRY_EXCHANGE, exchange_type=ExchangeType.fanout)
retry_queue = channel.queue_declare(queue=RETRY_QUEUE)
retry_queue = self._declare_queue(channel, RETRY_QUEUE, exclusive=True)
channel.queue_bind(exchange=RETRY_EXCHANGE, queue=retry_queue.method.queue)

# TODO Future Implementation
# Messages that are retried too many times get sent to the deadletter exchange
# for inspection
channel.exchange_declare(DEAD_LETTER_EXCHANGE, exchange_type=ExchangeType.fanout)
dead_letter_queue = channel.queue_declare(queue=DEAD_LETTER_QUEUE)
dead_letter_queue = self._declare_queue(channel, DEAD_LETTER_QUEUE, exclusive=True)
channel.queue_bind(exchange=DEAD_LETTER_EXCHANGE, queue=dead_letter_queue.method.queue)

# The threads that will be started within the on_message callback
Expand Down Expand Up @@ -130,11 +130,9 @@ def __call__(self):
# Occurs when basic_consume receives the wrong args
except ValueError as e:
logger.critical(f"Critical Workflow Executor Error: {e}")

# Catches all AMQP errors from .start_consuming()
except AMQPError as e:
logger.error(f"{e.__class__.__name__} - {e}")

# Catch all other exceptions
except Exception as e:
logger.error(e)
Expand Down Expand Up @@ -189,7 +187,6 @@ def _start_worker(self, body, connection, channel, delivery_tag):
logger.error(e)
channel.basic_reject(delivery_tag, requeue=False)
return

except NoAvailableWorkers:
logger.info(f"{lbuf('[SERVER]')} Insufficient workers available. RETRYING (10s)")
connection.add_callback_threadsafe(
Expand All @@ -202,15 +199,8 @@ def _start_worker(self, body, connection, channel, delivery_tag):
)
)
return

# TODO probably not needed
# except WorkflowTerminated as e:
# logger.info(f"{lbuf('[SERVER]')} {e}")
# worker.reset()

except Exception as e:
logger.error(e)

# Nack the message if it has not already been ack
# TODO Nack the message into a retry queue.
# Or reject? Why would it not be rejected?
Expand Down Expand Up @@ -332,6 +322,19 @@ def _deregister_worker(self, worker, terminated=False):

def _get_active_workers(self, key):
return [worker for worker in self.active_workers if worker.key == key]

def _declare_queue_retry(self, channel, queue, exclusive=True, retry_count=0, max_retries=25):
    """Declare *queue* on *channel*, retrying when the broker rejects the declare.

    Fixes over the previous version: the ``max_retries`` parameter is no longer
    shadowed by a hard-coded local, the ``queue_declare`` result frame is
    returned (call sites assign it and read ``result.method.queue``), the
    retry recursion propagates ``max_retries`` and its return value, and the
    give-up check runs before the retry delay so we don't sleep just to exit.

    NOTE(review): call sites in this file invoke ``self._declare_queue(...)``;
    confirm an alias/wrapper with that name exists, or align the names.

    :param channel: open pika channel to declare on
    :param queue: queue name to declare
    :param exclusive: declare the queue as exclusive to this connection
    :param retry_count: current attempt number (internal, used by recursion)
    :param max_retries: attempts before logging critical and exiting
    :return: the ``pika`` method frame returned by ``channel.queue_declare``
    """
    try:
        return channel.queue_declare(queue=queue, exclusive=exclusive)
    except ChannelClosedByBroker as e:
        logger.info(f"{lbuf('[SERVER]')} Warning: Duplicate Submission Policy of 'DEFER' not implemented. Handling as 'ALLOW'")
        if retry_count >= max_retries:
            # Log the limit actually enforced (max_retries), then shut down.
            logger.critical(f"\nExclusive Queue Declaration Error: Maximum retry attempts reached({max_retries}) | {e}")
            sys.exit(1)
        time.sleep(CONNECTION_RETRY_DELAY)
        # NOTE(review): the broker closes the channel on this error; retrying
        # on the same closed channel may raise again — confirm reopen handling.
        return self._declare_queue_retry(
            channel,
            queue,
            exclusive=exclusive,
            retry_count=retry_count + 1,
            max_retries=max_retries,
        )

def _resolve_idempotency_key(self, request):
# Check the context's meta for an idempotency key. This will be used
Expand Down

0 comments on commit fc37bc5

Please sign in to comment.