From c32d17f9ea4b75902277b9d6922d38a150bee91e Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Fri, 22 Sep 2023 14:04:48 +0200 Subject: [PATCH] [feat] Add prometheus timed function --- .github/workflows/build_and_publish.yml | 2 +- Makefile | 2 +- fixcloudutils/asyncio/timed.py | 59 +++++++++++++++++++++++++ fixcloudutils/redis/pub_sub.py | 8 +++- pyproject.toml | 4 +- tests/timed_test.py | 27 +++++++++++ tox.ini | 2 +- 7 files changed, 99 insertions(+), 5 deletions(-) create mode 100644 fixcloudutils/asyncio/timed.py create mode 100644 tests/timed_test.py diff --git a/.github/workflows/build_and_publish.yml b/.github/workflows/build_and_publish.yml index 800e11e..42ca06b 100644 --- a/.github/workflows/build_and_publish.yml +++ b/.github/workflows/build_and_publish.yml @@ -35,7 +35,7 @@ jobs: - name: Install Dependencies run: | python -m pip install --upgrade pip - pip install -e ".[test, arangodb, redis]" + pip install -e ".[test, arangodb, redis, prometheus]" - name: Run tests run: tox diff --git a/Makefile b/Makefile index 2fa6756..eab2cf2 100644 --- a/Makefile +++ b/Makefile @@ -76,7 +76,7 @@ coverage: ## check code coverage quickly with the default Python venv: python3 -m venv venv --prompt "fixcloudutils" . ./venv/bin/activate && python3 -m pip install --upgrade pip - . ./venv/bin/activate && pip install -e ".[test, arango, redis]" + . ./venv/bin/activate && pip install -e ".[test, arango, redis, prometheus]" . ./venv/bin/activate && mypy --install-types --non-interactive fixcloudutils tests setup: clean clean-env venv diff --git a/fixcloudutils/asyncio/timed.py b/fixcloudutils/asyncio/timed.py new file mode 100644 index 0000000..e8aaf6d --- /dev/null +++ b/fixcloudutils/asyncio/timed.py @@ -0,0 +1,59 @@ +import logging +import time +from functools import wraps +from typing import Callable, Any, TypeVar, cast + +from prometheus_client import Histogram + +MethodDuration = Histogram("method_call_duration", "Duration of single method call", ["module", "name"]) + +# Create a type that is bound to the underlying wrapped function +# This way all signature information is preserved! +DecoratedFn = TypeVar("DecoratedFn", bound=Callable[..., Any]) + +log = logging.getLogger(__name__) + + +def perf_now() -> float: + return time.perf_counter() + + +def timed(module: str, name: str, is_async: bool = True) -> Callable[[DecoratedFn], DecoratedFn]: + """ + Use this annotation on a method and measure the duration of the call. + :param module: the name of the component. + :param name: the name of the method to be measured. + :param is_async: set to false if the underlying method is not async + :return: the wrapped function + """ + metric = MethodDuration.labels(module=module, name=name) + + def sync_time_wrapper(fn: DecoratedFn) -> DecoratedFn: + @wraps(fn) + def async_time_decorated(*args: Any, **kwargs: Any) -> Any: + start_time = perf_now() + try: + rv = fn(*args, **kwargs) + return rv + finally: + duration = perf_now() - start_time + log.debug(f"Duration of {module}::{name}: {duration}") + metric.observe(duration) + + return cast(DecoratedFn, async_time_decorated) + + def async_time_wrapper(fn: DecoratedFn) -> DecoratedFn: + @wraps(fn) + async def async_time_decorated(*args: Any, **kwargs: Any) -> Any: + start_time = perf_now() + try: + rv = await fn(*args, **kwargs) + return rv + finally: + duration = round((perf_now() - start_time) * 1000) + log.debug(f"Duration of {module}::{name}: {duration} millis") + metric.observe(duration) + + return cast(DecoratedFn, async_time_decorated) + + return async_time_wrapper if is_async else sync_time_wrapper diff --git a/fixcloudutils/redis/pub_sub.py b/fixcloudutils/redis/pub_sub.py index 25cc203..b7544e7 100644 --- a/fixcloudutils/redis/pub_sub.py +++ b/fixcloudutils/redis/pub_sub.py @@ -17,6 +17,7 @@ import asyncio import json import logging +import re import uuid from asyncio import Task from datetime import datetime @@ -33,6 +34,7 @@ # id, at, publisher, kind, data MessageHandler = Callable[[str, datetime, str, str, Json], Awaitable[Any]] log = logging.getLogger("fixcloudutils.redis.pub_sub") +redis_wildcard = re.compile(r"(? None: log.exception(f"Error handling message {msg}: {ex}. Ignore.") ps = self.redis.pubsub() - await ps.psubscribe(self.channel) + # If the channel name contains wildcards, we need to use psubscribe + if redis_wildcard.search(self.channel) is not None: + await ps.psubscribe(self.channel) + else: + await ps.subscribe(self.channel) self.reader = asyncio.create_task(read_messages(ps)) self.pubsub = ps diff --git a/pyproject.toml b/pyproject.toml index b3b7ac1..a7d8183 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "fixcloudutils" -version = "1.5.0" +version = "1.6.0" authors = [{ name = "Some Engineering Inc." }] description = "Utilities for fixcloud." license = { file = "LICENSE" } @@ -14,6 +14,8 @@ dependencies = ["attrs", "cattrs"] [project.optional-dependencies] redis = ["redis"] arango = ["python-arango"] +prometheus = ["prometheus-client"] + test = [ "black", "coverage", diff --git a/tests/timed_test.py b/tests/timed_test.py new file mode 100644 index 0000000..004c693 --- /dev/null +++ b/tests/timed_test.py @@ -0,0 +1,27 @@ +import prometheus_client + +from fixcloudutils.asyncio.timed import timed + + +@timed("fixcloudutils", "test_async") +async def some_fn_async() -> int: + return 23 + + +@timed("fixcloudutils", "test", is_async=False) +def some_fn() -> int: + return 23 + + +def test_timed() -> None: + for a in range(10): + assert some_fn() == 23 + gl = prometheus_client.generate_latest().decode("utf-8") + assert 'method_call_duration_bucket{le="0.005",module="fixcloudutils",name="test"} 10.0' in gl + + +async def test_async_timed() -> None: + for a in range(10): + assert await some_fn_async() == 23 + gl = prometheus_client.generate_latest().decode("utf-8") + assert 'method_call_duration_bucket{le="0.005",module="fixcloudutils",name="test_async"} 10.0' in gl diff --git a/tox.ini b/tox.ini index 8d7bc23..746c1e5 100644 --- a/tox.ini +++ b/tox.ini @@ -8,7 +8,7 @@ asyncio_mode= auto [testenv] usedevelop = true deps = - --editable=".[test, arango, redis]" + --editable=".[test, arango, redis, prometheus]" # until this is fixed: https://github.com/pypa/setuptools/issues/3518 setenv = SETUPTOOLS_ENABLE_FEATURES = legacy-editable