Skip to content

Commit

Permalink
test: add a test for the static-labels feature
Browse files Browse the repository at this point in the history
  • Loading branch information
DanArmor authored and danihodovic committed Dec 9, 2024
1 parent 8b978b9 commit 5e52531
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 6 deletions.
55 changes: 49 additions & 6 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import socket
import threading
import copy

import pytest

Expand Down Expand Up @@ -81,8 +82,9 @@ def _find_free_port():
return _find_free_port


@pytest.fixture()
def exporter_instance(find_free_port, celery_config, log_level):
# Configurations for exporters
@pytest.fixture(scope="session")
def exporter_cfg_defaults(find_free_port, celery_config, log_level):
cfg = {
"host": "0.0.0.0",
"port": find_free_port(),
Expand All @@ -96,12 +98,21 @@ def exporter_instance(find_free_port, celery_config, log_level):
"purge_offline_worker_metrics": 10,
"initial_queues": ["queue_from_command_line"],
}
yield cfg


@pytest.fixture()
def exporter_instance(exporter_cfg_defaults, find_free_port):
    """Build a fresh Exporter wired with the default test config.

    The session-scoped config is deep-copied so per-test mutations (the
    port below, or anything a test changes on ``exporter.cfg``) cannot
    leak between tests.
    """
    exporter_cfg = copy.deepcopy(exporter_cfg_defaults)
    # Each instance listens on its own free port so fixtures can coexist.
    exporter_cfg["port"] = find_free_port()
    exporter = Exporter(
        worker_timeout_seconds=exporter_cfg["worker_timeout"],
        purge_offline_worker_metrics_seconds=exporter_cfg[
            "purge_offline_worker_metrics"
        ],
        initial_queues=exporter_cfg["initial_queues"],
    )
    # Plain attribute assignment instead of setattr() with a constant
    # attribute name (flake8-bugbear B010); tests read e.g. exporter.cfg["port"].
    exporter.cfg = exporter_cfg
    yield exporter


Expand All @@ -114,6 +125,38 @@ def threaded_exporter(exporter_instance):
yield exporter_instance


# Fixtures for same exporter, but with static labels
@pytest.fixture
def exporter_instance_static_labels(exporter_cfg_defaults, find_free_port):
    """Build an Exporter configured with two static labels.

    Mirrors ``exporter_instance`` but passes ``static_label`` through to
    the Exporter, so tests can verify the labels show up on every metric.
    """
    exporter_cfg = copy.deepcopy(exporter_cfg_defaults)
    # Fresh port per instance so this can run alongside the plain exporter.
    exporter_cfg["port"] = find_free_port()
    exporter_cfg["static_label"] = {
        "test_label_1": "test_value",
        "test_label_2_long_named": "test_value_2_long_named",
    }
    exporter = Exporter(
        worker_timeout_seconds=exporter_cfg["worker_timeout"],
        purge_offline_worker_metrics_seconds=exporter_cfg[
            "purge_offline_worker_metrics"
        ],
        initial_queues=exporter_cfg["initial_queues"],
        static_label=exporter_cfg["static_label"],
    )
    # Plain attribute assignment instead of setattr() with a constant
    # attribute name (flake8-bugbear B010); tests read the config back.
    exporter.cfg = exporter_cfg
    yield exporter


@pytest.fixture()
def threaded_exporter_static_labels(exporter_instance_static_labels):
    """Run the static-label exporter in a background daemon thread.

    The daemon flag lets the test process exit without joining the thread.
    """
    exporter = exporter_instance_static_labels
    runner = threading.Thread(
        target=exporter.run, args=(exporter.cfg,), daemon=True
    )
    runner.start()
    yield exporter


@pytest.fixture()
def hostname():
    # Local machine's hostname; tests use it to build expected
    # hostname="..." label values in metric assertions.
    return socket.gethostname()
173 changes: 173 additions & 0 deletions src/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,176 @@ def fail():
assert 'celery_active_consumer_count{queue_name="celery"} 0.0' in res.text
assert 'celery_active_worker_count{queue_name="celery"} 0.0' in res.text
assert 'celery_active_process_count{queue_name="celery"} 0.0' in res.text


