Skip to content

Commit

Permalink
Remove unused future implementation of seperate queues
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Jan 8, 2024
1 parent c832e31 commit b768953
Showing 1 changed file with 0 additions and 23 deletions.
23 changes: 0 additions & 23 deletions src/engine/src/core/Server.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,28 +83,6 @@ def __call__(self):
inbound_queue = self._declare_queue(channel, INBOUND_QUEUE, exclusive=True)
channel.queue_bind(exchange=INBOUND_EXCHANGE, queue=inbound_queue.method.queue)

# TODO Future implementation
# Deferred exchange and queue stores workflow submissions that await execution
# of workflows with the same idempotency key
channel.exchange_declare(DEFERRED_EXCHANGE, exchange_type=ExchangeType.fanout)
deferred_queue = self._declare_queue(channel, DEFERRED_QUEUE, exclusive=True)
channel.queue_bind(exchange=DEFERRED_EXCHANGE, queue=deferred_queue.method.queue)

# TODO Future Implementation
# Retry exchange and queue is a temporary hold for messages that havent been
# processed yet in the event that there are no workers available, or the workflow
# executor has an ApplicationError
channel.exchange_declare(RETRY_EXCHANGE, exchange_type=ExchangeType.fanout)
retry_queue = self._declare_queue(channel, RETRY_QUEUE, exclusive=True)
channel.queue_bind(exchange=RETRY_EXCHANGE, queue=retry_queue.method.queue)

# TODO Future Implementation
# Messages that are retried too many times get sent to the deadletter exchange
# for inspection
channel.exchange_declare(DEAD_LETTER_EXCHANGE, exchange_type=ExchangeType.fanout)
dead_letter_queue = self._declare_queue(channel, DEAD_LETTER_QUEUE, exclusive=True)
channel.queue_bind(exchange=DEAD_LETTER_EXCHANGE, queue=dead_letter_queue.method.queue)

# The threads that will be started within the on_message callback
threads = []

Expand Down Expand Up @@ -328,7 +306,6 @@ def _declare_queue(self, channel, queue, exclusive=True):
logger.critical(f"{lbuf('[SERVER]')} Exclusive queue declaration error for queue '{queue}' | {e}")
sys.exit(1)


def _resolve_idempotency_key(self, request):
# Check the context's meta for an idempotency key. This will be used
# to identify duplicate workflow submissions and handle them according
Expand Down

0 comments on commit b768953

Please sign in to comment.