diff --git a/src/cli.py b/src/cli.py index a26788d..c0614b1 100644 --- a/src/cli.py +++ b/src/cli.py @@ -19,6 +19,7 @@ def _comma_seperated_argument(_ctx, _param, value): return value.split(",") return [] + # Accepts value string in format "key=val". Returns dict {key: val}. # * If value is None - returns empty dict def _eq_sign_separated_argument_to_dict(_ctx, _param, value): @@ -30,6 +31,7 @@ def _eq_sign_separated_argument_to_dict(_ctx, _param, value): return dict_of_key_value_pairs return {} + @click.command(help=cmd_help) @click.option( "--broker-url", @@ -159,5 +161,5 @@ def cli( # pylint: disable=too-many-arguments,too-many-positional-arguments,too generic_hostname_task_sent_metric, queues, metric_prefix, - static_label + static_label, ).run(ctx.params) diff --git a/src/exporter.py b/src/exporter.py index 7ca073a..609227a 100644 --- a/src/exporter.py +++ b/src/exporter.py @@ -29,7 +29,7 @@ def __init__( generic_hostname_task_sent_metric=False, initial_queues=None, metric_prefix="celery_", - static_label=[] + static_label=None, ): self.registry = CollectorRegistry(auto_describe=True) self.queue_cache = set(initial_queues or []) @@ -39,10 +39,10 @@ def __init__( purge_offline_worker_metrics_seconds ) self.generic_hostname_task_sent_metric = generic_hostname_task_sent_metric - + # Static labels - self.static_label = static_label - self.static_label_keys = static_label.keys() + self.static_label = static_label or {} + self.static_label_keys = self.static_label.keys() self.state_counters = { "task-sent": Counter( @@ -72,7 +72,13 @@ def __init__( "task-failed": Counter( f"{metric_prefix}task_failed", "Sent if the execution of the task failed.", - ["name", "hostname", "exception", "queue_name", *self.static_label_keys], + [ + "name", + "hostname", + "exception", + "queue_name", + *self.static_label_keys, + ], registry=self.registry, ), "task-rejected": Counter( @@ -150,7 +156,9 @@ def scrape(self): def forget_worker(self, hostname): if hostname in self.worker_last_seen: self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(0) - self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set(0) + self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set( + 0 + ) logger.debug( "Updated gauge='{}' value='{}'", self.worker_tasks_active._name, 0 ) @@ -244,19 +252,21 @@ def track_queue_metrics(self): for queue in self.queue_cache: if transport in ["amqp", "amqps", "memory"]: consumer_count = rabbitmq_queue_consumer_count(connection, queue) - self.celery_active_consumer_count.labels(queue_name=queue, **self.static_label).set( - consumer_count - ) - - self.celery_active_process_count.labels(queue_name=queue, **self.static_label).set( - processes_per_queue[queue] - ) - self.celery_active_worker_count.labels(queue_name=queue, **self.static_label).set( - workers_per_queue[queue] - ) + self.celery_active_consumer_count.labels( + queue_name=queue, **self.static_label + ).set(consumer_count) + + self.celery_active_process_count.labels( + queue_name=queue, **self.static_label + ).set(processes_per_queue[queue]) + self.celery_active_worker_count.labels( + queue_name=queue, **self.static_label + ).set(workers_per_queue[queue]) length = queue_length(transport, connection, queue) if length is not None: - self.celery_queue_length.labels(queue_name=queue, **self.static_label).set(length) + self.celery_queue_length.labels( + queue_name=queue, **self.static_label + ).set(length) def track_task_event(self, event): self.state.event(event) @@ -270,7 +280,7 @@ def track_task_event(self, event): "name": task.name, "hostname": get_hostname(task.hostname), "queue_name": getattr(task, "queue", "celery"), - **self.static_label + **self.static_label, } if event["type"] == "task-sent" and self.generic_hostname_task_sent_metric: labels["hostname"] = "generic" @@ -334,7 +344,9 @@ def track_worker_heartbeat(self, event): active = worker_state.active or 0 up = 1 if worker_state.alive else 0 self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(up) - self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set(active) + self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set( + active + ) logger.debug( "Updated gauge='{}' value='{}'", self.worker_tasks_active._name, active )