Skip to content

Commit

Permalink
Bugfix, Add find_queue_by_task method
Browse files Browse the repository at this point in the history
  • Loading branch information
TheLazzziest committed Nov 26, 2024
1 parent d614cf1 commit c3aea64
Showing 1 changed file with 26 additions and 2 deletions.
28 changes: 26 additions & 2 deletions src/exporter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# pylint: disable=protected-access,,attribute-defined-outside-init
from functools import lru_cache
import json
import re
import sys
Expand All @@ -7,7 +8,7 @@
from typing import Callable, Optional

from celery import Celery
from celery.events.state import State # type: ignore
from celery.events.state import State, Task # type: ignore
from celery.utils import nodesplit # type: ignore
from celery.utils.time import utcoffset # type: ignore
from kombu.exceptions import ChannelError # type: ignore
Expand Down Expand Up @@ -141,6 +142,27 @@ def scrape(self):
self.track_timed_out_workers()
self.track_queue_metrics()

@lru_cache(maxsize=32)
def find_queue_by_task(self, target: Task) -> str:
"""Provider a queue name based on metadata coming from eiether a worker or a task being processed by it"""

try:
queue_name = None
# https://github.com/celery/celery/issues/5321
# task_info = self.app.control.inspect().query_task(task.id)
# As the received tasks are considered as active ones
task_set = self.app.control.inspect().registered().get(target.hostname)
task_info = [task for task in task_set if task['id'] == target.id].pop()
queue_name = task_info["delivery_info"]["routing_key"]
except TimeoutError as error:
# the broker doesn't respond
logger.error(f"Couldn't fetch the task info of {target.id}: {error.strerror}")
except IndexError as error:
# couldn't find the target task by id
# the dictionary path is missing
logger.warning(f"Couldn't find the target task by its id: {target.id}")
return queue_name

def forget_worker(self, hostname):
if hostname in self.worker_last_seen:
self.celery_worker_up.labels(hostname=hostname).set(0)
Expand Down Expand Up @@ -263,7 +285,9 @@ def track_task_event(self, event):
labels = {
"name": task.name,
"hostname": get_hostname(task.hostname),
"queue_name": getattr(task, "queue", "celery"),
# queue property should be available when a task is called with a passed queue name
# otherwise, we need to query its meta using the celery instance
"queue_name": getattr(task, "queue", self.find_queue_by_task(task) or "celery"),
}
if event["type"] == "task-sent" and self.generic_hostname_task_sent_metric:
labels["hostname"] = "generic"
Expand Down

0 comments on commit c3aea64

Please sign in to comment.