Skip to content

Commit

Permalink
feat: metric-prefix configuration
Browse files Browse the repository at this point in the history
For setting a custom prefix for all metrics.
  • Loading branch information
tversteeg authored and danihodovic committed Oct 23, 2023
1 parent b6207be commit 7822070
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
8 changes: 8 additions & 0 deletions src/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ def _comma_seperated_argument(_ctx, _param, value):
"Queues not included in this setting will not appear in metrics until at least one worker has "
"been seen to follow that queue.",
)
@click.option(
"--metric-prefix",
default="celery_",
help="Prefix all metrics with a string. "
"This option replaces the 'celery_*' part with a custom prefix. ",
)
def cli( # pylint: disable=too-many-arguments,too-many-locals
broker_url,
broker_transport_option,
Expand All @@ -123,6 +129,7 @@ def cli( # pylint: disable=too-many-arguments,too-many-locals
purge_offline_worker_metrics,
generic_hostname_task_sent_metric,
queues,
metric_prefix,
): # pylint: disable=unused-argument
formatted_buckets = list(map(float, buckets.split(",")))
ctx = click.get_current_context()
Expand All @@ -132,4 +139,5 @@ def cli( # pylint: disable=too-many-arguments,too-many-locals
purge_offline_worker_metrics,
generic_hostname_task_sent_metric,
queues,
metric_prefix,
).run(ctx.params)
31 changes: 16 additions & 15 deletions src/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def __init__(
purge_offline_worker_metrics_seconds=10 * 60,
generic_hostname_task_sent_metric=False,
initial_queues=None,
metric_prefix="celery_",
):
self.registry = CollectorRegistry(auto_describe=True)
self.queue_cache = set(initial_queues or [])
Expand All @@ -39,94 +40,94 @@ def __init__(
self.generic_hostname_task_sent_metric = generic_hostname_task_sent_metric
self.state_counters = {
"task-sent": Counter(
"celery_task_sent",
f"{metric_prefix}task_sent",
"Sent when a task message is published.",
["name", "hostname", "queue_name"],
registry=self.registry,
),
"task-received": Counter(
"celery_task_received",
f"{metric_prefix}task_received",
"Sent when the worker receives a task.",
["name", "hostname", "queue_name"],
registry=self.registry,
),
"task-started": Counter(
"celery_task_started",
f"{metric_prefix}task_started",
"Sent just before the worker executes the task.",
["name", "hostname", "queue_name"],
registry=self.registry,
),
"task-succeeded": Counter(
"celery_task_succeeded",
f"{metric_prefix}task_succeeded",
"Sent if the task executed successfully.",
["name", "hostname", "queue_name"],
registry=self.registry,
),
"task-failed": Counter(
"celery_task_failed",
f"{metric_prefix}task_failed",
"Sent if the execution of the task failed.",
["name", "hostname", "exception", "queue_name"],
registry=self.registry,
),
"task-rejected": Counter(
"celery_task_rejected",
f"{metric_prefix}task_rejected",
# pylint: disable=line-too-long
"The task was rejected by the worker, possibly to be re-queued or moved to a dead letter queue.",
["name", "hostname", "queue_name"],
registry=self.registry,
),
"task-revoked": Counter(
"celery_task_revoked",
f"{metric_prefix}task_revoked",
"Sent if the task has been revoked.",
["name", "hostname", "queue_name"],
registry=self.registry,
),
"task-retried": Counter(
"celery_task_retried",
f"{metric_prefix}task_retried",
"Sent if the task failed, but will be retried in the future.",
["name", "hostname", "queue_name"],
registry=self.registry,
),
}
self.celery_worker_up = Gauge(
"celery_worker_up",
f"{metric_prefix}worker_up",
"Indicates if a worker has recently sent a heartbeat.",
["hostname"],
registry=self.registry,
)
self.worker_tasks_active = Gauge(
"celery_worker_tasks_active",
f"{metric_prefix}worker_tasks_active",
"The number of tasks the worker is currently processing",
["hostname"],
registry=self.registry,
)
self.celery_task_runtime = Histogram(
"celery_task_runtime",
f"{metric_prefix}task_runtime",
"Histogram of task runtime measurements.",
["name", "hostname", "queue_name"],
registry=self.registry,
buckets=buckets or Histogram.DEFAULT_BUCKETS,
)
self.celery_queue_length = Gauge(
"celery_queue_length",
f"{metric_prefix}queue_length",
"The number of message in broker queue.",
["queue_name"],
registry=self.registry,
)
self.celery_active_consumer_count = Gauge(
"celery_active_consumer_count",
f"{metric_prefix}active_consumer_count",
"The number of active consumer in broker queue.",
["queue_name"],
registry=self.registry,
)
self.celery_active_worker_count = Gauge(
"celery_active_worker_count",
f"{metric_prefix}active_worker_count",
"The number of active workers in broker queue.",
["queue_name"],
registry=self.registry,
)
self.celery_active_process_count = Gauge(
"celery_active_process_count",
f"{metric_prefix}active_process_count",
"The number of active processes in broker queue.",
["queue_name"],
registry=self.registry,
Expand Down

0 comments on commit 7822070

Please sign in to comment.