Skip to content

Commit

Permalink
better logs and prometheus counter
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias committed Dec 7, 2023
1 parent c6f0925 commit 0de94a0
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion fixcloudutils/redis/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from typing import Any, Optional, TypeVar, Callable, ParamSpec, NewType

from attr import frozen
from prometheus_client import Counter
from redis.asyncio import Redis

from fixcloudutils.asyncio import stop_running_task
Expand All @@ -33,6 +34,7 @@
RedisKey = NewType("RedisKey", str)
P = ParamSpec("P")
T = TypeVar("T")
CacheHit = Counter("redis_cache", "Redis Cache", ["key", "stage"])


@frozen
Expand Down Expand Up @@ -100,11 +102,13 @@ async def stop(self) -> None:
self.started = False

async def evict(self, fn_name: str, key: str) -> None:
log.info(f"{self.key}:{fn_name} Evict {key}")
await self.queue.put(RedisCacheEvict(self._redis_key(fn_name, key)))

def evict_with(self, fn: Callable[P, T]) -> Callable[P, T]:
async def evict_fn(*args: Any, **kwargs: Any) -> None:
key = self._redis_key(fn.__name__, None, *args, **kwargs)
log.info(f"{self.key}:{fn.__name__} Evict args based key: {key}")
await self.queue.put(RedisCacheEvict(key))

return evict_fn # type: ignore
Expand Down Expand Up @@ -135,15 +139,20 @@ async def handle_call(*args: Any, **kwargs: Any) -> T:
fns = fn.__name__
local_cache_key = key or (fns, *args, *kwargs.values())
if local_value := self.local_cache.get(local_cache_key):
log.info(f"{self.key}:{fns} Serve result from local cache.")
CacheHit.labels(self.key, "local").inc()
return local_value.value # type: ignore
# check if the value is available in redis
redis_key = self._redis_key(fns, key, *args, **kwargs)
if redis_value := await self.redis.get(redis_key):
log.info(f"{self.key}:{fns} Serve result from redis cache.")
CacheHit.labels(self.key, "redis").inc()
result: T = pickle.loads(base64.b64decode(redis_value))
self._add_to_local_cache(local_cache_key, redis_key, result, ttl_memory or self.ttl_memory)
return result
# call the function
result = await fn(*args, **kwargs) # type: ignore
CacheHit.labels(self.key, "call").inc()
self._add_to_local_cache(local_cache_key, redis_key, result, ttl_memory or self.ttl_memory)
await self.queue.put(RedisCacheSet(redis_key, result, ttl_redis or self.ttl_redis))
return result
Expand All @@ -158,9 +167,11 @@ async def _process_queue(self) -> None:
try:
entry = await self.queue.get()
if isinstance(entry, RedisCacheSet):
log.info(f"{self.key}: Store cached value in redis as {entry.key}")
value = base64.b64encode(pickle.dumps(entry.value))
await self.redis.set(name=entry.key, value=value, ex=entry.ttl)
elif isinstance(entry, RedisCacheEvict):
log.info(f"{self.key}: Delete cached value from redis key {entry.key}")
# delete the entry
await self.redis.delete(entry.key)
# inform all other cache instances to evict the key
Expand Down Expand Up @@ -192,7 +203,7 @@ async def _wipe_outdated_from_local_cache(self) -> None:
now = utc()
for key, entry in list(self.local_cache.items()):
if entry.deadline < now:
log.debug(f"Evicting {key} from local cache")
log.info(f"Evicting {key} from local cache")
del self.local_cache[key]

def _add_to_local_cache(self, key: Any, redis_key: RedisKey, value: Any, ttl: timedelta) -> None:
Expand All @@ -206,6 +217,7 @@ def _remove_from_local_cache(self, redis_key: str) -> None:
local_key = key
break
if local_key:
log.info(f"Evicting {redis_key} from local cache")
del self.local_cache[local_key]

def _redis_key(self, fn_name: str, fn_key: Optional[str], *args: Any, **kwargs: Any) -> RedisKey:
Expand Down

0 comments on commit 0de94a0

Please sign in to comment.