Skip to content

Commit

Permalink
feat: static labels introduced
Browse files Browse the repository at this point in the history
  • Loading branch information
DanArmor authored and danihodovic committed Dec 9, 2024
1 parent a61934b commit 85109b7
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 24 deletions.
20 changes: 20 additions & 0 deletions src/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ def _comma_seperated_argument(_ctx, _param, value):
return value.split(",")
return []

# Accepts value string in format "key=val". Returns dict {key: val}.
# * If value is None - returns empty dict
def _eq_sign_separated_argument_to_dict(_ctx, _param, value):
if value is not None:
dict_of_key_value_pairs = {}
for key_value_pair in value:
key, val = key_value_pair.split("=")
dict_of_key_value_pairs[key] = val
return dict_of_key_value_pairs
return {}

@click.command(help=cmd_help)
@click.option(
Expand Down Expand Up @@ -115,6 +125,14 @@ def _comma_seperated_argument(_ctx, _param, value):
help="Prefix all metrics with a string. "
"This option replaces the 'celery_*' part with a custom prefix. ",
)
@click.option(
"--static-label",
required=False,
default=None,
multiple=True,
callback=_eq_sign_separated_argument_to_dict,
help="Add label with static value to all metrics",
)
def cli( # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals
broker_url,
broker_transport_option,
Expand All @@ -130,6 +148,7 @@ def cli( # pylint: disable=too-many-arguments,too-many-positional-arguments,too
generic_hostname_task_sent_metric,
queues,
metric_prefix,
static_label,
): # pylint: disable=unused-argument
formatted_buckets = list(map(float, buckets.split(",")))
ctx = click.get_current_context()
Expand All @@ -140,4 +159,5 @@ def cli( # pylint: disable=too-many-arguments,too-many-positional-arguments,too
generic_hostname_task_sent_metric,
queues,
metric_prefix,
static_label
).run(ctx.params)
55 changes: 31 additions & 24 deletions src/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(
generic_hostname_task_sent_metric=False,
initial_queues=None,
metric_prefix="celery_",
static_label=[]
):
self.registry = CollectorRegistry(auto_describe=True)
self.queue_cache = set(initial_queues or [])
Expand All @@ -38,98 +39,103 @@ def __init__(
purge_offline_worker_metrics_seconds
)
self.generic_hostname_task_sent_metric = generic_hostname_task_sent_metric

# Static labels
self.static_label = static_label
self.static_label_keys = static_label.keys()

self.state_counters = {
"task-sent": Counter(
f"{metric_prefix}task_sent",
"Sent when a task message is published.",
["name", "hostname", "queue_name"],
["name", "hostname", "queue_name", *self.static_label_keys],
registry=self.registry,
),
"task-received": Counter(
f"{metric_prefix}task_received",
"Sent when the worker receives a task.",
["name", "hostname", "queue_name"],
["name", "hostname", "queue_name", *self.static_label_keys],
registry=self.registry,
),
"task-started": Counter(
f"{metric_prefix}task_started",
"Sent just before the worker executes the task.",
["name", "hostname", "queue_name"],
["name", "hostname", "queue_name", *self.static_label_keys],
registry=self.registry,
),
"task-succeeded": Counter(
f"{metric_prefix}task_succeeded",
"Sent if the task executed successfully.",
["name", "hostname", "queue_name"],
["name", "hostname", "queue_name", *self.static_label_keys],
registry=self.registry,
),
"task-failed": Counter(
f"{metric_prefix}task_failed",
"Sent if the execution of the task failed.",
["name", "hostname", "exception", "queue_name"],
["name", "hostname", "exception", "queue_name", *self.static_label_keys],
registry=self.registry,
),
"task-rejected": Counter(
f"{metric_prefix}task_rejected",
# pylint: disable=line-too-long
"The task was rejected by the worker, possibly to be re-queued or moved to a dead letter queue.",
["name", "hostname", "queue_name"],
["name", "hostname", "queue_name", *self.static_label_keys],
registry=self.registry,
),
"task-revoked": Counter(
f"{metric_prefix}task_revoked",
"Sent if the task has been revoked.",
["name", "hostname", "queue_name"],
["name", "hostname", "queue_name", *self.static_label_keys],
registry=self.registry,
),
"task-retried": Counter(
f"{metric_prefix}task_retried",
"Sent if the task failed, but will be retried in the future.",
["name", "hostname", "queue_name"],
["name", "hostname", "queue_name", *self.static_label_keys],
registry=self.registry,
),
}
self.celery_worker_up = Gauge(
f"{metric_prefix}worker_up",
"Indicates if a worker has recently sent a heartbeat.",
["hostname"],
["hostname", *self.static_label_keys],
registry=self.registry,
)
self.worker_tasks_active = Gauge(
f"{metric_prefix}worker_tasks_active",
"The number of tasks the worker is currently processing",
["hostname"],
["hostname", *self.static_label_keys],
registry=self.registry,
)
self.celery_task_runtime = Histogram(
f"{metric_prefix}task_runtime",
"Histogram of task runtime measurements.",
["name", "hostname", "queue_name"],
["name", "hostname", "queue_name", *self.static_label_keys],
registry=self.registry,
buckets=buckets or Histogram.DEFAULT_BUCKETS,
)
self.celery_queue_length = Gauge(
f"{metric_prefix}queue_length",
"The number of message in broker queue.",
["queue_name"],
["queue_name", *self.static_label_keys],
registry=self.registry,
)
self.celery_active_consumer_count = Gauge(
f"{metric_prefix}active_consumer_count",
"The number of active consumer in broker queue.",
["queue_name"],
["queue_name", *self.static_label_keys],
registry=self.registry,
)
self.celery_active_worker_count = Gauge(
f"{metric_prefix}active_worker_count",
"The number of active workers in broker queue.",
["queue_name"],
["queue_name", *self.static_label_keys],
registry=self.registry,
)
self.celery_active_process_count = Gauge(
f"{metric_prefix}active_process_count",
"The number of active processes in broker queue.",
["queue_name"],
["queue_name", *self.static_label_keys],
registry=self.registry,
)

Expand All @@ -143,8 +149,8 @@ def scrape(self):

def forget_worker(self, hostname):
if hostname in self.worker_last_seen:
self.celery_worker_up.labels(hostname=hostname).set(0)
self.worker_tasks_active.labels(hostname=hostname).set(0)
self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(0)
self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set(0)
logger.debug(
"Updated gauge='{}' value='{}'", self.worker_tasks_active._name, 0
)
Expand Down Expand Up @@ -238,19 +244,19 @@ def track_queue_metrics(self):
for queue in self.queue_cache:
if transport in ["amqp", "amqps", "memory"]:
consumer_count = rabbitmq_queue_consumer_count(connection, queue)
self.celery_active_consumer_count.labels(queue_name=queue).set(
self.celery_active_consumer_count.labels(queue_name=queue, **self.static_label).set(
consumer_count
)

self.celery_active_process_count.labels(queue_name=queue).set(
self.celery_active_process_count.labels(queue_name=queue, **self.static_label).set(
processes_per_queue[queue]
)
self.celery_active_worker_count.labels(queue_name=queue).set(
self.celery_active_worker_count.labels(queue_name=queue, **self.static_label).set(
workers_per_queue[queue]
)
length = queue_length(transport, connection, queue)
if length is not None:
self.celery_queue_length.labels(queue_name=queue).set(length)
self.celery_queue_length.labels(queue_name=queue, **self.static_label).set(length)

def track_task_event(self, event):
self.state.event(event)
Expand All @@ -264,6 +270,7 @@ def track_task_event(self, event):
"name": task.name,
"hostname": get_hostname(task.hostname),
"queue_name": getattr(task, "queue", "celery"),
**self.static_label
}
if event["type"] == "task-sent" and self.generic_hostname_task_sent_metric:
labels["hostname"] = "generic"
Expand Down Expand Up @@ -303,7 +310,7 @@ def track_worker_status(self, event, is_online):
event_name = "worker-online" if is_online else "worker-offline"
hostname = get_hostname(event["hostname"])
logger.debug("Received event='{}' for hostname='{}'", event_name, hostname)
self.celery_worker_up.labels(hostname=hostname).set(value)
self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(value)

if is_online:
self.worker_last_seen[hostname] = {
Expand All @@ -326,8 +333,8 @@ def track_worker_heartbeat(self, event):
worker_state = self.state.event(event)[0][0]
active = worker_state.active or 0
up = 1 if worker_state.alive else 0
self.celery_worker_up.labels(hostname=hostname).set(up)
self.worker_tasks_active.labels(hostname=hostname).set(active)
self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(up)
self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set(active)
logger.debug(
"Updated gauge='{}' value='{}'", self.worker_tasks_active._name, active
)
Expand Down

0 comments on commit 85109b7

Please sign in to comment.