From 0de94a080051f88dbd2e2566c2f41c19c2b12c2d Mon Sep 17 00:00:00 2001
From: Matthias Veit
Date: Thu, 7 Dec 2023 20:17:52 +0100
Subject: [PATCH] better logs and prometheus counter

---
 fixcloudutils/redis/cache.py | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/fixcloudutils/redis/cache.py b/fixcloudutils/redis/cache.py
index 5a4febc..491ab8b 100644
--- a/fixcloudutils/redis/cache.py
+++ b/fixcloudutils/redis/cache.py
@@ -20,6 +20,7 @@
 from typing import Any, Optional, TypeVar, Callable, ParamSpec, NewType
 
 from attr import frozen
+from prometheus_client import Counter
 from redis.asyncio import Redis
 
 from fixcloudutils.asyncio import stop_running_task
@@ -33,6 +34,7 @@
 RedisKey = NewType("RedisKey", str)
 P = ParamSpec("P")
 T = TypeVar("T")
+CacheHit = Counter("redis_cache", "Redis Cache", ["key", "stage"])
 
 
 @frozen
@@ -100,11 +102,13 @@ async def stop(self) -> None:
             self.started = False
 
     async def evict(self, fn_name: str, key: str) -> None:
+        log.info(f"{self.key}:{fn_name} Evict {key}")
         await self.queue.put(RedisCacheEvict(self._redis_key(fn_name, key)))
 
     def evict_with(self, fn: Callable[P, T]) -> Callable[P, T]:
         async def evict_fn(*args: Any, **kwargs: Any) -> None:
             key = self._redis_key(fn.__name__, None, *args, **kwargs)
+            log.info(f"{self.key}:{fn.__name__} Evict args based key: {key}")
             await self.queue.put(RedisCacheEvict(key))
 
         return evict_fn  # type: ignore
@@ -135,15 +139,20 @@ async def handle_call(*args: Any, **kwargs: Any) -> T:
             fns = fn.__name__
             local_cache_key = key or (fns, *args, *kwargs.values())
             if local_value := self.local_cache.get(local_cache_key):
+                log.info(f"{self.key}:{fns} Serve result from local cache.")
+                CacheHit.labels(self.key, "local").inc()
                 return local_value.value  # type: ignore
             # check if the value is available in redis
             redis_key = self._redis_key(fns, key, *args, **kwargs)
             if redis_value := await self.redis.get(redis_key):
+                log.info(f"{self.key}:{fns} Serve result from redis cache.")
+                CacheHit.labels(self.key, "redis").inc()
                 result: T = pickle.loads(base64.b64decode(redis_value))
                 self._add_to_local_cache(local_cache_key, redis_key, result, ttl_memory or self.ttl_memory)
                 return result
             # call the function
             result = await fn(*args, **kwargs)  # type: ignore
+            CacheHit.labels(self.key, "call").inc()
             self._add_to_local_cache(local_cache_key, redis_key, result, ttl_memory or self.ttl_memory)
             await self.queue.put(RedisCacheSet(redis_key, result, ttl_redis or self.ttl_redis))
             return result
@@ -158,9 +167,11 @@ async def _process_queue(self) -> None:
             try:
                 entry = await self.queue.get()
                 if isinstance(entry, RedisCacheSet):
+                    log.info(f"{self.key}: Store cached value in redis as {entry.key}")
                     value = base64.b64encode(pickle.dumps(entry.value))
                     await self.redis.set(name=entry.key, value=value, ex=entry.ttl)
                 elif isinstance(entry, RedisCacheEvict):
+                    log.info(f"{self.key}: Delete cached value from redis key {entry.key}")
                     # delete the entry
                     await self.redis.delete(entry.key)
                     # inform all other cache instances to evict the key
@@ -192,7 +203,7 @@ async def _wipe_outdated_from_local_cache(self) -> None:
         now = utc()
         for key, entry in list(self.local_cache.items()):
             if entry.deadline < now:
-                log.debug(f"Evicting {key} from local cache")
+                log.info(f"Evicting {key} from local cache")
                 del self.local_cache[key]
 
     def _add_to_local_cache(self, key: Any, redis_key: RedisKey, value: Any, ttl: timedelta) -> None:
@@ -206,6 +217,7 @@ def _remove_from_local_cache(self, redis_key: str) -> None:
                 local_key = key
                 break
         if local_key:
+            log.info(f"Evicting {redis_key} from local cache")
             del self.local_cache[local_key]
 
     def _redis_key(self, fn_name: str, fn_key: Optional[str], *args: Any, **kwargs: Any) -> RedisKey: