From 20c17201f01d3513163b52f8868b2a536bcbb810 Mon Sep 17 00:00:00 2001 From: Thomas Versteeg Date: Mon, 23 Oct 2023 15:56:12 +0200 Subject: [PATCH] feat: metric-prefix configuration For setting a custom prefix for all metrics. --- src/cli.py | 8 ++++++++ src/exporter.py | 31 ++++++++++++++++--------------- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/cli.py b/src/cli.py index a654e57..684a448 100644 --- a/src/cli.py +++ b/src/cli.py @@ -109,6 +109,12 @@ def _comma_seperated_argument(_ctx, _param, value): "Queues not included in this setting will not appear in metrics until at least one worker has " "been seen to follow that queue.", ) +@click.option( + "--metric-prefix", + default="celery_", + help="Prefix all metrics with a string. " + "This option replaces the 'celery_*' part with a custom prefix. ", +) def cli( # pylint: disable=too-many-arguments,too-many-locals broker_url, broker_transport_option, @@ -123,6 +129,7 @@ def cli( # pylint: disable=too-many-arguments,too-many-locals purge_offline_worker_metrics, generic_hostname_task_sent_metric, queues, + metric_prefix, ): # pylint: disable=unused-argument formatted_buckets = list(map(float, buckets.split(","))) ctx = click.get_current_context() @@ -132,4 +139,5 @@ def cli( # pylint: disable=too-many-arguments,too-many-locals purge_offline_worker_metrics, generic_hostname_task_sent_metric, queues, + metric_prefix, ).run(ctx.params) diff --git a/src/exporter.py b/src/exporter.py index e5dc1b5..26f4abe 100644 --- a/src/exporter.py +++ b/src/exporter.py @@ -28,6 +28,7 @@ def __init__( purge_offline_worker_metrics_seconds=10 * 60, generic_hostname_task_sent_metric=False, initial_queues=None, + metric_prefix="celery_", ): self.registry = CollectorRegistry(auto_describe=True) self.queue_cache = set(initial_queues or []) @@ -39,94 +40,94 @@ def __init__( self.generic_hostname_task_sent_metric = generic_hostname_task_sent_metric self.state_counters = { "task-sent": Counter( - "celery_task_sent", + f"{metric_prefix}task_sent", "Sent when a task message is published.", ["name", "hostname", "queue_name"], registry=self.registry, ), "task-received": Counter( - "celery_task_received", + f"{metric_prefix}task_received", "Sent when the worker receives a task.", ["name", "hostname", "queue_name"], registry=self.registry, ), "task-started": Counter( - "celery_task_started", + f"{metric_prefix}task_started", "Sent just before the worker executes the task.", ["name", "hostname", "queue_name"], registry=self.registry, ), "task-succeeded": Counter( - "celery_task_succeeded", + f"{metric_prefix}task_succeeded", "Sent if the task executed successfully.", ["name", "hostname", "queue_name"], registry=self.registry, ), "task-failed": Counter( - "celery_task_failed", + f"{metric_prefix}task_failed", "Sent if the execution of the task failed.", ["name", "hostname", "exception", "queue_name"], registry=self.registry, ), "task-rejected": Counter( - "celery_task_rejected", + f"{metric_prefix}task_rejected", # pylint: disable=line-too-long "The task was rejected by the worker, possibly to be re-queued or moved to a dead letter queue.", ["name", "hostname", "queue_name"], registry=self.registry, ), "task-revoked": Counter( - "celery_task_revoked", + f"{metric_prefix}task_revoked", "Sent if the task has been revoked.", ["name", "hostname", "queue_name"], registry=self.registry, ), "task-retried": Counter( - "celery_task_retried", + f"{metric_prefix}task_retried", "Sent if the task failed, but will be retried in the future.", ["name", "hostname", "queue_name"], registry=self.registry, ), } self.celery_worker_up = Gauge( - "celery_worker_up", + f"{metric_prefix}worker_up", "Indicates if a worker has recently sent a heartbeat.", ["hostname"], registry=self.registry, ) self.worker_tasks_active = Gauge( - "celery_worker_tasks_active", + f"{metric_prefix}worker_tasks_active", "The number of tasks the worker is currently processing", ["hostname"], registry=self.registry, ) self.celery_task_runtime = Histogram( - "celery_task_runtime", + f"{metric_prefix}task_runtime", "Histogram of task runtime measurements.", ["name", "hostname", "queue_name"], registry=self.registry, buckets=buckets or Histogram.DEFAULT_BUCKETS, ) self.celery_queue_length = Gauge( - "celery_queue_length", + f"{metric_prefix}queue_length", "The number of message in broker queue.", ["queue_name"], registry=self.registry, ) self.celery_active_consumer_count = Gauge( - "celery_active_consumer_count", + f"{metric_prefix}active_consumer_count", "The number of active consumer in broker queue.", ["queue_name"], registry=self.registry, ) self.celery_active_worker_count = Gauge( - "celery_active_worker_count", + f"{metric_prefix}active_worker_count", "The number of active workers in broker queue.", ["queue_name"], registry=self.registry, ) self.celery_active_process_count = Gauge( - "celery_active_process_count", + f"{metric_prefix}active_process_count", "The number of active processes in broker queue.", ["queue_name"], registry=self.registry,