Skip to content

Commit

Permalink
style: refactor static labels feature to be more compact
Browse files Browse the repository at this point in the history
  • Loading branch information
DanArmor committed Dec 11, 2024
1 parent 4a899c7 commit 031ee98
Showing 1 changed file with 149 additions and 34 deletions.
183 changes: 149 additions & 34 deletions src/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,120 @@
import re
import sys
import time

from collections import defaultdict
from typing import Callable, Optional

import prometheus_client

from celery import Celery
from celery.events.state import State # type: ignore
from celery.utils import nodesplit # type: ignore
from celery.utils.time import utcoffset # type: ignore
from kombu.exceptions import ChannelError # type: ignore
from loguru import logger
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
from prometheus_client import CollectorRegistry

from .http_server import start_http_server


class Counter(prometheus_client.Counter):
    """Counter that transparently appends a fixed set of static labels.

    Label names supplied via *static_labels* are added to the metric's label
    set at construction time, and their values are injected on every
    ``labels()`` call so call sites never have to repeat them.
    """

    # pylint: disable=too-many-arguments,too-many-positional-arguments
    def __init__(
        self,
        name,
        documentation,
        labelnames=(),  # was `...`: unpacking Ellipsis raised TypeError when omitted
        namespace="",
        subsystem="",
        unit="",
        registry=prometheus_client.REGISTRY,  # was `...`: Ellipsis.register() crashed
        _labelvalues=None,
        static_labels=None,
    ):
        self.static_labels = static_labels or {}
        # Static label names are appended AFTER the dynamic ones; labels()
        # relies on this ordering when values are passed positionally.
        super().__init__(
            name,
            documentation,
            [*labelnames, *self.static_labels.keys()],
            namespace=namespace,
            subsystem=subsystem,
            unit=unit,
            registry=registry,
            _labelvalues=_labelvalues,
        )

    def labels(self, *labelvalues, **labelkwargs):
        """Return the child metric, merging in the static label values.

        prometheus_client forbids mixing positional and keyword label values,
        so static values are appended positionally when the caller used
        positionals, and merged as keywords otherwise.
        """
        if labelvalues:
            return super().labels(*labelvalues, *self.static_labels.values())
        return super().labels(**labelkwargs, **self.static_labels)


class Gauge(prometheus_client.Gauge):
    """Gauge that transparently appends a fixed set of static labels.

    Label names supplied via *static_labels* are added to the metric's label
    set at construction time, and their values are injected on every
    ``labels()`` call so call sites never have to repeat them.
    """

    # pylint: disable=too-many-arguments,too-many-positional-arguments
    def __init__(
        self,
        name,
        documentation,
        labelnames=(),  # was `...`: unpacking Ellipsis raised TypeError when omitted
        namespace="",
        subsystem="",
        unit="",
        registry=prometheus_client.REGISTRY,  # was `...`: Ellipsis.register() crashed
        _labelvalues=None,
        multiprocess_mode="all",
        static_labels=None,
    ):
        self.static_labels = static_labels or {}
        # Static label names are appended AFTER the dynamic ones; labels()
        # relies on this ordering when values are passed positionally.
        super().__init__(
            name,
            documentation,
            [*labelnames, *self.static_labels.keys()],
            namespace=namespace,
            subsystem=subsystem,
            unit=unit,
            registry=registry,
            _labelvalues=_labelvalues,
            multiprocess_mode=multiprocess_mode,
        )

    def labels(self, *labelvalues, **labelkwargs):
        """Return the child metric, merging in the static label values.

        prometheus_client forbids mixing positional and keyword label values,
        so static values are appended positionally when the caller used
        positionals, and merged as keywords otherwise.
        """
        if labelvalues:
            return super().labels(*labelvalues, *self.static_labels.values())
        return super().labels(**labelkwargs, **self.static_labels)


