From 6f90651c289fd28ffdc0af91f01d7959f1b413ec Mon Sep 17 00:00:00 2001 From: Michael van Rooijen Date: Tue, 15 Oct 2024 12:12:02 +0200 Subject: [PATCH] Disable Celery heartbeat and manually close channel and connection --- hirefire_resource/macro/celery.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/hirefire_resource/macro/celery.py b/hirefire_resource/macro/celery.py index 3e5230a..900638e 100644 --- a/hirefire_resource/macro/celery.py +++ b/hirefire_resource/macro/celery.py @@ -104,7 +104,7 @@ def job_queue_latency(*queues, broker_url=None): else: broker_url = "redis://localhost:6379/0" - app = Celery(broker=broker_url) + app = Celery(broker=broker_url, broker_heartbeat=0) try: with app.connection_or_acquire() as connection: @@ -115,9 +115,10 @@ def job_queue_latency(*queues, broker_url=None): fn = _job_queue_latency_rabbitmq return max(fn(channel, queue) for queue in queues) - except OperationalError: return 0 + finally: + app.close() async def async_job_queue_latency(*queues, broker_url=None): @@ -251,7 +252,7 @@ def job_queue_size(*queues, broker_url=None): else: broker_url = "redis://localhost:6379/0" - app = Celery(broker=broker_url) + app = Celery(broker=broker_url, broker_heartbeat=0) try: with app.connection_or_acquire() as connection: @@ -259,9 +260,10 @@ def job_queue_size(*queues, broker_url=None): worker_task_count = _job_queue_size_worker(app, queues) broker_task_count = _job_queue_size_broker(channel, queues) return worker_task_count + broker_task_count - except OperationalError: return 0 + finally: + app.close() async def async_job_queue_size(*queues, broker_url=None):