diff --git a/conftest.py b/conftest.py index b92883a..d60c936 100644 --- a/conftest.py +++ b/conftest.py @@ -1,7 +1,9 @@ import socket import threading +import time import pytest +from celery import shared_task from src.exporter import Exporter @@ -117,3 +119,19 @@ def threaded_exporter(exporter_instance): @pytest.fixture() def hostname(): return socket.gethostname() + + +@pytest.fixture(autouse=True) +def _purge(celery_app): + celery_app.control.purge() + + +@shared_task(ignore_result=True) +def timeout_task(timeout=0): + time.sleep(timeout) + + +@shared_task(bind=True, ignore_result=True) +def failing_task(self, fail_n_times: int, countdown=None): + if fail_n_times > self.request.retries: + self.retry(countdown=countdown) diff --git a/src/exporter.py b/src/exporter.py index 26f4abe..542041a 100644 --- a/src/exporter.py +++ b/src/exporter.py @@ -3,16 +3,15 @@ import re import sys import time - from collections import defaultdict from typing import Optional -from celery import Celery +from celery import Celery, states from celery.events.state import State # type: ignore from celery.utils import nodesplit # type: ignore from kombu.exceptions import ChannelError # type: ignore from loguru import logger -from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram +from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Summary from .http_server import start_http_server @@ -132,6 +131,12 @@ def __init__( ["queue_name"], registry=self.registry, ) + self.celery_time_in_queue = Summary( + f"{metric_prefix}time_in_queue", + "The time task spends in broker queue.", + ["queue_name"], + registry=self.registry, + ) def scrape(self): if ( @@ -253,6 +258,9 @@ def track_queue_metrics(self): self.celery_queue_length.labels(queue_name=queue).set(length) def track_task_event(self, event): + previous_task_state = getattr( + self.state.tasks.get(event["uuid"]), "state", None + ) self.state.event(event) task = self.state.tasks.get(event["uuid"]) 
logger.debug("Received event='{}' for task='{}'", event["type"], task.name) @@ -260,10 +268,11 @@ def track_task_event(self, event): if event["type"] not in self.state_counters: logger.warning("No counter matches task state='{}'", task.state) + queue_name = getattr(task, "queue", "celery") labels = { "name": task.name, "hostname": get_hostname(task.hostname), - "queue_name": getattr(task, "queue", "celery"), + "queue_name": queue_name, } if event["type"] == "task-sent" and self.generic_hostname_task_sent_metric: labels["hostname"] = "generic" @@ -296,6 +305,23 @@ def track_task_event(self, event): labels, task.runtime, ) + elif event["type"] in ("task-received", "task-revoked"): + was_terminated = event.get("terminated", False) + + if all( + [ + # tasks with ETA do not go through the queue => skip them + task.eta is None, + task.sent is not None, + # terminated tasks were already processed during "receive" => ignore them + was_terminated is False, + # `task-revoked` can be sent multiple times; track only the first one + # https://docs.celeryq.dev/en/stable/userguide/monitoring.html#task-revoked + previous_task_state != states.REVOKED, + ] + ): + time_in_queue = event["timestamp"] - task.sent + self.celery_time_in_queue.labels(queue_name).observe(time_in_queue) def track_worker_status(self, event, is_online): value = 1 if is_online else 0 diff --git a/src/test_metrics.py b/src/test_metrics.py index a0b7141..9874906 100644 --- a/src/test_metrics.py +++ b/src/test_metrics.py @@ -1,9 +1,12 @@ import logging import time +from unittest.mock import ANY import pytest from celery.contrib.testing.worker import start_worker # type: ignore +from conftest import timeout_task, failing_task + @pytest.fixture def assert_exporter_metric_called(mocker, celery_app, celery_worker, hostname): @@ -190,3 +193,106 @@ def succeed(): ) == 1.0 ) + + +@pytest.mark.celery +def test_time_in_queue(threaded_exporter, celery_app): + with start_worker(celery_app, perform_ping_check=False): + 
timeout_task.delay().get(timeout=10) + + # this is necessary otherwise asserts do not match expectation + time.sleep(2) + + tasks_in_queue = threaded_exporter.registry.get_sample_value( + "celery_time_in_queue_count", labels={"queue_name": "celery"} + ) + time_in_queue_sum = threaded_exporter.registry.get_sample_value( + "celery_time_in_queue_sum", labels={"queue_name": "celery"} + ) + + assert tasks_in_queue == 1 + assert 0 < time_in_queue_sum < 1.0 + + +@pytest.mark.celery +def test_time_in_queue_ignores_eta(threaded_exporter, celery_app): + with start_worker(celery_app, perform_ping_check=False): + timeout_task.apply_async(countdown=0).get(timeout=10) + + # this is necessary otherwise asserts do not match expectation + time.sleep(2) + + tasks_started = threaded_exporter.registry.get_sample_value( + "celery_task_started_total", labels=ANY + ) + assert tasks_started == 1 + assert threaded_exporter.registry.get_sample_value("celery_time_in_queue_count", labels={"queue_name": "celery"}) is None + + +@pytest.mark.celery +def test_time_in_queue_retries(threaded_exporter, celery_app): + with start_worker(celery_app, perform_ping_check=False): + failing_task.delay(fail_n_times=3, countdown=0.1).get(timeout=10) + + # this is necessary otherwise asserts do not match expectation + time.sleep(2) + + tasks_started = threaded_exporter.registry.get_sample_value( + "celery_task_started_total", labels=ANY + ) + tasks_through_queue = threaded_exporter.registry.get_sample_value( + "celery_time_in_queue_count", labels={"queue_name": "celery"} + ) + + # Only the first execution goes through the queue, the rest is executed with ETA + # with event task-retried + assert tasks_started == 1 + assert tasks_through_queue == 1 + + +@pytest.mark.celery +def test_time_in_queue_expires(threaded_exporter, celery_app): + with start_worker(celery_app, perform_ping_check=False): + result = timeout_task.delay(3) + timeout_task.apply_async(expires=1) + result.get(timeout=10) + + # this is necessary otherwise asserts do not match expectation + 
time.sleep(2) + threaded_exporter.scrape() + + tasks_started = threaded_exporter.registry.get_sample_value( + "celery_task_received_total", labels=ANY + ) + tasks_through_queue = threaded_exporter.registry.get_sample_value( + "celery_time_in_queue_count", labels={"queue_name": "celery"} + ) + assert tasks_started == 1 + assert tasks_through_queue == 2 + + +@pytest.mark.xfail( + reason="Sometimes the task is started before the revoke command is processed" +) +@pytest.mark.celery +def test_time_in_queue_revoke(threaded_exporter, celery_app): + with start_worker(celery_app, perform_ping_check=False): + first_result = timeout_task.delay(5) + revoked_result = timeout_task.delay() + revoked_result.revoke(terminate=True) + first_result.get(timeout=10) + + # this is necessary otherwise asserts do not match expectation + time.sleep(5) + threaded_exporter.scrape() + + tasks_started = threaded_exporter.registry.get_sample_value( + "celery_task_started_total", labels=ANY + ) + tasks_through_queue = threaded_exporter.registry.get_sample_value( + "celery_time_in_queue_count", labels={"queue_name": "celery"} + ) + # Only the first task gets started, + # the second is revoked before being picked up by the worker + assert tasks_started == 1 + assert tasks_through_queue == 2