class Histogram(prometheus_client.Histogram):
    """Histogram that transparently appends a fixed set of static labels.

    Label names supplied via *static_labels* are added to the metric's label
    set at construction time, and their values are injected on every
    ``labels()`` call so call sites never have to repeat them.
    """

    # pylint: disable=too-many-arguments,too-many-positional-arguments
    def __init__(
        self,
        name,
        documentation,
        labelnames=(),  # was `...`: unpacking Ellipsis raised TypeError when omitted
        namespace="",
        subsystem="",
        unit="",
        registry=prometheus_client.REGISTRY,  # was `...`: Ellipsis.register() crashed
        _labelvalues=None,
        buckets=prometheus_client.Histogram.DEFAULT_BUCKETS,  # was `...`: invalid buckets
        static_labels=None,
    ):
        self.static_labels = static_labels or {}
        # Static label names are appended AFTER the dynamic ones; labels()
        # relies on this ordering when values are passed positionally.
        super().__init__(
            name,
            documentation,
            [*labelnames, *self.static_labels.keys()],
            namespace=namespace,
            subsystem=subsystem,
            unit=unit,
            registry=registry,
            _labelvalues=_labelvalues,
            buckets=buckets,
        )

    def labels(self, *labelvalues, **labelkwargs):
        """Return the child metric, merging in the static label values.

        prometheus_client forbids mixing positional and keyword label values,
        so static values are appended positionally when the caller used
        positionals, and merged as keywords otherwise.
        """
        if labelvalues:
            return super().labels(*labelvalues, *self.static_labels.values())
        return super().labels(**labelkwargs, **self.static_labels)


class Exporter: # pylint: disable=too-many-instance-attributes,too-many-branches
state: State = None

Expand All @@ -40,34 +140,34 @@ def __init__(
)
self.generic_hostname_task_sent_metric = generic_hostname_task_sent_metric

# Static labels
self.static_label = static_label or {}
self.static_label_keys = self.static_label.keys()

self.state_counters = {
"task-sent": Counter(
f"{metric_prefix}task_sent",
"Sent when a task message is published.",
["name", "hostname", "queue_name", *self.static_label_keys],
["name", "hostname", "queue_name"],
registry=self.registry,
static_labels=static_label,
),
"task-received": Counter(
f"{metric_prefix}task_received",
"Sent when the worker receives a task.",
["name", "hostname", "queue_name", *self.static_label_keys],
["name", "hostname", "queue_name"],
registry=self.registry,
static_labels=static_label,
),
"task-started": Counter(
f"{metric_prefix}task_started",
"Sent just before the worker executes the task.",
["name", "hostname", "queue_name", *self.static_label_keys],
["name", "hostname", "queue_name"],
registry=self.registry,
static_labels=static_label,
),
"task-succeeded": Counter(
f"{metric_prefix}task_succeeded",
"Sent if the task executed successfully.",
["name", "hostname", "queue_name", *self.static_label_keys],
["name", "hostname", "queue_name"],
registry=self.registry,
static_labels=static_label,
),
"task-failed": Counter(
f"{metric_prefix}task_failed",
Expand All @@ -77,72 +177,82 @@ def __init__(
"hostname",
"exception",
"queue_name",
*self.static_label_keys,
],
registry=self.registry,
static_labels=static_label,
),
"task-rejected": Counter(
f"{metric_prefix}task_rejected",
# pylint: disable=line-too-long
"The task was rejected by the worker, possibly to be re-queued or moved to a dead letter queue.",
["name", "hostname", "queue_name", *self.static_label_keys],
["name", "hostname", "queue_name"],
registry=self.registry,
static_labels=static_label,
),
"task-revoked": Counter(
f"{metric_prefix}task_revoked",
"Sent if the task has been revoked.",
["name", "hostname", "queue_name", *self.static_label_keys],
["name", "hostname", "queue_name"],
registry=self.registry,
static_labels=static_label,
),
"task-retried": Counter(
f"{metric_prefix}task_retried",
"Sent if the task failed, but will be retried in the future.",
["name", "hostname", "queue_name", *self.static_label_keys],
["name", "hostname", "queue_name"],
registry=self.registry,
static_labels=static_label,
),
}
self.celery_worker_up = Gauge(
f"{metric_prefix}worker_up",
"Indicates if a worker has recently sent a heartbeat.",
["hostname", *self.static_label_keys],
["hostname"],
registry=self.registry,
static_labels=static_label,
)
self.worker_tasks_active = Gauge(
f"{metric_prefix}worker_tasks_active",
"The number of tasks the worker is currently processing",
["hostname", *self.static_label_keys],
["hostname"],
registry=self.registry,
static_labels=static_label,
)
self.celery_task_runtime = Histogram(
f"{metric_prefix}task_runtime",
"Histogram of task runtime measurements.",
["name", "hostname", "queue_name", *self.static_label_keys],
["name", "hostname", "queue_name"],
registry=self.registry,
static_labels=static_label,
buckets=buckets or Histogram.DEFAULT_BUCKETS,
)
self.celery_queue_length = Gauge(
f"{metric_prefix}queue_length",
"The number of message in broker queue.",
["queue_name", *self.static_label_keys],
["queue_name"],
registry=self.registry,
static_labels=static_label,
)
self.celery_active_consumer_count = Gauge(
f"{metric_prefix}active_consumer_count",
"The number of active consumer in broker queue.",
["queue_name", *self.static_label_keys],
["queue_name"],
registry=self.registry,
static_labels=static_label,
)
self.celery_active_worker_count = Gauge(
f"{metric_prefix}active_worker_count",
"The number of active workers in broker queue.",
["queue_name", *self.static_label_keys],
["queue_name"],
registry=self.registry,
static_labels=static_label,
)
self.celery_active_process_count = Gauge(
f"{metric_prefix}active_process_count",
"The number of active processes in broker queue.",
["queue_name", *self.static_label_keys],
["queue_name"],
registry=self.registry,
static_labels=static_label,
)

