From b76895393d660b0aac2f2bbe19bcd0d74ebf2016 Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Mon, 8 Jan 2024 16:31:37 -0600 Subject: [PATCH] Remove unused future implementation of seperate queues --- src/engine/src/core/Server.py | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/src/engine/src/core/Server.py b/src/engine/src/core/Server.py index 183ad6fe..ab029db0 100644 --- a/src/engine/src/core/Server.py +++ b/src/engine/src/core/Server.py @@ -83,28 +83,6 @@ def __call__(self): inbound_queue = self._declare_queue(channel, INBOUND_QUEUE, exclusive=True) channel.queue_bind(exchange=INBOUND_EXCHANGE, queue=inbound_queue.method.queue) - # TODO Future implementation - # Deferred exchange and queue stores workflow submissions that await execution - # of workflows with the same idempotency key - channel.exchange_declare(DEFERRED_EXCHANGE, exchange_type=ExchangeType.fanout) - deferred_queue = self._declare_queue(channel, DEFERRED_QUEUE, exclusive=True) - channel.queue_bind(exchange=DEFERRED_EXCHANGE, queue=deferred_queue.method.queue) - - # TODO Future Implementation - # Retry exchange and queue is a temporary hold for messages that havent been - # processed yet in the event that there are no workers available, or the workflow - # executor has an ApplicationError - channel.exchange_declare(RETRY_EXCHANGE, exchange_type=ExchangeType.fanout) - retry_queue = self._declare_queue(channel, RETRY_QUEUE, exclusive=True) - channel.queue_bind(exchange=RETRY_EXCHANGE, queue=retry_queue.method.queue) - - # TODO Future Implementation - # Messages that are retried too many times get sent to the deadletter exchange - # for inspection - channel.exchange_declare(DEAD_LETTER_EXCHANGE, exchange_type=ExchangeType.fanout) - dead_letter_queue = self._declare_queue(channel, DEAD_LETTER_QUEUE, exclusive=True) - channel.queue_bind(exchange=DEAD_LETTER_EXCHANGE, queue=dead_letter_queue.method.queue) - # The threads that will be started within the on_message callback threads = [] @@ -328,7 +306,6 @@ def _declare_queue(self, channel, queue, exclusive=True): logger.critical(f"{lbuf('[SERVER]')} Exclusive queue declaration error for queue '{queue}' | {e}") sys.exit(1) - def _resolve_idempotency_key(self, request): # Check the context's meta for an idempotency key. This will be used # to identify duplicate workflow submissions and handle them according