diff --git a/conftest.py b/conftest.py index b92883a..f8dd7a4 100644 --- a/conftest.py +++ b/conftest.py @@ -1,5 +1,6 @@ import socket import threading +import copy import pytest @@ -81,8 +82,9 @@ def _find_free_port(): return _find_free_port -@pytest.fixture() -def exporter_instance(find_free_port, celery_config, log_level): +# Configurations for exporters +@pytest.fixture(scope="session") +def exporter_cfg_defaults(find_free_port, celery_config, log_level): cfg = { "host": "0.0.0.0", "port": find_free_port(), @@ -96,12 +98,21 @@ def exporter_instance(find_free_port, celery_config, log_level): "purge_offline_worker_metrics": 10, "initial_queues": ["queue_from_command_line"], } + yield cfg + + +@pytest.fixture() +def exporter_instance(exporter_cfg_defaults, find_free_port): + exporter_cfg = copy.deepcopy(exporter_cfg_defaults) + exporter_cfg["port"] = find_free_port() exporter = Exporter( - worker_timeout_seconds=cfg["worker_timeout"], - purge_offline_worker_metrics_seconds=cfg["purge_offline_worker_metrics"], - initial_queues=cfg["initial_queues"], + worker_timeout_seconds=exporter_cfg["worker_timeout"], + purge_offline_worker_metrics_seconds=exporter_cfg[ + "purge_offline_worker_metrics" + ], + initial_queues=exporter_cfg["initial_queues"], ) - setattr(exporter, "cfg", cfg) + setattr(exporter, "cfg", exporter_cfg) yield exporter @@ -114,6 +125,38 @@ def threaded_exporter(exporter_instance): yield exporter_instance +# Fixtures for same exporter, but with static labels +@pytest.fixture +def exporter_instance_static_labels(exporter_cfg_defaults, find_free_port): + exporter_cfg = copy.deepcopy(exporter_cfg_defaults) + exporter_cfg["port"] = find_free_port() + exporter_cfg["static_label"] = { + "test_label_1": "test_value", + "test_label_2_long_named": "test_value_2_long_named", + } + exporter = Exporter( + worker_timeout_seconds=exporter_cfg["worker_timeout"], + purge_offline_worker_metrics_seconds=exporter_cfg[ + "purge_offline_worker_metrics" + ], + initial_queues=exporter_cfg["initial_queues"], + static_label=exporter_cfg["static_label"], + ) + setattr(exporter, "cfg", exporter_cfg) + yield exporter + + +@pytest.fixture() +def threaded_exporter_static_labels(exporter_instance_static_labels): + thread = threading.Thread( + target=exporter_instance_static_labels.run, + args=(exporter_instance_static_labels.cfg,), + daemon=True, + ) + thread.start() + yield exporter_instance_static_labels + + @pytest.fixture() def hostname(): return socket.gethostname() diff --git a/src/test_cli.py b/src/test_cli.py index a4f6930..2544207 100644 --- a/src/test_cli.py +++ b/src/test_cli.py @@ -115,3 +115,176 @@ def fail(): assert 'celery_active_consumer_count{queue_name="celery"} 0.0' in res.text assert 'celery_active_worker_count{queue_name="celery"} 0.0' in res.text assert 'celery_active_process_count{queue_name="celery"} 0.0' in res.text + + +# pylint: disable=too-many-statements +@pytest.mark.celery() +def test_integration_static_labels( + broker, celery_app, threaded_exporter_static_labels, hostname +): + exporter_url = ( + f"http://localhost:{threaded_exporter_static_labels.cfg['port']}/metrics" + ) + # Substring representing static labels in metrics labels + static_labels_str = ",".join( + [ + f'{k}="{v}"' + for k, v in sorted( + threaded_exporter_static_labels.cfg["static_label"].items() + ) + ] + ) + + @celery_app.task + def succeed(): + pass + + @celery_app.task + def fail(): + raise HTTPError("Intentional error") + + time.sleep(1) + # Before the first worker starts, make sure queues that the exporter is initialized + # with are available anyway. Queues to be detected from workers should not be there yet + res = requests.get(exporter_url, timeout=5) + assert res.status_code == 200 + assert ( + f'celery_queue_length{{queue_name="queue_from_command_line",{static_labels_str}}} 0.0' + in res.text + ) + assert ( + # pylint: disable=line-too-long + f'celery_active_worker_count{{queue_name="queue_from_command_line",{static_labels_str}}} 0.0' + in res.text + ) + assert ( + # pylint: disable=line-too-long + f'celery_active_process_count{{queue_name="queue_from_command_line",{static_labels_str}}} 0.0' + in res.text + ) + assert ( + f'celery_queue_length{{queue_name="celery",{static_labels_str}}}' + not in res.text + ) + assert ( + f'celery_active_worker_count{{queue_name="celery",{static_labels_str}}}' + not in res.text + ) + assert ( + f'celery_active_process_count{{queue_name="celery",{static_labels_str}}}' + not in res.text + ) + + # start worker first so the exporter can fetch and cache queue information + with start_worker(celery_app, without_heartbeat=False): + time.sleep(5) + res = requests.get(exporter_url, timeout=5) + assert res.status_code == 200 + assert ( + f'celery_queue_length{{queue_name="celery",{static_labels_str}}} 0.0' + in res.text + ), res.text + + # TODO: Fix this... + if broker == "memory": + assert ( + f'celery_active_consumer_count{{queue_name="celery",{static_labels_str}}} 0.0' + in res.text + ), res.text + assert ( + f'celery_active_worker_count{{queue_name="celery",{static_labels_str}}} 1.0' + in res.text + ) + assert ( + f'celery_active_process_count{{queue_name="celery",{static_labels_str}}} 1.0' + in res.text + ) + + succeed.apply_async() + succeed.apply_async() + fail.apply_async() + + # assert celery_queue_length when message in broker but no worker start + res = requests.get(exporter_url, timeout=3) + assert res.status_code == 200 + assert ( + f'celery_queue_length{{queue_name="celery",{static_labels_str}}} 3.0' + in res.text + ) + + if broker == "memory": + assert ( + f'celery_active_consumer_count{{queue_name="celery",{static_labels_str}}} 0.0' + in res.text + ) + assert ( + f'celery_active_worker_count{{queue_name="celery",{static_labels_str}}} 0.0' + in res.text + ) + assert ( + f'celery_active_process_count{{queue_name="celery",{static_labels_str}}} 0.0' + in res.text + ) + + # start worker and consume message in broker + with start_worker(celery_app, without_heartbeat=False): + time.sleep(2) + + res = requests.get(exporter_url, timeout=3) + assert res.status_code == 200 + # pylint: disable=line-too-long + assert ( + f'celery_task_sent_total{{hostname="{hostname}",name="src.test_cli.succeed",queue_name="celery",{static_labels_str}}} 2.0' + in res.text + ) + assert ( + f'celery_task_sent_total{{hostname="{hostname}",name="src.test_cli.fail",queue_name="celery",{static_labels_str}}} 1.0' + in res.text + ) + assert ( + f'celery_task_received_total{{hostname="{hostname}",name="src.test_cli.succeed",queue_name="celery",{static_labels_str}}} 2.0' + in res.text + ) + assert ( + f'celery_task_received_total{{hostname="{hostname}",name="src.test_cli.fail",queue_name="celery",{static_labels_str}}} 1.0' + in res.text + ) + assert ( + f'celery_task_started_total{{hostname="{hostname}",name="src.test_cli.succeed",queue_name="celery",{static_labels_str}}} 2.0' + in res.text + ) + assert ( + f'celery_task_started_total{{hostname="{hostname}",name="src.test_cli.fail",queue_name="celery",{static_labels_str}}} 1.0' + in res.text + ) + assert ( + f'celery_task_succeeded_total{{hostname="{hostname}",name="src.test_cli.succeed",queue_name="celery",{static_labels_str}}} 2.0' + in res.text + ) + assert ( + f'celery_task_failed_total{{exception="HTTPError",hostname="{hostname}",name="src.test_cli.fail",queue_name="celery",{static_labels_str}}} 1.0' + in res.text + ) + assert ( + f'celery_task_runtime_count{{hostname="{hostname}",name="src.test_cli.succeed",queue_name="celery",{static_labels_str}}} 2.0' + in res.text + ) + assert ( + f'celery_queue_length{{queue_name="celery",{static_labels_str}}} 0.0' + in res.text + ) + + # TODO: Fix this... + if broker == "memory": + assert ( + f'celery_active_consumer_count{{queue_name="celery",{static_labels_str}}} 0.0' + in res.text + ) + assert ( + f'celery_active_worker_count{{queue_name="celery",{static_labels_str}}} 0.0' + in res.text + ) + assert ( + f'celery_active_process_count{{queue_name="celery",{static_labels_str}}} 0.0' + in res.text + )