Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: metric-prefix configuration #275

Merged
merged 1 commit into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ def _comma_seperated_argument(_ctx, _param, value):
"Queues not included in this setting will not appear in metrics until at least one worker has "
"been seen to follow that queue.",
)
@click.option(
"--metric-prefix",
default="celery_",
help="Prefix all metrics with a string. "
"This option replaces the 'celery_*' part with a custom prefix. ",
)
def cli( # pylint: disable=too-many-arguments,too-many-locals
broker_url,
broker_transport_option,
Expand All @@ -123,6 +129,7 @@ def cli( # pylint: disable=too-many-arguments,too-many-locals
purge_offline_worker_metrics,
generic_hostname_task_sent_metric,
queues,
metric_prefix,
): # pylint: disable=unused-argument
formatted_buckets = list(map(float, buckets.split(",")))
ctx = click.get_current_context()
Expand All @@ -132,4 +139,5 @@ def cli( # pylint: disable=too-many-arguments,too-many-locals
purge_offline_worker_metrics,
generic_hostname_task_sent_metric,
queues,
metric_prefix,
).run(ctx.params)
31 changes: 16 additions & 15 deletions src/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def __init__(
purge_offline_worker_metrics_seconds=10 * 60,
generic_hostname_task_sent_metric=False,
initial_queues=None,
metric_prefix="celery_",
):
self.registry = CollectorRegistry(auto_describe=True)
self.queue_cache = set(initial_queues or [])
Expand All @@ -39,94 +40,94 @@ def __init__(
self.generic_hostname_task_sent_metric = generic_hostname_task_sent_metric
self.state_counters = {
"task-sent": Counter(
"celery_task_sent",
f"{metric_prefix}task_sent",
"Sent when a task message is published.",
["name", "hostname", "queue_name"],
registry=self.registry,
),
"task-received": Counter(
"celery_task_received",
f"{metric_prefix}task_received",
"Sent when the worker receives a task.",
["name", "hostname", "queue_name"],
registry=self.registry,
),
"task-started": Counter(
"celery_task_started",
f"{metric_prefix}task_started",
"Sent just before the worker executes the task.",
["name", "hostname", "queue_name"],
registry=self.registry,
),
"task-succeeded": Counter(
"celery_task_succeeded",
f"{metric_prefix}task_succeeded",
"Sent if the task executed successfully.",
["name", "hostname", "queue_name"],
registry=self.registry,
),
"task-failed": Counter(
"celery_task_failed",
f"{metric_prefix}task_failed",
"Sent if the execution of the task failed.",
["name", "hostname", "exception", "queue_name"],
registry=self.registry,
),
"task-rejected": Counter(
"celery_task_rejected",
f"{metric_prefix}task_rejected",
# pylint: disable=line-too-long
"The task was rejected by the worker, possibly to be re-queued or moved to a dead letter queue.",
["name", "hostname", "queue_name"],
registry=self.registry,
),
"task-revoked": Counter(
"celery_task_revoked",
f"{metric_prefix}task_revoked",
"Sent if the task has been revoked.",
["name", "hostname", "queue_name"],
registry=self.registry,
),
"task-retried": Counter(
"celery_task_retried",
f"{metric_prefix}task_retried",
"Sent if the task failed, but will be retried in the future.",
["name", "hostname", "queue_name"],
registry=self.registry,
),
}
self.celery_worker_up = Gauge(
"celery_worker_up",
f"{metric_prefix}worker_up",
"Indicates if a worker has recently sent a heartbeat.",
["hostname"],
registry=self.registry,
)
self.worker_tasks_active = Gauge(
"celery_worker_tasks_active",
f"{metric_prefix}worker_tasks_active",
"The number of tasks the worker is currently processing",
["hostname"],
registry=self.registry,
)
self.celery_task_runtime = Histogram(
"celery_task_runtime",
f"{metric_prefix}task_runtime",
"Histogram of task runtime measurements.",
["name", "hostname", "queue_name"],
registry=self.registry,
buckets=buckets or Histogram.DEFAULT_BUCKETS,
)
self.celery_queue_length = Gauge(
"celery_queue_length",
f"{metric_prefix}queue_length",
"The number of message in broker queue.",
["queue_name"],
registry=self.registry,
)
self.celery_active_consumer_count = Gauge(
"celery_active_consumer_count",
f"{metric_prefix}active_consumer_count",
"The number of active consumer in broker queue.",
["queue_name"],
registry=self.registry,
)
self.celery_active_worker_count = Gauge(
"celery_active_worker_count",
f"{metric_prefix}active_worker_count",
"The number of active workers in broker queue.",
["queue_name"],
registry=self.registry,
)
self.celery_active_process_count = Gauge(
"celery_active_process_count",
f"{metric_prefix}active_process_count",
"The number of active processes in broker queue.",
["queue_name"],
registry=self.registry,
Expand Down
Loading