diff --git a/src/engine/src/core/Server.py b/src/engine/src/core/Server.py
index 6a58921d..fdda17a1 100644
--- a/src/engine/src/core/Server.py
+++ b/src/engine/src/core/Server.py
@@ -9,7 +9,7 @@
 
 import pika
 
-from pika.exceptions import AMQPError
+from pika.exceptions import AMQPError, ChannelClosedByBroker
 from pika.exchange_type import ExchangeType
 
 from conf.constants import (
@@ -80,14 +80,14 @@ def __call__(self):
 
             # Inbound exchange and queue handles workflow submissions or resubmissions
             channel.exchange_declare(INBOUND_EXCHANGE, exchange_type=ExchangeType.fanout)
-            inbound_queue = channel.queue_declare(queue=INBOUND_QUEUE, exclusive=True)
+            inbound_queue = self._declare_queue_retry(channel, INBOUND_QUEUE, exclusive=True)
             channel.queue_bind(exchange=INBOUND_EXCHANGE, queue=inbound_queue.method.queue)
 
             # TODO Future implementation
             # Deferred exchange and queue stores workflow submissions that await execution
             # of workflows with the same idempotency key
             channel.exchange_declare(DEFERRED_EXCHANGE, exchange_type=ExchangeType.fanout)
-            deferred_queue = channel.queue_declare(queue=DEFERRED_QUEUE, exclusive=True)
+            deferred_queue = self._declare_queue_retry(channel, DEFERRED_QUEUE, exclusive=True)
             channel.queue_bind(exchange=DEFERRED_EXCHANGE, queue=deferred_queue.method.queue)
 
             # TODO Future Implementation
@@ -95,14 +95,14 @@ def __call__(self):
             # processed yet in the event that there are no workers available, or the workflow
             # executor has an ApplicationError
             channel.exchange_declare(RETRY_EXCHANGE, exchange_type=ExchangeType.fanout)
-            retry_queue = channel.queue_declare(queue=RETRY_QUEUE)
+            retry_queue = self._declare_queue_retry(channel, RETRY_QUEUE, exclusive=True)
             channel.queue_bind(exchange=RETRY_EXCHANGE, queue=retry_queue.method.queue)
 
             # TODO Future Implementation
             # Messages that are retried too many times get sent to the deadletter exchange
             # for inspection
             channel.exchange_declare(DEAD_LETTER_EXCHANGE, exchange_type=ExchangeType.fanout)
-            dead_letter_queue = channel.queue_declare(queue=DEAD_LETTER_QUEUE)
+            dead_letter_queue = self._declare_queue_retry(channel, DEAD_LETTER_QUEUE, exclusive=True)
             channel.queue_bind(exchange=DEAD_LETTER_EXCHANGE, queue=dead_letter_queue.method.queue)
 
             # The threads that will be started within the on_message callback
@@ -130,11 +130,9 @@ def __call__(self):
         # Occurs when basic_consume recieves the wrong args
         except ValueError as e:
             logger.critical(f"Critical Workflow Executor Error: {e}")
-
         # Cathes all ampq errors from .start_consuming()
         except AMQPError as e:
             logger.error(f"{e.__class__.__name__} - {e}")
-
         # Catch all other exceptions
         except Exception as e:
             logger.error(e)
@@ -189,7 +187,6 @@ def _start_worker(self, body, connection, channel, delivery_tag):
             logger.error(e)
             channel.basic_reject(delivery_tag, requeue=False)
             return
-
         except NoAvailableWorkers:
             logger.info(f"{lbuf('[SERVER]')} Insufficient workers available. RETRYING (10s)")
             connection.add_callback_threadsafe(
@@ -202,15 +199,8 @@ def _start_worker(self, body, connection, channel, delivery_tag):
                 )
             )
             return
-
-        # TODO probably not needed
-        # except WorkflowTerminated as e:
-        #     logger.info(f"{lbuf('[SERVER]')} {e}")
-        #     worker.reset()
-
         except Exception as e:
             logger.error(e)
-
         # Nack the message if it has not already been ack
         # TODO Nack the message into a retry queue.
         # Or reject? Why would it not be rejected?
@@ -332,6 +322,27 @@ def _deregister_worker(self, worker, terminated=False):
 
     def _get_active_workers(self, key):
         return [worker for worker in self.active_workers if worker.key == key]
+
+    def _declare_queue_retry(self, channel, queue, exclusive=True, retry_count=0, max_retries=25):
+        """Declare a queue on the channel, retrying if the broker closes the channel.
+
+        Returns the result of channel.queue_declare so that callers can read
+        `.method.queue` from it. `retry_count` is the number of retries already
+        made; once it reaches `max_retries` the failure is logged as critical
+        and the process exits with status 1.
+        """
+        while True:
+            try:
+                return channel.queue_declare(queue=queue, exclusive=exclusive)
+            except ChannelClosedByBroker as e:
+                if retry_count >= max_retries:
+                    logger.critical(f"\nExclusive Queue Declaration Error: Maximum retry attempts reached({max_retries}) | {e}")
+                    sys.exit(1)
+                retry_count += 1
+                logger.info(f"{lbuf('[SERVER]')} Unable to declare queue '{queue}'. RETRYING ({retry_count}/{max_retries})")
+                # NOTE(review): the broker has closed this channel, so the next
+                # attempt may need a freshly opened channel - confirm
+                time.sleep(CONNECTION_RETRY_DELAY)
 
     def _resolve_idempotency_key(self, request):
         # Check the context's meta for an idempotency key. This will be used