def scrape(self):
Expand All @@ -155,10 +265,12 @@ def scrape(self):

def forget_worker(self, hostname):
if hostname in self.worker_last_seen:
self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(0)
self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set(
0
)
self.celery_worker_up.labels(
hostname=hostname,
).set(0)
self.worker_tasks_active.labels(
hostname=hostname,
).set(0)
logger.debug(
"Updated gauge='{}' value='{}'", self.worker_tasks_active._name, 0
)
Expand Down Expand Up @@ -253,19 +365,19 @@ def track_queue_metrics(self):
if transport in ["amqp", "amqps", "memory"]:
consumer_count = rabbitmq_queue_consumer_count(connection, queue)
self.celery_active_consumer_count.labels(
queue_name=queue, **self.static_label
queue_name=queue,
).set(consumer_count)

self.celery_active_process_count.labels(
queue_name=queue, **self.static_label
queue_name=queue,
).set(processes_per_queue[queue])
self.celery_active_worker_count.labels(
queue_name=queue, **self.static_label
queue_name=queue,
).set(workers_per_queue[queue])
length = queue_length(transport, connection, queue)
if length is not None:
self.celery_queue_length.labels(
queue_name=queue, **self.static_label
queue_name=queue,
).set(length)

def track_task_event(self, event):
Expand All @@ -280,7 +392,6 @@ def track_task_event(self, event):
"name": task.name,
"hostname": get_hostname(task.hostname),
"queue_name": getattr(task, "queue", "celery"),
**self.static_label,
}
if event["type"] == "task-sent" and self.generic_hostname_task_sent_metric:
labels["hostname"] = "generic"
Expand Down Expand Up @@ -320,7 +431,9 @@ def track_worker_status(self, event, is_online):
event_name = "worker-online" if is_online else "worker-offline"
hostname = get_hostname(event["hostname"])
logger.debug("Received event='{}' for hostname='{}'", event_name, hostname)
self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(value)
self.celery_worker_up.labels(
hostname=hostname,
).set(value)

if is_online:
self.worker_last_seen[hostname] = {
Expand All @@ -343,10 +456,12 @@ def track_worker_heartbeat(self, event):
worker_state = self.state.event(event)[0][0]
active = worker_state.active or 0
up = 1 if worker_state.alive else 0
self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(up)
self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set(
active
)
self.celery_worker_up.labels(
hostname=hostname,
).set(up)
self.worker_tasks_active.labels(
hostname=hostname,
).set(active)
logger.debug(
"Updated gauge='{}' value='{}'", self.worker_tasks_active._name, active
)
Expand Down

0 comments on commit 031ee98

Please sign in to comment.