Skip to content

Commit

Permalink
Add optional HTTP auth, tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
btimby committed Mar 1, 2024
1 parent 4b70855 commit bdf820f
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 4 deletions.
39 changes: 39 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,34 @@ def exporter_instance(find_free_port, celery_config, log_level):
yield exporter


@pytest.fixture()
def exporter_instance_auth(find_free_port, celery_config, log_level):
cfg = {
"host": "0.0.0.0",
"port": find_free_port(),
"broker_url": celery_config["broker_url"],
"broker_transport_option": ["visibility_timeout=7200"],
"broker_ssl_option": [],
"retry_interval": 5,
"log_level": log_level,
"accept_content": None,
"worker_timeout": 1,
"purge_offline_worker_metrics": 10,
"initial_queues": ["queue_from_command_line"],
"http_username": "angus",
"http_password": "secret",
}
exporter = Exporter(
worker_timeout_seconds=cfg["worker_timeout"],
purge_offline_worker_metrics_seconds=cfg["purge_offline_worker_metrics"],
initial_queues=cfg["initial_queues"],
http_username=cfg["http_username"],
http_password=cfg["http_password"],
)
setattr(exporter, "cfg", cfg)
yield exporter


@pytest.fixture()
def threaded_exporter(exporter_instance):
thread = threading.Thread(
Expand All @@ -114,6 +142,17 @@ def threaded_exporter(exporter_instance):
yield exporter_instance


@pytest.fixture()
def threaded_exporter_auth(exporter_instance_auth):
thread = threading.Thread(
target=exporter_instance_auth.run,
args=(exporter_instance_auth.cfg,),
daemon=True,
)
thread.start()
yield exporter_instance_auth


@pytest.fixture()
def hostname():
return socket.gethostname()
34 changes: 32 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ loguru = "^0.7.2"
redis = "^5.0.1"
Flask = "^3.0.0"
waitress = "^2.1.2"
flask-httpauth = "^4.8.0"
flask-httpauth-stubs = "^0.1.6"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.4"
Expand Down
10 changes: 10 additions & 0 deletions src/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ def _comma_seperated_argument(_ctx, _param, value):
help="Prefix all metrics with a string. "
"This option replaces the 'celery_*' part with a custom prefix. ",
)
@click.option(
"--http-username", default=None, help="Basic auth username for /metrics endpoint."
)
@click.option(
"--http-password", default=None, help="Basic auth password for /metrics endpoint."
)
def cli( # pylint: disable=too-many-arguments,too-many-locals
broker_url,
broker_transport_option,
Expand All @@ -130,6 +136,8 @@ def cli( # pylint: disable=too-many-arguments,too-many-locals
generic_hostname_task_sent_metric,
queues,
metric_prefix,
http_username,
http_password,
): # pylint: disable=unused-argument
formatted_buckets = list(map(float, buckets.split(",")))
ctx = click.get_current_context()
Expand All @@ -140,4 +148,6 @@ def cli( # pylint: disable=too-many-arguments,too-many-locals
generic_hostname_task_sent_metric,
queues,
metric_prefix,
http_username,
http_password,
).run(ctx.params)
6 changes: 6 additions & 0 deletions src/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ def __init__(
generic_hostname_task_sent_metric=False,
initial_queues=None,
metric_prefix="celery_",
http_username=None,
http_password=None,
):
self.registry = CollectorRegistry(auto_describe=True)
self.queue_cache = set(initial_queues or [])
Expand Down Expand Up @@ -132,6 +134,8 @@ def __init__(
["queue_name"],
registry=self.registry,
)
self.http_username = http_username
self.http_password = http_password

def scrape(self):
if (
Expand Down Expand Up @@ -383,6 +387,8 @@ def run(self, click_params):
click_params["host"],
click_params["port"],
self.scrape,
self.http_username,
self.http_password,
)
while True:
try:
Expand Down
26 changes: 24 additions & 2 deletions src/http_server.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from threading import Thread

import kombu.exceptions
from flask import Blueprint, Flask, current_app, request
from flask import Blueprint, Flask, current_app, request, abort
from flask_httpauth import HTTPBasicAuth
from loguru import logger
from prometheus_client.exposition import choose_encoder
from waitress import serve

blueprint = Blueprint("celery_exporter", __name__)
auth = HTTPBasicAuth()


@blueprint.route("/")
Expand All @@ -28,6 +30,7 @@ def index():


@blueprint.route("/metrics")
@auth.login_required(optional=False)
def metrics():
current_app.config["metrics_puller"]()
encoder, content_type = choose_encoder(request.headers.get("accept"))
Expand All @@ -36,6 +39,7 @@ def metrics():


@blueprint.route("/health")
@auth.login_required(optional=False)
def health():
conn = current_app.config["celery_connection"]
uri = conn.as_uri()
Expand All @@ -51,11 +55,29 @@ def health():
return f"Connected to the broker {conn.as_uri()}"


def start_http_server(registry, celery_connection, host, port, metrics_puller):
# pylint: disable=too-many-arguments
def start_http_server(
registry,
celery_connection,
host,
port,
metrics_puller,
http_username=None,
http_password=None,
):
app = Flask(__name__)
app.config["registry"] = registry
app.config["celery_connection"] = celery_connection
app.config["metrics_puller"] = metrics_puller

@auth.verify_password
def verify_password(username, password):
if not http_username or not http_password:
return "anonymous"
if http_username == username and http_password == password:
return "authenticated"
abort(401)

app.register_blueprint(blueprint)
Thread(
target=serve,
Expand Down
21 changes: 21 additions & 0 deletions src/test_http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,27 @@ def test_health(threaded_exporter):
res.raise_for_status()


@pytest.mark.celery()
def test_health_auth_missing(threaded_exporter_auth):
time.sleep(1)
res = requests.get(
f"http://localhost:{threaded_exporter_auth.cfg['port']}/health", timeout=3
)
assert res.status_code == 401


@pytest.mark.celery()
def test_health_auth_present(threaded_exporter_auth):
time.sleep(1)
username = threaded_exporter_auth.cfg["http_username"]
password = threaded_exporter_auth.cfg["http_password"]
res = requests.get(
f"http://{username}:{password}@localhost:{threaded_exporter_auth.cfg['port']}/health",
timeout=3,
)
res.raise_for_status()


def test_index(threaded_exporter):
time.sleep(1)
res = requests.get(f"http://localhost:{threaded_exporter.cfg['port']}", timeout=3)
Expand Down

0 comments on commit bdf820f

Please sign in to comment.