Skip to content

Commit

Permalink
fix: fix for default value
Browse files Browse the repository at this point in the history
  • Loading branch information
DanArmor authored and danihodovic committed Dec 9, 2024
1 parent 85109b7 commit 8b978b9
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 20 deletions.
4 changes: 3 additions & 1 deletion src/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def _comma_seperated_argument(_ctx, _param, value):
return value.split(",")
return []


# Accepts value string in format "key=val". Returns dict {key: val}.
# * If value is None - returns empty dict
def _eq_sign_separated_argument_to_dict(_ctx, _param, value):
Expand All @@ -30,6 +31,7 @@ def _eq_sign_separated_argument_to_dict(_ctx, _param, value):
return dict_of_key_value_pairs
return {}


@click.command(help=cmd_help)
@click.option(
"--broker-url",
Expand Down Expand Up @@ -159,5 +161,5 @@ def cli( # pylint: disable=too-many-arguments,too-many-positional-arguments,too
generic_hostname_task_sent_metric,
queues,
metric_prefix,
static_label
static_label,
).run(ctx.params)
50 changes: 31 additions & 19 deletions src/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(
generic_hostname_task_sent_metric=False,
initial_queues=None,
metric_prefix="celery_",
static_label=[]
static_label=None,
):
self.registry = CollectorRegistry(auto_describe=True)
self.queue_cache = set(initial_queues or [])
Expand All @@ -39,10 +39,10 @@ def __init__(
purge_offline_worker_metrics_seconds
)
self.generic_hostname_task_sent_metric = generic_hostname_task_sent_metric

# Static labels
self.static_label = static_label
self.static_label_keys = static_label.keys()
self.static_label = static_label or {}
self.static_label_keys = self.static_label.keys()

self.state_counters = {
"task-sent": Counter(
Expand Down Expand Up @@ -72,7 +72,13 @@ def __init__(
"task-failed": Counter(
f"{metric_prefix}task_failed",
"Sent if the execution of the task failed.",
["name", "hostname", "exception", "queue_name", *self.static_label_keys],
[
"name",
"hostname",
"exception",
"queue_name",
*self.static_label_keys,
],
registry=self.registry,
),
"task-rejected": Counter(
Expand Down Expand Up @@ -150,7 +156,9 @@ def scrape(self):
def forget_worker(self, hostname):
if hostname in self.worker_last_seen:
self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(0)
self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set(0)
self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set(
0
)
logger.debug(
"Updated gauge='{}' value='{}'", self.worker_tasks_active._name, 0
)
Expand Down Expand Up @@ -244,19 +252,21 @@ def track_queue_metrics(self):
for queue in self.queue_cache:
if transport in ["amqp", "amqps", "memory"]:
consumer_count = rabbitmq_queue_consumer_count(connection, queue)
self.celery_active_consumer_count.labels(queue_name=queue, **self.static_label).set(
consumer_count
)

self.celery_active_process_count.labels(queue_name=queue, **self.static_label).set(
processes_per_queue[queue]
)
self.celery_active_worker_count.labels(queue_name=queue, **self.static_label).set(
workers_per_queue[queue]
)
self.celery_active_consumer_count.labels(
queue_name=queue, **self.static_label
).set(consumer_count)

self.celery_active_process_count.labels(
queue_name=queue, **self.static_label
).set(processes_per_queue[queue])
self.celery_active_worker_count.labels(
queue_name=queue, **self.static_label
).set(workers_per_queue[queue])
length = queue_length(transport, connection, queue)
if length is not None:
self.celery_queue_length.labels(queue_name=queue, **self.static_label).set(length)
self.celery_queue_length.labels(
queue_name=queue, **self.static_label
).set(length)

def track_task_event(self, event):
self.state.event(event)
Expand All @@ -270,7 +280,7 @@ def track_task_event(self, event):
"name": task.name,
"hostname": get_hostname(task.hostname),
"queue_name": getattr(task, "queue", "celery"),
**self.static_label
**self.static_label,
}
if event["type"] == "task-sent" and self.generic_hostname_task_sent_metric:
labels["hostname"] = "generic"
Expand Down Expand Up @@ -334,7 +344,9 @@ def track_worker_heartbeat(self, event):
active = worker_state.active or 0
up = 1 if worker_state.alive else 0
self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(up)
self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set(active)
self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set(
active
)
logger.debug(
"Updated gauge='{}' value='{}'", self.worker_tasks_active._name, active
)
Expand Down

0 comments on commit 8b978b9

Please sign in to comment.