Skip to content

Commit

Permalink
fix: Use event timestamp based on utcoffset
Browse files Browse the repository at this point in the history
  • Loading branch information
reshab48 committed Jul 17, 2024
1 parent b1450bf commit 5907ddb
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 18 deletions.
19 changes: 16 additions & 3 deletions src/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
import time

from collections import defaultdict
from typing import Optional
from typing import Callable, Optional

from celery import Celery
from celery.events.state import State # type: ignore
from celery.utils import nodesplit # type: ignore
from celery.utils.time import utcoffset # type: ignore
from kombu.exceptions import ChannelError # type: ignore
from loguru import logger
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
Expand Down Expand Up @@ -307,7 +308,9 @@ def track_worker_status(self, event, is_online):

if is_online:
self.worker_last_seen[hostname] = {
"ts": event["timestamp"],
"ts": reverse_adjust_timestamp(
event["timestamp"], event.get("utcoffset")
),
"forgotten": False,
}
else:
Expand All @@ -317,7 +320,10 @@ def track_worker_heartbeat(self, event):
hostname = get_hostname(event["hostname"])
logger.debug("Received event='{}' for worker='{}'", event["type"], hostname)

self.worker_last_seen[hostname] = {"ts": event["timestamp"], "forgotten": False}
self.worker_last_seen[hostname] = {
"ts": reverse_adjust_timestamp(event["timestamp"], event.get("utcoffset")),
"forgotten": False,
}
worker_state = self.state.event(event)[0][0]
active = worker_state.active or 0
up = 1 if worker_state.alive else 0
Expand Down Expand Up @@ -407,6 +413,13 @@ def run(self, click_params):
exception_pattern = re.compile(r"^(\w+)\(")


def reverse_adjust_timestamp(
ts: float, offset: Optional[int] = None, here: Callable[..., float] = utcoffset
) -> float:
"""Adjust timestamp in reverse of celery, based on provided utcoffset."""
return ts + ((offset or 0) - here()) * 3600


def get_exception_class_name(exception_name: str):
m = exception_pattern.match(exception_name)
if m:
Expand Down
59 changes: 44 additions & 15 deletions src/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

import pytest
from celery.contrib.testing.worker import start_worker # type: ignore
from celery.utils.time import adjust_timestamp # type: ignore

from src.exporter import reverse_adjust_timestamp


@pytest.fixture
Expand Down Expand Up @@ -70,9 +73,22 @@ def test_worker_status(threaded_exporter, celery_app, hostname):
)


def test_worker_timeout_status(threaded_exporter, hostname):
ts = time.time()
threaded_exporter.track_worker_status({"hostname": hostname, "timestamp": ts}, True)
@pytest.mark.parametrize(
"input_utcoffset, sleep_seconds, expected_metric_value",
[
(None, 5, 0.0),
(0, 5, 0.0),
(7, 5, 0.0),
(7, 0, 1.0),
], # Eg: PST (America/Los_Angeles)
)
def test_worker_timeout_status(
input_utcoffset, sleep_seconds, expected_metric_value, threaded_exporter, hostname
):
ts = adjust_timestamp(time.time(), (input_utcoffset or 0))
threaded_exporter.track_worker_status(
{"hostname": hostname, "timestamp": ts, "utcoffset": input_utcoffset}, True
)
assert (
threaded_exporter.registry.get_sample_value(
"celery_worker_up", labels={"hostname": hostname}
Expand All @@ -81,22 +97,35 @@ def test_worker_timeout_status(threaded_exporter, hostname):
)
assert threaded_exporter.worker_last_seen[hostname] == {
"forgotten": False,
"ts": ts,
"ts": reverse_adjust_timestamp(ts, input_utcoffset),
}

time.sleep(5)
time.sleep(sleep_seconds)
threaded_exporter.scrape()
assert (
threaded_exporter.registry.get_sample_value(
"celery_worker_up", labels={"hostname": hostname}
)
== 0.0
== expected_metric_value
)


def test_purge_offline_worker_metrics(threaded_exporter, hostname):
ts = time.time()
threaded_exporter.track_worker_status({"hostname": hostname, "timestamp": ts}, True)
@pytest.mark.parametrize(
"input_utcoffset, sleep_seconds, expected_metric_value",
[
(None, 15, None),
(0, 15, None),
(7, 15, None),
(7, 0, 1.0),
], # Eg: PST (America/Los_Angeles)
)
def test_purge_offline_worker_metrics(
input_utcoffset, sleep_seconds, expected_metric_value, threaded_exporter, hostname
):
ts = adjust_timestamp(time.time(), (input_utcoffset or 0))
threaded_exporter.track_worker_status(
{"hostname": hostname, "timestamp": ts, "utcoffset": input_utcoffset}, True
)
threaded_exporter.worker_tasks_active.labels(hostname=hostname).inc()
threaded_exporter.celery_task_runtime.labels(
name="boosh", hostname=hostname, queue_name="test"
Expand Down Expand Up @@ -134,36 +163,36 @@ def test_purge_offline_worker_metrics(threaded_exporter, hostname):

assert threaded_exporter.worker_last_seen[hostname] == {
"forgotten": False,
"ts": ts,
"ts": reverse_adjust_timestamp(ts, input_utcoffset),
}

time.sleep(15)
time.sleep(sleep_seconds)
threaded_exporter.scrape()
assert (
threaded_exporter.registry.get_sample_value(
"celery_worker_up", labels={"hostname": hostname}
)
is None
== expected_metric_value
)
assert (
threaded_exporter.registry.get_sample_value(
"celery_worker_tasks_active", labels={"hostname": hostname}
)
is None
== expected_metric_value
)
assert (
threaded_exporter.registry.get_sample_value(
"celery_task_runtime_count",
labels={"hostname": hostname, "queue_name": "test", "name": "boosh"},
)
is None
== expected_metric_value
)
assert (
threaded_exporter.registry.get_sample_value(
"celery_task_sent_total",
labels={"hostname": hostname, "queue_name": "test", "name": "boosh"},
)
is None
== expected_metric_value
)


Expand Down

0 comments on commit 5907ddb

Please sign in to comment.