diff --git a/conftest.py b/conftest.py index b92883a..929fe41 100644 --- a/conftest.py +++ b/conftest.py @@ -105,6 +105,34 @@ def exporter_instance(find_free_port, celery_config, log_level): yield exporter +@pytest.fixture() +def exporter_instance_auth(find_free_port, celery_config, log_level): + cfg = { + "host": "0.0.0.0", + "port": find_free_port(), + "broker_url": celery_config["broker_url"], + "broker_transport_option": ["visibility_timeout=7200"], + "broker_ssl_option": [], + "retry_interval": 5, + "log_level": log_level, + "accept_content": None, + "worker_timeout": 1, + "purge_offline_worker_metrics": 10, + "initial_queues": ["queue_from_command_line"], + "http_username": "angus", + "http_password": "secret", + } + exporter = Exporter( + worker_timeout_seconds=cfg["worker_timeout"], + purge_offline_worker_metrics_seconds=cfg["purge_offline_worker_metrics"], + initial_queues=cfg["initial_queues"], + http_username=cfg["http_username"], + http_password=cfg["http_password"], + ) + setattr(exporter, "cfg", cfg) + yield exporter + + @pytest.fixture() def threaded_exporter(exporter_instance): thread = threading.Thread( @@ -114,6 +142,17 @@ def threaded_exporter(exporter_instance): yield exporter_instance +@pytest.fixture() +def threaded_exporter_auth(exporter_instance_auth): + thread = threading.Thread( + target=exporter_instance_auth.run, + args=(exporter_instance_auth.cfg,), + daemon=True, + ) + thread.start() + yield exporter_instance_auth + + @pytest.fixture() def hostname(): return socket.gethostname() diff --git a/poetry.lock b/poetry.lock index 315b74b..ab26ecf 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.1 and should not be changed by hand. [[package]] name = "altgraph" @@ -536,6 +536,36 @@ Werkzeug = ">=3.0.0" async = ["asgiref (>=3.2)"] dotenv = ["python-dotenv"] +[[package]] +name = "flask-httpauth" +version = "4.8.0" +description = "HTTP authentication for Flask routes" +optional = false +python-versions = "*" +files = [ + {file = "Flask-HTTPAuth-4.8.0.tar.gz", hash = "sha256:66568a05bc73942c65f1e2201ae746295816dc009edd84b482c44c758d75097a"}, + {file = "Flask_HTTPAuth-4.8.0-py3-none-any.whl", hash = "sha256:a58fedd09989b9975448eef04806b096a3964a7feeebc0a78831ff55685b62b0"}, +] + +[package.dependencies] +flask = "*" + +[[package]] +name = "flask-httpauth-stubs" +version = "0.1.6" +description = "Mypy plugin and stubs for Flask-HTTPAuth" +optional = false +python-versions = "*" +files = [ + {file = "Flask-HTTPAuth-stubs-0.1.6.tar.gz", hash = "sha256:c8f6179a58bd3b58747fbc40754c0b35f3b32fd7f4b25ee9ec259fd02e4accda"}, + {file = "Flask_HTTPAuth_stubs-0.1.6-py3-none-any.whl", hash = "sha256:366a4d0b7bd570659ea327e09217595ca96ddec3194425b80256bdadbf145f97"}, +] + +[package.dependencies] +Flask-HTTPAuth = ">=4.0.0" +mypy = ">=0.720" +typing-extensions = ">=3.7.4" + [[package]] name = "identify" version = "2.5.33" @@ -1628,4 +1658,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = ">=3.9,<3.12" -content-hash = "eb3ef774a620acd86a79ce385e8013b75edec2b51966dae486972182467195a2" +content-hash = "376667745cd7a11d6575f64e279fd6f8c296ea923fb8263079d407730a6c7fd8" diff --git a/pyproject.toml b/pyproject.toml index dd7059d..adc4292 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,8 @@ loguru = "^0.7.2" redis = "^5.0.1" Flask = "^3.0.0" waitress = "^2.1.2" +flask-httpauth = "^4.8.0" +flask-httpauth-stubs = "^0.1.6" [tool.poetry.group.dev.dependencies] pytest = "^7.4.4" diff --git a/src/cli.py b/src/cli.py index 684a448..1cf1d55 100644 --- a/src/cli.py +++ b/src/cli.py @@ -115,6 +115,12 @@ def _comma_seperated_argument(_ctx, _param, value): help="Prefix all metrics with a string. " "This option replaces the 'celery_*' part with a custom prefix. ", ) +@click.option( + "--http-username", default=None, help="Basic auth username for /metrics endpoint." +) +@click.option( + "--http-password", default=None, help="Basic auth password for /metrics endpoint." +) def cli( # pylint: disable=too-many-arguments,too-many-locals broker_url, broker_transport_option, @@ -130,6 +136,8 @@ def cli( # pylint: disable=too-many-arguments,too-many-locals generic_hostname_task_sent_metric, queues, metric_prefix, + http_username, + http_password, ): # pylint: disable=unused-argument formatted_buckets = list(map(float, buckets.split(","))) ctx = click.get_current_context() @@ -140,4 +148,6 @@ def cli( # pylint: disable=too-many-arguments,too-many-locals generic_hostname_task_sent_metric, queues, metric_prefix, + http_username, + http_password, ).run(ctx.params) diff --git a/src/exporter.py b/src/exporter.py index 5a30844..b92e4e9 100644 --- a/src/exporter.py +++ b/src/exporter.py @@ -29,6 +29,8 @@ def __init__( generic_hostname_task_sent_metric=False, initial_queues=None, metric_prefix="celery_", + http_username=None, + http_password=None, ): self.registry = CollectorRegistry(auto_describe=True) self.queue_cache = set(initial_queues or []) @@ -132,6 +134,8 @@ def __init__( ["queue_name"], registry=self.registry, ) + self.http_username = http_username + self.http_password = http_password def scrape(self): if ( @@ -383,6 +387,8 @@ def run(self, click_params): click_params["host"], click_params["port"], self.scrape, + self.http_username, + self.http_password, ) while True: try: diff --git a/src/http_server.py b/src/http_server.py index 19f283b..c42e526 100644 --- a/src/http_server.py +++ b/src/http_server.py @@ -1,12 +1,14 @@ from threading import Thread import kombu.exceptions -from flask import Blueprint, Flask, current_app, request +from flask import Blueprint, Flask, current_app, request, abort +from flask_httpauth import HTTPBasicAuth from loguru import logger from prometheus_client.exposition import choose_encoder from waitress import serve blueprint = Blueprint("celery_exporter", __name__) +auth = HTTPBasicAuth() @blueprint.route("/") @@ -28,6 +30,7 @@ def index(): @blueprint.route("/metrics") +@auth.login_required(optional=False) def metrics(): current_app.config["metrics_puller"]() encoder, content_type = choose_encoder(request.headers.get("accept")) @@ -36,6 +39,7 @@ def metrics(): @blueprint.route("/health") +@auth.login_required(optional=False) def health(): conn = current_app.config["celery_connection"] uri = conn.as_uri() @@ -51,11 +55,29 @@ def health(): return f"Connected to the broker {conn.as_uri()}" -def start_http_server(registry, celery_connection, host, port, metrics_puller): +# pylint: disable=too-many-arguments +def start_http_server( + registry, + celery_connection, + host, + port, + metrics_puller, + http_username=None, + http_password=None, +): app = Flask(__name__) app.config["registry"] = registry app.config["celery_connection"] = celery_connection app.config["metrics_puller"] = metrics_puller + + @auth.verify_password + def verify_password(username, password): + if not http_username or not http_password: + return "anonymous" + if http_username == username and http_password == password: + return "authenticated" + abort(401) + app.register_blueprint(blueprint) Thread( target=serve, diff --git a/src/test_http_server.py b/src/test_http_server.py index 04cfed0..505225a 100644 --- a/src/test_http_server.py +++ b/src/test_http_server.py @@ -14,6 +14,27 @@ def test_health(threaded_exporter): res.raise_for_status() +@pytest.mark.celery() +def test_health_auth_missing(threaded_exporter_auth): + time.sleep(1) + res = requests.get( + f"http://localhost:{threaded_exporter_auth.cfg['port']}/health", timeout=3 + ) + assert res.status_code == 401 + + +@pytest.mark.celery() +def test_health_auth_present(threaded_exporter_auth): + time.sleep(1) + username = threaded_exporter_auth.cfg["http_username"] + password = threaded_exporter_auth.cfg["http_password"] + res = requests.get( + f"http://{username}:{password}@localhost:{threaded_exporter_auth.cfg['port']}/health", + timeout=3, + ) + res.raise_for_status() + + def test_index(threaded_exporter): time.sleep(1) res = requests.get(f"http://localhost:{threaded_exporter.cfg['port']}", timeout=3)