From cafffd598eac9c0a879d790f2df05a05e2916dd9 Mon Sep 17 00:00:00 2001
From: Matthias Veit
Date: Fri, 8 Dec 2023 12:48:32 +0100
Subject: [PATCH 1/2] [feat] Use Hash for Cache Entries

---
 fixcloudutils/redis/cache.py | 129 +++++++++++++++--------------------
 pyproject.toml               |   2 +-
 tests/cache_test.py          |  26 ++++---
 3 files changed, 70 insertions(+), 87 deletions(-)

diff --git a/fixcloudutils/redis/cache.py b/fixcloudutils/redis/cache.py
index 491ab8b..5d5ebe4 100644
--- a/fixcloudutils/redis/cache.py
+++ b/fixcloudutils/redis/cache.py
@@ -17,7 +17,7 @@
 import pickle
 from asyncio import Task, Queue, CancelledError, create_task
 from datetime import datetime, timedelta
-from typing import Any, Optional, TypeVar, Callable, ParamSpec, NewType
+from typing import Any, Optional, TypeVar, Callable, ParamSpec, NewType, Hashable
 
 from attr import frozen
 from prometheus_client import Counter
@@ -40,6 +40,7 @@
 @frozen
 class RedisCacheSet:
     key: RedisKey
+    fn_key: str
     value: Any
     ttl: timedelta
 
@@ -57,6 +58,7 @@ class RedisCacheEvict:
 class RedisCacheEntry:
     key: Any
     redis_key: RedisKey
+    fn_key: str
     value: Any
     deadline: datetime
 
@@ -101,23 +103,15 @@ async def stop(self) -> None:
             await self.cleaner_task.stop()
         self.started = False
 
-    async def evict(self, fn_name: str, key: str) -> None:
-        log.info(f"{self.key}:{fn_name} Evict {key}")
-        await self.queue.put(RedisCacheEvict(self._redis_key(fn_name, key)))
-
-    def evict_with(self, fn: Callable[P, T]) -> Callable[P, T]:
-        async def evict_fn(*args: Any, **kwargs: Any) -> None:
-            key = self._redis_key(fn.__name__, None, *args, **kwargs)
-            log.info(f"{self.key}:{fn.__name__} Evict args based key: {key}")
-            await self.queue.put(RedisCacheEvict(key))
-
-        return evict_fn  # type: ignore
+    async def evict(self, key: str) -> None:
+        log.info(f"{self.key}: Evict {key}")
+        await self.queue.put(RedisCacheEvict(self._redis_key(key)))
 
     def call(
         self,
         fn: Callable[P, T],
+        key: str,
         *,
-        key: Optional[str] = None,
         ttl_memory: Optional[timedelta] = None,
         ttl_redis: Optional[timedelta] = None,
     ) -> Callable[P, T]:
@@ -137,24 +131,25 @@ def call(
         async def handle_call(*args: Any, **kwargs: Any) -> T:
             # check if the value is available in the local cache
             fns = fn.__name__
-            local_cache_key = key or (fns, *args, *kwargs.values())
+            fn_key = self._fn_key(fns, *args, *kwargs.values())
+            local_cache_key = (key, fn_key)
             if local_value := self.local_cache.get(local_cache_key):
-                log.info(f"{self.key}:{fns} Serve result from local cache.")
+                log.info(f"{self.key}:{fns}:{key} Serve result from local cache.")
                 CacheHit.labels(self.key, "local").inc()
                 return local_value.value  # type: ignore
             # check if the value is available in redis
-            redis_key = self._redis_key(fns, key, *args, **kwargs)
-            if redis_value := await self.redis.get(redis_key):
-                log.info(f"{self.key}:{fns} Serve result from redis cache.")
+            redis_key = self._redis_key(key)
+            if redis_value := await self.redis.hget(redis_key, fn_key):  # type: ignore
+                log.info(f"{self.key}:{fns}:{key} Serve result from redis cache.")
                 CacheHit.labels(self.key, "redis").inc()
                 result: T = pickle.loads(base64.b64decode(redis_value))
-                self._add_to_local_cache(local_cache_key, redis_key, result, ttl_memory or self.ttl_memory)
+                self._add_to_local_cache(local_cache_key, redis_key, fn_key, result, ttl_memory or self.ttl_memory)
                 return result
             # call the function
             result = await fn(*args, **kwargs)  # type: ignore
             CacheHit.labels(self.key, "call").inc()
-            self._add_to_local_cache(local_cache_key, redis_key, result, ttl_memory or self.ttl_memory)
-            await self.queue.put(RedisCacheSet(redis_key, result, ttl_redis or self.ttl_redis))
+            self._add_to_local_cache(local_cache_key, redis_key, fn_key, result, ttl_memory or self.ttl_memory)
+            await self.queue.put(RedisCacheSet(redis_key, fn_key, result, ttl_redis or self.ttl_redis))
             return result
 
         return handle_call  # type: ignore
@@ -167,9 +162,10 @@ async def _process_queue(self) -> None:
             try:
                 entry = await self.queue.get()
                 if isinstance(entry, RedisCacheSet):
-                    log.info(f"{self.key}: Store cached value in redis as {entry.key}")
-                    value = base64.b64encode(pickle.dumps(entry.value))
-                    await self.redis.set(name=entry.key, value=value, ex=entry.ttl)
+                    log.info(f"{self.key}: Store cached value in redis as {entry.key}:{entry.fn_key}")
+                    value = base64.b64encode(pickle.dumps(entry.value)).decode("utf-8")
+                    await self.redis.hset(name=entry.key, key=entry.fn_key, value=value)  # type: ignore
+                    await self.redis.expire(name=entry.key, time=entry.ttl)
                 elif isinstance(entry, RedisCacheEvict):
                     log.info(f"{self.key}: Delete cached value from redis key {entry.key}")
                     # delete the entry
@@ -206,56 +202,39 @@ async def _wipe_outdated_from_local_cache(self) -> None:
                 log.info(f"Evicting {key} from local cache")
                 del self.local_cache[key]
 
-    def _add_to_local_cache(self, key: Any, redis_key: RedisKey, value: Any, ttl: timedelta) -> None:
-        entry = RedisCacheEntry(key=key, redis_key=redis_key, value=value, deadline=utc() + ttl)
+    def _add_to_local_cache(self, key: Any, redis_key: RedisKey, fn_key: str, value: Any, ttl: timedelta) -> None:
+        entry = RedisCacheEntry(key=key, redis_key=redis_key, fn_key=fn_key, value=value, deadline=utc() + ttl)
         self.local_cache[key] = entry
 
-    def _remove_from_local_cache(self, redis_key: str) -> None:
-        local_key: Optional[Any] = None
-        for key, entry in self.local_cache.items():
-            if entry.redis_key == redis_key:
-                local_key = key
-                break
-        if local_key:
-            log.info(f"Evicting {redis_key} from local cache")
-            del self.local_cache[local_key]
-
-    def _redis_key(self, fn_name: str, fn_key: Optional[str], *args: Any, **kwargs: Any) -> RedisKey:
-        if fn_key is None:
-            sha = hashlib.sha256()
-            for a in args:
-                sha.update(pickle.dumps(a))
-            for k, v in kwargs.items():
-                sha.update(pickle.dumps(k))
-                sha.update(pickle.dumps(v))
-            fn_key = sha.hexdigest()
-        return RedisKey(f"cache:{self.key}:{fn_name}:{fn_key}")
-
-
-class redis_cached:  # noqa
-    """
-    Decorator for caching function calls in redis.
-
-    Usage:
-    >>> redis = Redis()
-    >>> cache = RedisCache(redis, "test", "test1")
-    >>> @redis_cached(cache)
-    ... async def f(a: int, b: int) -> int:
-    ...     return a + b
-    """
-
-    def __init__(
-        self,
-        cache: RedisCache,
-        *,
-        key: Optional[str] = None,
-        ttl_memory: Optional[timedelta] = None,
-        ttl_redis: Optional[timedelta] = None,
-    ):
-        self.cache = cache
-        self.key = key
-        self.ttl_memory = ttl_memory
-        self.ttl_redis = ttl_redis
-
-    def __call__(self, fn: Callable[P, T]) -> Callable[P, T]:
-        return self.cache.call(fn, key=self.key, ttl_memory=self.ttl_memory, ttl_redis=self.ttl_redis)
+    def _remove_from_local_cache(self, redis_key: RedisKey) -> None:
+        entries = [key for key, entry in self.local_cache.items() if entry.redis_key == redis_key]
+        if entries:
+            log.info(f"Evicting {len(entries)} entries for key {redis_key} from local cache")
+            for k in entries:
+                del self.local_cache[k]
+
+    def _redis_key(self, key: str) -> RedisKey:
+        return RedisKey(f"cache:{self.key}:{key}")
+
+    def _fn_key(self, fn_name: str, *args: Any, **kwargs: Any) -> str:
+        counter = 0
+
+        def object_hash(obj: Any) -> bytes:
+            nonlocal counter
+            if isinstance(obj, Hashable):
+                counter += 1
+                return f"{counter}:{hash(obj)}".encode("utf-8")
+            else:
+                return pickle.dumps(obj)
+
+        if len(args) == 0 and len(kwargs) == 0:
+            return fn_name
+
+        sha = hashlib.sha256()
+        sha.update(fn_name.encode("utf-8"))
+        for a in args:
+            sha.update(object_hash(a))
+        for k, v in kwargs.items():
+            sha.update(object_hash(k))
+            sha.update(object_hash(v))
+        return sha.hexdigest()
diff --git a/pyproject.toml b/pyproject.toml
index 11e9b8d..aa8486f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "fixcloudutils"
-version = "1.12.0"
+version = "1.13.0"
 authors = [{ name = "Some Engineering Inc." }]
 description = "Utilities for fixcloud."
 license = { file = "LICENSE" }
diff --git a/tests/cache_test.py b/tests/cache_test.py
index f4f090a..fd18536 100644
--- a/tests/cache_test.py
+++ b/tests/cache_test.py
@@ -47,41 +47,45 @@ async def complex_function(a: int, b: int) -> int:
 
     async def local_cache_is_empty(cache: RedisCache) -> bool:
         return len(cache.local_cache) == 0
 
-    key = cache1._redis_key(complex_function.__name__, None, 1, 2)
-    assert await cache1.call(complex_function)(1, 2) == 3
+    c1 = "customer_1"
+    redis_key = cache1._redis_key(c1)
+    fn_key = cache1._fn_key(complex_function.__name__, 1, 2)
+    assert await cache1.call(complex_function, c1)(1, 2) == 3
     assert call_count == 1
     assert len(cache1.local_cache) == 1
     # should come from internal memory cache
-    assert await cache1.call(complex_function)(1, 2) == 3
+    assert await cache1.call(complex_function, c1)(1, 2) == 3
     assert call_count == 1
-    await eventually(redis.exists, key, timeout=2)
+    await eventually(redis.hexists, redis_key, fn_key, timeout=2)  # type: ignore
     # should come from redis cache
     assert len(cache2.local_cache) == 0
-    assert await cache2.call(complex_function)(1, 2) == 3
+    assert await cache2.call(complex_function, c1)(1, 2) == 3
     assert call_count == 1
     assert len(cache2.local_cache) == 1
     # after ttl expires, the local cache is empty
     await eventually(local_cache_is_empty, cache1, timeout=1)
     await eventually(local_cache_is_empty, cache2, timeout=1)
     # after redis ttl the cache is evicted
-    await eventually(redis.exists, key, fn=lambda x: not x, timeout=2)
+    await eventually(redis.hexists, redis_key, fn_key, fn=lambda x: not x, timeout=2)  # type: ignore
     # calling this method again should trigger a new call and a new cache entry
     # we use a longer redis ttl to test the eviction of the redis cache
     for a in range(100):
-        assert await cache1.call(complex_function, ttl_redis=t3)(a, 2) == a + 2
+        assert await cache1.call(complex_function, c1, ttl_redis=t3)(a, 2) == a + 2
     assert call_count == 101
     assert len(cache1.local_cache) == 100
+    await eventually(redis.hlen, redis_key, fn=lambda x: x == 100, timeout=2)  # type: ignore
+
     for a in range(100):
-        assert await cache1.call(complex_function)(a, 2) == a + 2
-        assert await cache2.call(complex_function)(a, 2) == a + 2
+        assert await cache1.call(complex_function, c1)(a, 2) == a + 2
+        assert await cache2.call(complex_function, c1)(a, 2) == a + 2
     assert len(cache1.local_cache) == 100
     assert len(cache2.local_cache) == 100
     # no more calls are done
    assert call_count == 101
     # evicting all entries should evict all messages in all caches
-    for a in range(100):
-        await cache1.evict_with(complex_function)(a, 2)
+    await cache1.evict(c1)
     await eventually(local_cache_is_empty, cache1, timeout=1)
     await eventually(local_cache_is_empty, cache2, timeout=1)
+    await eventually(redis.exists, redis_key, fn=lambda x: not x, timeout=2)

From 55ccfdb29c411a08572b528a82a2ffd262dd0117 Mon Sep 17 00:00:00 2001
From: Matthias Veit
Date: Fri, 8 Dec 2023 13:04:10 +0100
Subject: [PATCH 2/2] better logging

---
 fixcloudutils/redis/cache.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/fixcloudutils/redis/cache.py b/fixcloudutils/redis/cache.py
index 5d5ebe4..4006bed 100644
--- a/fixcloudutils/redis/cache.py
+++ b/fixcloudutils/redis/cache.py
@@ -40,6 +40,7 @@
 @frozen
 class RedisCacheSet:
     key: RedisKey
+    fn_name: str
     fn_key: str
     value: Any
     ttl: timedelta
 
@@ -149,7 +150,7 @@ async def handle_call(*args: Any, **kwargs: Any) -> T:
             result = await fn(*args, **kwargs)  # type: ignore
             CacheHit.labels(self.key, "call").inc()
             self._add_to_local_cache(local_cache_key, redis_key, fn_key, result, ttl_memory or self.ttl_memory)
-            await self.queue.put(RedisCacheSet(redis_key, fn_key, result, ttl_redis or self.ttl_redis))
+            await self.queue.put(RedisCacheSet(redis_key, fns, fn_key, result, ttl_redis or self.ttl_redis))
             return result
 
         return handle_call  # type: ignore
@@ -162,7 +163,7 @@ async def _process_queue(self) -> None:
             try:
                 entry = await self.queue.get()
                 if isinstance(entry, RedisCacheSet):
-                    log.info(f"{self.key}: Store cached value in redis as {entry.key}:{entry.fn_key}")
+                    log.info(f"{self.key}:{entry.fn_name} Store cached value in redis as {entry.key}:{entry.fn_key}")
                     value = base64.b64encode(pickle.dumps(entry.value)).decode("utf-8")
                     await self.redis.hset(name=entry.key, key=entry.fn_key, value=value)  # type: ignore
                     await self.redis.expire(name=entry.key, time=entry.ttl)
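A minimal usage sketch of the reworked API, pieced together from the removed
redis_cached docstring and the updated tests. It is illustrative, not part of
the patch: the redis.asyncio import is assumed, start() is assumed as the
counterpart of the stop() shown in the diff, and the constructor arguments
"test" and "test1" are copied verbatim from the old docstring.

import asyncio

from redis.asyncio import Redis

from fixcloudutils.redis.cache import RedisCache


async def expensive(a: int, b: int) -> int:
    return a + b


async def main() -> None:
    cache = RedisCache(Redis(), "test", "test1")  # constructor args taken from the old docstring
    await cache.start()  # assumed counterpart of the stop() shown in the diff

    # All cached calls for one logical key (e.g. a customer) now share one
    # redis hash: the hash name is derived from the key, the hash field from
    # the function name and its arguments.
    assert await cache.call(expensive, "customer_1")(1, 2) == 3

    # Same key and arguments: served from the local cache or the redis hash.
    assert await cache.call(expensive, "customer_1")(1, 2) == 3

    # Evicting the key deletes the whole hash, i.e. every cached call for it.
    await cache.evict("customer_1")
    await cache.stop()


asyncio.run(main())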