# pylint: disable=too-many-statements
@pytest.mark.celery()
def test_integration_static_labels(
    broker, celery_app, threaded_exporter_static_labels, hostname
):
    """End-to-end check that configured static labels appear on every metric.

    Builds the expected ``key="value"`` label substring from the fixture's
    config, then walks the exporter through the same lifecycle as the plain
    integration test (no worker -> worker up -> tasks queued -> tasks
    consumed), asserting the static labels are present in each sample.
    """
    exporter_url = (
        f"http://localhost:{threaded_exporter_static_labels.cfg['port']}/metrics"
    )
    # Substring representing static labels in metrics labels.
    # Sorted so the substring matches the exporter's label ordering in the
    # exposition output — the `in res.text` assertions below rely on this.
    static_labels_str = ",".join(
        [
            f'{k}="{v}"'
            for k, v in sorted(
                threaded_exporter_static_labels.cfg["static_label"].items()
            )
        ]
    )

    @celery_app.task
    def succeed():
        pass

    @celery_app.task
    def fail():
        # Raising makes Celery record a task failure, which should feed the
        # failure metrics asserted below.
        raise HTTPError("Intentional error")

    time.sleep(1)
    # Before the first worker starts, make sure queues that the exporter is initialized
    # with are available anyway. Queues to be detected from workers should not be there yet
    res = requests.get(exporter_url, timeout=5)
    assert res.status_code == 200
    assert (
        f'celery_queue_length{{queue_name="queue_from_command_line",{static_labels_str}}} 0.0'
        in res.text
    )
    assert (
        # pylint: disable=line-too-long
        f'celery_active_worker_count{{queue_name="queue_from_command_line",{static_labels_str}}} 0.0'
        in res.text
    )
    assert (
        # pylint: disable=line-too-long
        f'celery_active_process_count{{queue_name="queue_from_command_line",{static_labels_str}}} 0.0'
        in res.text
    )
    assert (
        f'celery_queue_length{{queue_name="celery",{static_labels_str}}}'
        not in res.text
    )
    assert (
        f'celery_active_worker_count{{queue_name="celery",{static_labels_str}}}'
        not in res.text
    )
    assert (
        f'celery_active_process_count{{queue_name="celery",{static_labels_str}}}'
        not in res.text
    )

    # start worker first so the exporter can fetch and cache queue information
    with start_worker(celery_app, without_heartbeat=False):
        time.sleep(5)
        res = requests.get(exporter_url, timeout=5)
        assert res.status_code == 200
        assert (
            f'celery_queue_length{{queue_name="celery",{static_labels_str}}} 0.0'
            in res.text
        ), res.text

        # TODO: Fix this...
        if broker == "memory":
            assert (
                f'celery_active_consumer_count{{queue_name="celery",{static_labels_str}}} 0.0'
                in res.text
            ), res.text
        assert (
            f'celery_active_worker_count{{queue_name="celery",{static_labels_str}}} 1.0'
            in res.text
        )
        assert (
            f'celery_active_process_count{{queue_name="celery",{static_labels_str}}} 1.0'
            in res.text
        )

    succeed.apply_async()
    succeed.apply_async()
    fail.apply_async()

    # assert celery_queue_length when message in broker but no worker start
    res = requests.get(exporter_url, timeout=3)
    assert res.status_code == 200
    assert (
        f'celery_queue_length{{queue_name="celery",{static_labels_str}}} 3.0'
        in res.text
    )

    if broker == "memory":
        assert (
            f'celery_active_consumer_count{{queue_name="celery",{static_labels_str}}} 0.0'
            in res.text
        )
    assert (
        f'celery_active_worker_count{{queue_name="celery",{static_labels_str}}} 0.0'
        in res.text
    )
    assert (
        f'celery_active_process_count{{queue_name="celery",{static_labels_str}}} 0.0'
        in res.text
    )

    # start worker and consume message in broker
    with start_worker(celery_app, without_heartbeat=False):
        time.sleep(2)

    # Worker has exited; task counters should reflect the consumed messages
    # and all samples must still carry the static labels.
    res = requests.get(exporter_url, timeout=3)
    assert res.status_code == 200
    # pylint: disable=line-too-long
    assert (
        f'celery_task_sent_total{{hostname="{hostname}",name="src.test_cli.succeed",queue_name="celery",{static_labels_str}}} 2.0'
        in res.text
    )
    assert (
        f'celery_task_sent_total{{hostname="{hostname}",name="src.test_cli.fail",queue_name="celery",{static_labels_str}}} 1.0'
        in res.text
    )
    assert (
        f'celery_task_received_total{{hostname="{hostname}",name="src.test_cli.succeed",queue_name="celery",{static_labels_str}}} 2.0'
        in res.text
    )
    assert (
        f'celery_task_received_total{{hostname="{hostname}",name="src.test_cli.fail",queue_name="celery",{static_labels_str}}} 1.0'
        in res.text
    )
    assert (
        f'celery_task_started_total{{hostname="{hostname}",name="src.test_cli.succeed",queue_name="celery",{static_labels_str}}} 2.0'
        in res.text
    )
    assert (
        f'celery_task_started_total{{hostname="{hostname}",name="src.test_cli.fail",queue_name="celery",{static_labels_str}}} 1.0'
        in res.text
    )
    assert (
        f'celery_task_succeeded_total{{hostname="{hostname}",name="src.test_cli.succeed",queue_name="celery",{static_labels_str}}} 2.0'
        in res.text
    )
    assert (
        f'celery_task_failed_total{{exception="HTTPError",hostname="{hostname}",name="src.test_cli.fail",queue_name="celery",{static_labels_str}}} 1.0'
        in res.text
    )
    assert (
        f'celery_task_runtime_count{{hostname="{hostname}",name="src.test_cli.succeed",queue_name="celery",{static_labels_str}}} 2.0'
        in res.text
    )
    assert (
        f'celery_queue_length{{queue_name="celery",{static_labels_str}}} 0.0'
        in res.text
    )

    # TODO: Fix this...
    if broker == "memory":
        assert (
            f'celery_active_consumer_count{{queue_name="celery",{static_labels_str}}} 0.0'
            in res.text
        )
    assert (
        f'celery_active_worker_count{{queue_name="celery",{static_labels_str}}} 0.0'
        in res.text
    )
    assert (
        f'celery_active_process_count{{queue_name="celery",{static_labels_str}}} 0.0'
        in res.text
    )

0 comments on commit 5e52531

Please sign in to comment.