From 85109b79d74d022492bf199343496950c8573e57 Mon Sep 17 00:00:00 2001
From: DanArmor
Date: Thu, 5 Dec 2024 12:48:19 +0300
Subject: [PATCH] feat: introduce static labels

---
 src/cli.py      | 20 ++++++++++++++++++
 src/exporter.py | 55 ++++++++++++++++++++++++++++---------------------
 2 files changed, 51 insertions(+), 24 deletions(-)

diff --git a/src/cli.py b/src/cli.py
index f28de29..a26788d 100644
--- a/src/cli.py
+++ b/src/cli.py
@@ -19,6 +19,16 @@ def _comma_seperated_argument(_ctx, _param, value):
         return value.split(",")
     return []
 
+# Accepts "key=value" strings collected from a repeated option.
+# Returns a dict {key: value}; an empty dict if value is None.
+def _eq_sign_separated_argument_to_dict(_ctx, _param, value):
+    if value is not None:
+        dict_of_key_value_pairs = {}
+        for key_value_pair in value:
+            key, val = key_value_pair.split("=")
+            dict_of_key_value_pairs[key] = val
+        return dict_of_key_value_pairs
+    return {}
 
 @click.command(help=cmd_help)
 @click.option(
@@ -115,6 +125,14 @@ def _comma_seperated_argument(_ctx, _param, value):
     help="Prefix all metrics with a string. "
     "This option replaces the 'celery_*' part with a custom prefix. ",
 )
+@click.option(
+    "--static-label",
+    required=False,
+    default=None,
+    multiple=True,
+    callback=_eq_sign_separated_argument_to_dict,
+    help="Add a label with a static value to all metrics (format: key=value).",
+)
 def cli(  # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals
     broker_url,
     broker_transport_option,
@@ -130,6 +148,7 @@ def cli(  # pylint: disable=too-many-arguments,too-many-positional-arguments,too
     generic_hostname_task_sent_metric,
     queues,
     metric_prefix,
+    static_label,
 ):  # pylint: disable=unused-argument
     formatted_buckets = list(map(float, buckets.split(",")))
     ctx = click.get_current_context()
@@ -140,4 +159,5 @@ def cli(  # pylint: disable=too-many-arguments,too-many-positional-arguments,too
         generic_hostname_task_sent_metric,
         queues,
         metric_prefix,
+        static_label,
     ).run(ctx.params)
diff --git a/src/exporter.py b/src/exporter.py
index 3e7f8f1..7ca073a 100644
--- a/src/exporter.py
+++ b/src/exporter.py
@@ -29,6 +29,7 @@ def __init__(
         generic_hostname_task_sent_metric=False,
         initial_queues=None,
         metric_prefix="celery_",
+        static_label=None,
     ):
         self.registry = CollectorRegistry(auto_describe=True)
         self.queue_cache = set(initial_queues or [])
@@ -38,98 +39,103 @@ def __init__(
             purge_offline_worker_metrics_seconds
         )
         self.generic_hostname_task_sent_metric = generic_hostname_task_sent_metric
+
+        # Static labels applied to every metric exposed by the exporter.
+        self.static_label = static_label or {}
+        self.static_label_keys = self.static_label.keys()
+
         self.state_counters = {
             "task-sent": Counter(
                 f"{metric_prefix}task_sent",
                 "Sent when a task message is published.",
-                ["name", "hostname", "queue_name"],
+                ["name", "hostname", "queue_name", *self.static_label_keys],
                 registry=self.registry,
             ),
             "task-received": Counter(
                 f"{metric_prefix}task_received",
                 "Sent when the worker receives a task.",
-                ["name", "hostname", "queue_name"],
+                ["name", "hostname", "queue_name", *self.static_label_keys],
                 registry=self.registry,
             ),
             "task-started": Counter(
                 f"{metric_prefix}task_started",
                 "Sent just before the worker executes the task.",
-                ["name", "hostname", "queue_name"],
+                ["name", "hostname", "queue_name", *self.static_label_keys],
                 registry=self.registry,
             ),
             "task-succeeded": Counter(
                 f"{metric_prefix}task_succeeded",
                 "Sent if the task executed successfully.",
-                ["name", "hostname", "queue_name"],
+                ["name", "hostname", "queue_name", *self.static_label_keys],
                 registry=self.registry,
             ),
             "task-failed": Counter(
f"{metric_prefix}task_failed", "Sent if the execution of the task failed.", - ["name", "hostname", "exception", "queue_name"], + ["name", "hostname", "exception", "queue_name", *self.static_label_keys], registry=self.registry, ), "task-rejected": Counter( f"{metric_prefix}task_rejected", # pylint: disable=line-too-long "The task was rejected by the worker, possibly to be re-queued or moved to a dead letter queue.", - ["name", "hostname", "queue_name"], + ["name", "hostname", "queue_name", *self.static_label_keys], registry=self.registry, ), "task-revoked": Counter( f"{metric_prefix}task_revoked", "Sent if the task has been revoked.", - ["name", "hostname", "queue_name"], + ["name", "hostname", "queue_name", *self.static_label_keys], registry=self.registry, ), "task-retried": Counter( f"{metric_prefix}task_retried", "Sent if the task failed, but will be retried in the future.", - ["name", "hostname", "queue_name"], + ["name", "hostname", "queue_name", *self.static_label_keys], registry=self.registry, ), } self.celery_worker_up = Gauge( f"{metric_prefix}worker_up", "Indicates if a worker has recently sent a heartbeat.", - ["hostname"], + ["hostname", *self.static_label_keys], registry=self.registry, ) self.worker_tasks_active = Gauge( f"{metric_prefix}worker_tasks_active", "The number of tasks the worker is currently processing", - ["hostname"], + ["hostname", *self.static_label_keys], registry=self.registry, ) self.celery_task_runtime = Histogram( f"{metric_prefix}task_runtime", "Histogram of task runtime measurements.", - ["name", "hostname", "queue_name"], + ["name", "hostname", "queue_name", *self.static_label_keys], registry=self.registry, buckets=buckets or Histogram.DEFAULT_BUCKETS, ) self.celery_queue_length = Gauge( f"{metric_prefix}queue_length", "The number of message in broker queue.", - ["queue_name"], + ["queue_name", *self.static_label_keys], registry=self.registry, ) self.celery_active_consumer_count = Gauge( f"{metric_prefix}active_consumer_count", "The number of active consumer in broker queue.", - ["queue_name"], + ["queue_name", *self.static_label_keys], registry=self.registry, ) self.celery_active_worker_count = Gauge( f"{metric_prefix}active_worker_count", "The number of active workers in broker queue.", - ["queue_name"], + ["queue_name", *self.static_label_keys], registry=self.registry, ) self.celery_active_process_count = Gauge( f"{metric_prefix}active_process_count", "The number of active processes in broker queue.", - ["queue_name"], + ["queue_name", *self.static_label_keys], registry=self.registry, ) @@ -143,8 +149,8 @@ def scrape(self): def forget_worker(self, hostname): if hostname in self.worker_last_seen: - self.celery_worker_up.labels(hostname=hostname).set(0) - self.worker_tasks_active.labels(hostname=hostname).set(0) + self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(0) + self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set(0) logger.debug( "Updated gauge='{}' value='{}'", self.worker_tasks_active._name, 0 ) @@ -238,19 +244,19 @@ def track_queue_metrics(self): for queue in self.queue_cache: if transport in ["amqp", "amqps", "memory"]: consumer_count = rabbitmq_queue_consumer_count(connection, queue) - self.celery_active_consumer_count.labels(queue_name=queue).set( + self.celery_active_consumer_count.labels(queue_name=queue, **self.static_label).set( consumer_count ) - self.celery_active_process_count.labels(queue_name=queue).set( + self.celery_active_process_count.labels(queue_name=queue, 
                     processes_per_queue[queue]
                 )
-                self.celery_active_worker_count.labels(queue_name=queue).set(
+                self.celery_active_worker_count.labels(queue_name=queue, **self.static_label).set(
                     workers_per_queue[queue]
                 )
                 length = queue_length(transport, connection, queue)
                 if length is not None:
-                    self.celery_queue_length.labels(queue_name=queue).set(length)
+                    self.celery_queue_length.labels(queue_name=queue, **self.static_label).set(length)
 
     def track_task_event(self, event):
         self.state.event(event)
@@ -264,6 +270,7 @@ def track_task_event(self, event):
             "name": task.name,
             "hostname": get_hostname(task.hostname),
             "queue_name": getattr(task, "queue", "celery"),
+            **self.static_label,
         }
         if event["type"] == "task-sent" and self.generic_hostname_task_sent_metric:
             labels["hostname"] = "generic"
@@ -303,7 +310,7 @@ def track_worker_status(self, event, is_online):
         event_name = "worker-online" if is_online else "worker-offline"
         hostname = get_hostname(event["hostname"])
         logger.debug("Received event='{}' for hostname='{}'", event_name, hostname)
-        self.celery_worker_up.labels(hostname=hostname).set(value)
+        self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(value)
 
         if is_online:
             self.worker_last_seen[hostname] = {
@@ -326,8 +333,8 @@ def track_worker_heartbeat(self, event):
         worker_state = self.state.event(event)[0][0]
         active = worker_state.active or 0
         up = 1 if worker_state.alive else 0
-        self.celery_worker_up.labels(hostname=hostname).set(up)
-        self.worker_tasks_active.labels(hostname=hostname).set(active)
+        self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(up)
+        self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set(active)
         logger.debug(
             "Updated gauge='{}' value='{}'", self.worker_tasks_active._name, active
         )
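
For reference, a minimal sketch (not part of the patch, label names and values are illustrative) of how the
new option behaves: click collects repeated --static-label occurrences into a tuple, the callback in
src/cli.py turns them into a dict, and the Exporter spreads that dict into every metric's label set.

    # Illustrative values; click passes the tuple collected from repeated
    # "--static-label key=value" occurrences to the callback.
    pairs = ("environment=production", "region=eu-west-1")
    static_label = _eq_sign_separated_argument_to_dict(None, None, pairs)
    assert static_label == {"environment": "production", "region": "eu-west-1"}
    # Exporter(..., static_label=static_label) then attaches these labels to every metric,
    # e.g. celery_worker_up{hostname="...", environment="production", region="eu-west-1"}.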