From 5907ddbf09c06d257dab4aad4240f5fb8c7b318a Mon Sep 17 00:00:00 2001 From: Reshab Das Date: Wed, 17 Jul 2024 13:26:18 +0530 Subject: [PATCH] fix: Use event timestamp based on utcoffset --- src/exporter.py | 19 ++++++++++++--- src/test_metrics.py | 59 +++++++++++++++++++++++++++++++++------------ 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/src/exporter.py b/src/exporter.py index 5a30844..c856953 100644 --- a/src/exporter.py +++ b/src/exporter.py @@ -5,11 +5,12 @@ import time from collections import defaultdict -from typing import Optional +from typing import Callable, Optional from celery import Celery from celery.events.state import State # type: ignore from celery.utils import nodesplit # type: ignore +from celery.utils.time import utcoffset # type: ignore from kombu.exceptions import ChannelError # type: ignore from loguru import logger from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram @@ -307,7 +308,9 @@ def track_worker_status(self, event, is_online): if is_online: self.worker_last_seen[hostname] = { - "ts": event["timestamp"], + "ts": reverse_adjust_timestamp( + event["timestamp"], event.get("utcoffset") + ), "forgotten": False, } else: @@ -317,7 +320,10 @@ def track_worker_heartbeat(self, event): hostname = get_hostname(event["hostname"]) logger.debug("Received event='{}' for worker='{}'", event["type"], hostname) - self.worker_last_seen[hostname] = {"ts": event["timestamp"], "forgotten": False} + self.worker_last_seen[hostname] = { + "ts": reverse_adjust_timestamp(event["timestamp"], event.get("utcoffset")), + "forgotten": False, + } worker_state = self.state.event(event)[0][0] active = worker_state.active or 0 up = 1 if worker_state.alive else 0 @@ -407,6 +413,13 @@ def run(self, click_params): exception_pattern = re.compile(r"^(\w+)\(") +def reverse_adjust_timestamp( + ts: float, offset: Optional[int] = None, here: Callable[..., float] = utcoffset +) -> float: + """Adjust timestamp in reverse of celery, based on provided utcoffset.""" + return ts + ((offset or 0) - here()) * 3600 + + def get_exception_class_name(exception_name: str): m = exception_pattern.match(exception_name) if m: diff --git a/src/test_metrics.py b/src/test_metrics.py index a0b7141..46dac6e 100644 --- a/src/test_metrics.py +++ b/src/test_metrics.py @@ -3,6 +3,9 @@ import pytest from celery.contrib.testing.worker import start_worker # type: ignore +from celery.utils.time import adjust_timestamp # type: ignore + +from src.exporter import reverse_adjust_timestamp @pytest.fixture @@ -70,9 +73,22 @@ def test_worker_status(threaded_exporter, celery_app, hostname): ) -def test_worker_timeout_status(threaded_exporter, hostname): - ts = time.time() - threaded_exporter.track_worker_status({"hostname": hostname, "timestamp": ts}, True) +@pytest.mark.parametrize( + "input_utcoffset, sleep_seconds, expected_metric_value", + [ + (None, 5, 0.0), + (0, 5, 0.0), + (7, 5, 0.0), + (7, 0, 1.0), + ], # Eg: PST (America/Los_Angeles) +) +def test_worker_timeout_status( + input_utcoffset, sleep_seconds, expected_metric_value, threaded_exporter, hostname +): + ts = adjust_timestamp(time.time(), (input_utcoffset or 0)) + threaded_exporter.track_worker_status( + {"hostname": hostname, "timestamp": ts, "utcoffset": input_utcoffset}, True + ) assert ( threaded_exporter.registry.get_sample_value( "celery_worker_up", labels={"hostname": hostname} @@ -81,22 +97,35 @@ def test_worker_timeout_status(threaded_exporter, hostname): ) assert threaded_exporter.worker_last_seen[hostname] == { "forgotten": False, - "ts": ts, + "ts": reverse_adjust_timestamp(ts, input_utcoffset), } - time.sleep(5) + time.sleep(sleep_seconds) threaded_exporter.scrape() assert ( threaded_exporter.registry.get_sample_value( "celery_worker_up", labels={"hostname": hostname} ) - == 0.0 + == expected_metric_value ) -def test_purge_offline_worker_metrics(threaded_exporter, hostname): - ts = time.time() - threaded_exporter.track_worker_status({"hostname": hostname, "timestamp": ts}, True) +@pytest.mark.parametrize( + "input_utcoffset, sleep_seconds, expected_metric_value", + [ + (None, 15, None), + (0, 15, None), + (7, 15, None), + (7, 0, 1.0), + ], # Eg: PST (America/Los_Angeles) +) +def test_purge_offline_worker_metrics( + input_utcoffset, sleep_seconds, expected_metric_value, threaded_exporter, hostname +): + ts = adjust_timestamp(time.time(), (input_utcoffset or 0)) + threaded_exporter.track_worker_status( + {"hostname": hostname, "timestamp": ts, "utcoffset": input_utcoffset}, True + ) threaded_exporter.worker_tasks_active.labels(hostname=hostname).inc() threaded_exporter.celery_task_runtime.labels( name="boosh", hostname=hostname, queue_name="test" @@ -134,36 +163,36 @@ def test_purge_offline_worker_metrics(threaded_exporter, hostname): assert threaded_exporter.worker_last_seen[hostname] == { "forgotten": False, - "ts": ts, + "ts": reverse_adjust_timestamp(ts, input_utcoffset), } - time.sleep(15) + time.sleep(sleep_seconds) threaded_exporter.scrape() assert ( threaded_exporter.registry.get_sample_value( "celery_worker_up", labels={"hostname": hostname} ) - is None + == expected_metric_value ) assert ( threaded_exporter.registry.get_sample_value( "celery_worker_tasks_active", labels={"hostname": hostname} ) - is None + == expected_metric_value ) assert ( threaded_exporter.registry.get_sample_value( "celery_task_runtime_count", labels={"hostname": hostname, "queue_name": "test", "name": "boosh"}, ) - is None + == expected_metric_value ) assert ( threaded_exporter.registry.get_sample_value( "celery_task_sent_total", labels={"hostname": hostname, "queue_name": "test", "name": "boosh"}, ) - is None + == expected_metric_value )