From 031ee986e2e368ad53f37d0c93b1cbddc05f537c Mon Sep 17 00:00:00 2001
From: DanArmor
Date: Wed, 11 Dec 2024 20:18:12 +0300
Subject: [PATCH] style: refactor static labels feat to be more compact

---
 src/exporter.py | 183 +++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 149 insertions(+), 34 deletions(-)

diff --git a/src/exporter.py b/src/exporter.py
index 609227a..bac0084 100644
--- a/src/exporter.py
+++ b/src/exporter.py
@@ -3,20 +3,120 @@
 import re
 import sys
 import time
+
 from collections import defaultdict
 from typing import Callable, Optional
 
+import prometheus_client
+
 from celery import Celery
 from celery.events.state import State  # type: ignore
 from celery.utils import nodesplit  # type: ignore
 from celery.utils.time import utcoffset  # type: ignore
 from kombu.exceptions import ChannelError  # type: ignore
 from loguru import logger
-from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
+from prometheus_client import CollectorRegistry
 
 from .http_server import start_http_server
 
 
+class Counter(prometheus_client.Counter):
+    # pylint: disable=too-many-arguments,too-many-positional-arguments
+    def __init__(
+        self,
+        name,
+        documentation,
+        labelnames=...,
+        namespace="",
+        subsystem="",
+        unit="",
+        registry=...,
+        _labelvalues=None,
+        static_labels=None,
+    ):
+        self.static_labels = static_labels or {}
+        static_labels_keys = self.static_labels.keys() or []
+        super().__init__(
+            name,
+            documentation,
+            [*labelnames, *static_labels_keys],
+            namespace,
+            subsystem,
+            unit,
+            registry,
+            _labelvalues,
+        )
+
+    def labels(self, *labelvalues, **labelkwargs):
+        return super().labels(*labelvalues, **labelkwargs, **self.static_labels)
+
+
+class Gauge(prometheus_client.Gauge):
+    # pylint: disable=too-many-arguments,too-many-positional-arguments
+    def __init__(
+        self,
+        name,
+        documentation,
+        labelnames=...,
+        namespace="",
+        subsystem="",
+        unit="",
+        registry=...,
+        _labelvalues=None,
+        multiprocess_mode="all",
+        static_labels=None,
+    ):
+        self.static_labels = static_labels or {}
+        static_labels_keys = self.static_labels.keys() or []
+        super().__init__(
+            name,
+            documentation,
+            [*labelnames, *static_labels_keys],
+            namespace,
+            subsystem,
+            unit,
+            registry,
+            _labelvalues,
+            multiprocess_mode,
+        )
+
+    def labels(self, *labelvalues, **labelkwargs):
+        return super().labels(*labelvalues, **labelkwargs, **self.static_labels)
+
+
+class Histogram(prometheus_client.Histogram):
+    # pylint: disable=too-many-arguments,too-many-positional-arguments
+    def __init__(
+        self,
+        name,
+        documentation,
+        labelnames=...,
+        namespace="",
+        subsystem="",
+        unit="",
+        registry=...,
+        _labelvalues=None,
+        buckets=...,
+        static_labels=None,
+    ):
+        self.static_labels = static_labels or {}
+        static_labels_keys = self.static_labels.keys() or []
+        super().__init__(
+            name,
+            documentation,
+            [*labelnames, *static_labels_keys],
+            namespace,
+            subsystem,
+            unit,
+            registry,
+            _labelvalues,
+            buckets,
+        )
+
+    def labels(self, *labelvalues, **labelkwargs):
+        return super().labels(*labelvalues, **labelkwargs, **self.static_labels)
+
+
 class Exporter:  # pylint: disable=too-many-instance-attributes,too-many-branches
     state: State = None
 
@@ -40,34 +140,34 @@ def __init__(
         )
         self.generic_hostname_task_sent_metric = generic_hostname_task_sent_metric
 
-        # Static labels
-        self.static_label = static_label or {}
-        self.static_label_keys = self.static_label.keys()
-
         self.state_counters = {
             "task-sent": Counter(
                 f"{metric_prefix}task_sent",
                 "Sent when a task message is published.",
-                ["name", "hostname", "queue_name", *self.static_label_keys],
+                ["name", "hostname", "queue_name"],
                 registry=self.registry,
+                static_labels=static_label,
             ),
             "task-received": Counter(
                 f"{metric_prefix}task_received",
                 "Sent when the worker receives a task.",
-                ["name", "hostname", "queue_name", *self.static_label_keys],
+                ["name", "hostname", "queue_name"],
                 registry=self.registry,
+                static_labels=static_label,
             ),
             "task-started": Counter(
                 f"{metric_prefix}task_started",
                 "Sent just before the worker executes the task.",
-                ["name", "hostname", "queue_name", *self.static_label_keys],
+                ["name", "hostname", "queue_name"],
                 registry=self.registry,
+                static_labels=static_label,
            ),
             "task-succeeded": Counter(
                 f"{metric_prefix}task_succeeded",
                 "Sent if the task executed successfully.",
-                ["name", "hostname", "queue_name", *self.static_label_keys],
+                ["name", "hostname", "queue_name"],
                 registry=self.registry,
+                static_labels=static_label,
             ),
             "task-failed": Counter(
                 f"{metric_prefix}task_failed",
@@ -77,72 +177,82 @@ def __init__(
                     "hostname",
                     "exception",
                     "queue_name",
-                    *self.static_label_keys,
                 ],
                 registry=self.registry,
+                static_labels=static_label,
             ),
             "task-rejected": Counter(
                 f"{metric_prefix}task_rejected",
                 # pylint: disable=line-too-long
                 "The task was rejected by the worker, possibly to be re-queued or moved to a dead letter queue.",
-                ["name", "hostname", "queue_name", *self.static_label_keys],
+                ["name", "hostname", "queue_name"],
                 registry=self.registry,
+                static_labels=static_label,
             ),
             "task-revoked": Counter(
                 f"{metric_prefix}task_revoked",
                 "Sent if the task has been revoked.",
-                ["name", "hostname", "queue_name", *self.static_label_keys],
+                ["name", "hostname", "queue_name"],
                 registry=self.registry,
+                static_labels=static_label,
             ),
             "task-retried": Counter(
                 f"{metric_prefix}task_retried",
                 "Sent if the task failed, but will be retried in the future.",
-                ["name", "hostname", "queue_name", *self.static_label_keys],
+                ["name", "hostname", "queue_name"],
                 registry=self.registry,
+                static_labels=static_label,
             ),
         }
         self.celery_worker_up = Gauge(
             f"{metric_prefix}worker_up",
             "Indicates if a worker has recently sent a heartbeat.",
-            ["hostname", *self.static_label_keys],
+            ["hostname"],
             registry=self.registry,
+            static_labels=static_label,
         )
         self.worker_tasks_active = Gauge(
             f"{metric_prefix}worker_tasks_active",
             "The number of tasks the worker is currently processing",
-            ["hostname", *self.static_label_keys],
+            ["hostname"],
             registry=self.registry,
+            static_labels=static_label,
        )
        self.celery_task_runtime = Histogram(
            f"{metric_prefix}task_runtime",
            "Histogram of task runtime measurements.",
-            ["name", "hostname", "queue_name", *self.static_label_keys],
+            ["name", "hostname", "queue_name"],
            registry=self.registry,
+            static_labels=static_label,
            buckets=buckets or Histogram.DEFAULT_BUCKETS,
        )
        self.celery_queue_length = Gauge(
            f"{metric_prefix}queue_length",
            "The number of message in broker queue.",
-            ["queue_name", *self.static_label_keys],
+            ["queue_name"],
            registry=self.registry,
+            static_labels=static_label,
        )
        self.celery_active_consumer_count = Gauge(
            f"{metric_prefix}active_consumer_count",
            "The number of active consumer in broker queue.",
-            ["queue_name", *self.static_label_keys],
+            ["queue_name"],
            registry=self.registry,
+            static_labels=static_label,
        )
        self.celery_active_worker_count = Gauge(
            f"{metric_prefix}active_worker_count",
            "The number of active workers in broker queue.",
-            ["queue_name", *self.static_label_keys],
+            ["queue_name"],
            registry=self.registry,
+            static_labels=static_label,
        )
        self.celery_active_process_count = Gauge(
            f"{metric_prefix}active_process_count",
            "The number of active processes in broker queue.",
-            ["queue_name", *self.static_label_keys],
+            ["queue_name"],
            registry=self.registry,
+            static_labels=static_label,
        )
 
     def scrape(self):
@@ -155,10 +265,12 @@ def scrape(self):
 
     def forget_worker(self, hostname):
         if hostname in self.worker_last_seen:
-            self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(0)
-            self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set(
-                0
-            )
+            self.celery_worker_up.labels(
+                hostname=hostname,
+            ).set(0)
+            self.worker_tasks_active.labels(
+                hostname=hostname,
+            ).set(0)
             logger.debug(
                 "Updated gauge='{}' value='{}'", self.worker_tasks_active._name, 0
             )
@@ -253,19 +365,19 @@ def track_queue_metrics(self):
                 if transport in ["amqp", "amqps", "memory"]:
                     consumer_count = rabbitmq_queue_consumer_count(connection, queue)
                     self.celery_active_consumer_count.labels(
-                        queue_name=queue, **self.static_label
+                        queue_name=queue,
                     ).set(consumer_count)
                 self.celery_active_process_count.labels(
-                    queue_name=queue, **self.static_label
+                    queue_name=queue,
                 ).set(processes_per_queue[queue])
                 self.celery_active_worker_count.labels(
-                    queue_name=queue, **self.static_label
+                    queue_name=queue,
                 ).set(workers_per_queue[queue])
 
                 length = queue_length(transport, connection, queue)
                 if length is not None:
                     self.celery_queue_length.labels(
-                        queue_name=queue, **self.static_label
+                        queue_name=queue,
                     ).set(length)
 
     def track_task_event(self, event):
@@ -280,7 +392,6 @@ def track_task_event(self, event):
             "name": task.name,
             "hostname": get_hostname(task.hostname),
             "queue_name": getattr(task, "queue", "celery"),
-            **self.static_label,
         }
         if event["type"] == "task-sent" and self.generic_hostname_task_sent_metric:
             labels["hostname"] = "generic"
@@ -320,7 +431,9 @@ def track_worker_status(self, event, is_online):
         event_name = "worker-online" if is_online else "worker-offline"
         hostname = get_hostname(event["hostname"])
         logger.debug("Received event='{}' for hostname='{}'", event_name, hostname)
-        self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(value)
+        self.celery_worker_up.labels(
+            hostname=hostname,
+        ).set(value)
 
         if is_online:
             self.worker_last_seen[hostname] = {
@@ -343,10 +456,12 @@ def track_worker_heartbeat(self, event):
         worker_state = self.state.event(event)[0][0]
         active = worker_state.active or 0
         up = 1 if worker_state.alive else 0
-        self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(up)
-        self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set(
-            active
-        )
+        self.celery_worker_up.labels(
+            hostname=hostname,
+        ).set(up)
+        self.worker_tasks_active.labels(
+            hostname=hostname,
+        ).set(active)
         logger.debug(
             "Updated gauge='{}' value='{}'", self.worker_tasks_active._name, active
         )
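
Usage note (illustrative, not part of the patch): with the subclassed metrics above, static labels are declared once when a metric is constructed and merged into every labels() call, so call sites only pass the dynamic label values. A minimal sketch, assuming the Counter subclass is importable as src.exporter.Counter and using a made-up "cluster" static label:

    from prometheus_client import CollectorRegistry, generate_latest

    from src.exporter import Counter  # subclass added in this patch; module path assumed

    registry = CollectorRegistry()
    task_sent = Counter(
        "celery_task_sent",
        "Sent when a task message is published.",
        ["name", "hostname", "queue_name"],
        registry=registry,
        static_labels={"cluster": "eu-west-1"},  # hypothetical static label
    )

    # Only the dynamic labels are passed here; "cluster" is merged in by labels().
    task_sent.labels(name="tasks.add", hostname="worker-1", queue_name="celery").inc()

    # The exposed sample carries all four labels, e.g.
    # celery_task_sent_total{..., cluster="eu-west-1"} 1.0
    print(generate_latest(registry).decode())

Because the override forwards the static labels as keyword arguments, call sites must pass their label values as keywords too (as the exporter already does), since prometheus_client's labels() rejects mixing positional values with keyword arguments.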