From a57f66e58cc74a62764c57a3a8bf27ee69996ae9 Mon Sep 17 00:00:00 2001
From: Grigorii Statsenko
Date: Mon, 18 Dec 2023 12:53:06 +0100
Subject: [PATCH] Revert "Separated CacheKeyBuilder from EntityCacheEngineBase (#33)"

This reverts commit 03a5b8a6e69591e5e896dc8d800c2545a53d5a60.
---
 .../dl_core/data_processing/cache/engine.py | 83 +++++++------------
 1 file changed, 30 insertions(+), 53 deletions(-)

diff --git a/lib/dl_core/dl_core/data_processing/cache/engine.py b/lib/dl_core/dl_core/data_processing/cache/engine.py
index f56bf167b..768ca52bf 100644
--- a/lib/dl_core/dl_core/data_processing/cache/engine.py
+++ b/lib/dl_core/dl_core/data_processing/cache/engine.py
@@ -68,7 +68,6 @@
 
 from __future__ import annotations
 
-import abc
 import asyncio
 import collections
 import contextlib
@@ -432,9 +431,10 @@ def _make_rcl(self) -> Tuple[RedisCacheLock, HistoryHolder]:
         local_key_rep = self.local_key_rep
         local_key_rep.validate()
         key = local_key_rep.key_parts_hash
-        key_root = cache_engine._cache_key_builder.get_key_root()
+        key_root = cache_engine._get_key_root()
         # Keys structure: `resource_tag + ('/data:'|'/notif:'|'/lock:') + key`
         # e.g. 'bic_dp_ce_l__iyirei5oqew0c__query_cache//data:657f18518eaa2f41307895e18c3ba0d12d97b8a23c6de3966f52c6ba39a07ee4'
+        # NOTE: not using `cache_engine._get_key_query_cache_entry` at all
         history_holder = HistoryHolder()
         rcl = RedisCacheLockWrapped(
             key=key,
@@ -541,48 +541,6 @@ def save_history(self, error: Any = None, save_error: Any = None) -> None:
         )
 
 
-_CKB_TV = TypeVar("_CKB_TV", bound="CacheKeyBuilder")
-
-
-@attr.s(frozen=True)
-class CacheKeyBuilder(abc.ABC):
-    def clone(self: _CKB_TV, **updates: Any) -> _CKB_TV:
-        return attr.evolve(self, **updates)
-
-    @abc.abstractmethod
-    def get_key_root(self) -> str:
-        raise NotImplementedError
-
-    @abc.abstractmethod
-    def make_cache_key(self, local_key_rep: LocalKeyRepresentation) -> str:
-        raise NotImplementedError
-
-    @abc.abstractmethod
-    def get_all_keys_pattern(self) -> str:
-        """Returns pattern for all cached keys for the entity"""
-        raise NotImplementedError
-
-
-@attr.s(frozen=True)
-class DefaultCacheKeyBuilder(CacheKeyBuilder):
-    entity_id: str = attr.ib(kw_only=True)
-    key_prefix_tpl: str = attr.ib(kw_only=True, default="bic_dp_ce__{entity_id}__query_cache/")
-    data_keys_suffix: str = attr.ib(kw_only=True, default="")
-
-    def get_key_root(self) -> str:
-        return self.key_prefix_tpl.format(entity_id=self.entity_id)
-
-    def make_cache_key(self, local_key_rep: LocalKeyRepresentation) -> str:
-        return "{entity_root_key}/{local_key_rep_str}".format(
-            entity_root_key=self.get_key_root(),
-            local_key_rep_str=local_key_rep.key_parts_hash,
-        )
-
-    def get_all_keys_pattern(self) -> str:
-        """Returns pattern for all cached keys for the entity"""
-        return self.get_key_root() + self.data_keys_suffix + "*"
-
-
 _ECE_TV = TypeVar("_ECE_TV", bound="EntityCacheEngineBase")
 
 
@@ -595,15 +553,29 @@ class EntityCacheEngineBase:
     DEFAULT_COMPRESS_ALG: CompressAlg = CompressAlg.GZIP
     MIN_BYTES_TO_COMPRESS: int = 120
 
-    _cache_key_builder: CacheKeyBuilder = attr.ib(kw_only=True)
-
-    @_cache_key_builder.default
-    def __make_cache_key_builder(self) -> CacheKeyBuilder:
-        return DefaultCacheKeyBuilder(entity_id=self.entity_id)
+    # Old keys, effectively, were 'dataset:{entity_id}:query_cache:'
+    key_prefix_tpl: str = "bic_dp_ce__{entity_id}__query_cache/"  # dl_core data_processing cache_engine
+    data_keys_suffix: str = ""
 
     def clone(self: _ECE_TV, **updates: Any) -> _ECE_TV:
         return attr.evolve(self, **updates)
 
+    # Keys section
+    def _get_key_root(self) -> str:
+        return self.key_prefix_tpl.format(entity_id=self.entity_id)
+
+    def _get_key_query_cache_entry(self, local_key_rep: LocalKeyRepresentation) -> str:
+        local_key_rep.validate()
+        # noinspection PyProtectedMember
+        return "{entity_root_key}/{local_key_rep_str}".format(
+            entity_root_key=self._get_key_root(),
+            local_key_rep_str=local_key_rep.key_parts_hash,
+        )
+
+    def _get_all_keys_pattern(self) -> str:
+        """Returns pattern for all cached keys for the entity"""
+        return self._get_key_root() + self.data_keys_suffix + "*"
+
     def _make_cache_update_request(
         self,
         local_key_rep: LocalKeyRepresentation,
@@ -619,7 +591,7 @@ def _make_cache_update_request(
         if compress_alg is None:
             compress_alg = self.DEFAULT_COMPRESS_ALG
 
-        full_key = self._cache_key_builder.make_cache_key(local_key_rep=local_key_rep)
+        full_key = self._get_key_query_cache_entry(local_key_rep=local_key_rep)
 
         # noinspection PyProtectedMember
         entry_data, details = ResultCacheEntry(
@@ -651,7 +623,7 @@ def _make_full_key_and_log_details(
         local_key_rep: LocalKeyRepresentation,
         new_ttl_sec: Optional[float] = None,
     ) -> Tuple[str, dict]:
-        full_key = self._cache_key_builder.make_cache_key(local_key_rep)
+        full_key = self._get_key_query_cache_entry(local_key_rep)
 
         details = dict(
             cache_key=full_key,
@@ -761,7 +733,12 @@ def as_lockable(self) -> EntityCacheEngineAsync:
         Use the entry manager that synchronizes the data generation
         over a lock, and a different key prefix just in case.
         """
-        return self.clone(entity_cache_entry_manager_cls=EntityCacheEntryLockedManagerAsync)
+        return self.clone(
+            entity_cache_entry_manager_cls=EntityCacheEntryLockedManagerAsync,
+            # dl_core data_processing cache_engine locked
+            key_prefix_tpl="bic_dp_ce_l__{entity_id}__query_cache/",
+            data_keys_suffix="/data:*",
+        )
 
     # Redis helpers
     @generic_profiler_async("qcache-invalidate")  # type: ignore  # TODO: fix
@@ -880,7 +857,7 @@ async def _redis_get(
         return result
 
     async def invalidate_all(self) -> None:
-        await self._delete_keys_by_pattern(self._cache_key_builder.get_all_keys_pattern())
+        await self._delete_keys_by_pattern(self._get_all_keys_pattern())
 
     def get_cache_entry_manager(
         self,
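The revert restores inline key construction on EntityCacheEngineBase. For reference, a minimal standalone sketch of the resulting Redis key layout follows; it is simplified, and the module-level build_* helpers and the example entity/hash values are illustrative only and do not exist in engine.py.

# Key prefix templates restored on EntityCacheEngineBase by this revert.
PLAIN_KEY_PREFIX_TPL = "bic_dp_ce__{entity_id}__query_cache/"
LOCKED_KEY_PREFIX_TPL = "bic_dp_ce_l__{entity_id}__query_cache/"  # used by as_lockable()


def build_key_root(entity_id: str, key_prefix_tpl: str = PLAIN_KEY_PREFIX_TPL) -> str:
    # Mirrors _get_key_root(): the per-entity prefix shared by all of its cache keys.
    return key_prefix_tpl.format(entity_id=entity_id)


def build_entry_key(entity_id: str, key_parts_hash: str) -> str:
    # Mirrors _get_key_query_cache_entry(): "<root>/<hash of the local key parts>".
    return "{root}/{key_hash}".format(root=build_key_root(entity_id), key_hash=key_parts_hash)


def build_all_keys_pattern(entity_id: str, data_keys_suffix: str = "") -> str:
    # Mirrors _get_all_keys_pattern(): the glob that invalidate_all() deletes by.
    return build_key_root(entity_id) + data_keys_suffix + "*"


if __name__ == "__main__":
    # Illustrative values; the double slash is expected, since the root already ends with "/".
    print(build_entry_key("some_entity", "abc123"))  # bic_dp_ce__some_entity__query_cache//abc123
    print(build_all_keys_pattern("some_entity"))     # bic_dp_ce__some_entity__query_cache/*

The lockable variant returned by as_lockable() only swaps in LOCKED_KEY_PREFIX_TPL and data_keys_suffix="/data:*", matching the key structure comment in _make_rcl above.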