Separated CacheKeyBuilder from EntityCacheEngineBase
altvod committed Oct 27, 2023
1 parent 26128aa commit 2afb994
Showing 1 changed file with 53 additions and 30 deletions.
83 changes: 53 additions & 30 deletions lib/dl_core/dl_core/data_processing/cache/engine.py
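
For orientation before the diff: the commit moves cache-key construction out of EntityCacheEngineBase into a standalone CacheKeyBuilder abstraction, with DefaultCacheKeyBuilder keeping the existing key layout. A minimal standalone sketch of that layout follows (not part of the commit; the entity id and hash are the example values from the comments in the diff, and the default, non-locked prefix is used):

# Minimal sketch of the key layout produced by DefaultCacheKeyBuilder.
# The example entity_id and hash are taken from the comments in the diff below.
entity_id = "iyirei5oqew0c"
key_parts_hash = "657f18518eaa2f41307895e18c3ba0d12d97b8a23c6de3966f52c6ba39a07ee4"

key_root = "bic_dp_ce__{entity_id}__query_cache/".format(entity_id=entity_id)
# 'bic_dp_ce__iyirei5oqew0c__query_cache/'

cache_key = "{entity_root_key}/{local_key_rep_str}".format(
    entity_root_key=key_root,
    local_key_rep_str=key_parts_hash,
)
# 'bic_dp_ce__iyirei5oqew0c__query_cache//657f1851...39a07ee4' (note the double slash)

all_keys_pattern = key_root + "" + "*"  # data_keys_suffix defaults to ""
# 'bic_dp_ce__iyirei5oqew0c__query_cache/*'
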
@@ -68,6 +68,7 @@

from __future__ import annotations

import abc
import asyncio
import collections
import contextlib
@@ -431,10 +432,9 @@ def _make_rcl(self) -> Tuple[RedisCacheLock, HistoryHolder]:
local_key_rep = self.local_key_rep
local_key_rep.validate()
key = local_key_rep.key_parts_hash
key_root = cache_engine._get_key_root()
key_root = cache_engine._cache_key_builder.get_key_root()
# Keys structure: `resource_tag + ('/data:'|'/notif:'|'/lock:') + key`
# e.g. 'bic_dp_ce_l__iyirei5oqew0c__query_cache//data:657f18518eaa2f41307895e18c3ba0d12d97b8a23c6de3966f52c6ba39a07ee4'
# NOTE: not using `cache_engine._get_key_query_cache_entry` at all
history_holder = HistoryHolder()
rcl = RedisCacheLockWrapped(
key=key,
@@ -541,6 +541,48 @@ def save_history(self, error: Any = None, save_error: Any = None) -> None:
)


_CKB_TV = TypeVar("_CKB_TV", bound="CacheKeyBuilder")


@attr.s(frozen=True)
class CacheKeyBuilder(abc.ABC):
def clone(self: _CKB_TV, **updates: Any) -> _CKB_TV:
return attr.evolve(self, **updates)

@abc.abstractmethod
def get_key_root(self) -> str:
raise NotImplementedError

@abc.abstractmethod
def make_cache_key(self, local_key_rep: LocalKeyRepresentation) -> str:
raise NotImplementedError

@abc.abstractmethod
def get_all_keys_pattern(self) -> str:
"""Returns pattern for all cached keys for the entity"""
raise NotImplementedError


@attr.s(frozen=True)
class DefaultCacheKeyBuilder(CacheKeyBuilder):
entity_id: str = attr.ib(kw_only=True)
key_prefix_tpl: str = attr.ib(kw_only=True, default="bic_dp_ce__{entity_id}__query_cache/")
data_keys_suffix: str = attr.ib(kw_only=True, default="")

def get_key_root(self) -> str:
return self.key_prefix_tpl.format(entity_id=self.entity_id)

def make_cache_key(self, local_key_rep: LocalKeyRepresentation) -> str:
return "{entity_root_key}/{local_key_rep_str}".format(
entity_root_key=self.get_key_root(),
local_key_rep_str=local_key_rep.key_parts_hash,
)

def get_all_keys_pattern(self) -> str:
"""Returns pattern for all cached keys for the entity"""
return self.get_key_root() + self.data_keys_suffix + "*"


_ECE_TV = TypeVar("_ECE_TV", bound="EntityCacheEngineBase")


@@ -553,29 +595,15 @@ class EntityCacheEngineBase:
DEFAULT_COMPRESS_ALG: CompressAlg = CompressAlg.GZIP
MIN_BYTES_TO_COMPRESS: int = 120

# Old keys, effectively, were 'dataset:{entity_id}:query_cache:'
key_prefix_tpl: str = "bic_dp_ce__{entity_id}__query_cache/" # dl_core data_processing cache_engine
data_keys_suffix: str = ""
_cache_key_builder: CacheKeyBuilder = attr.ib(kw_only=True)

@_cache_key_builder.default
def __make_cache_key_builder(self) -> CacheKeyBuilder:
return DefaultCacheKeyBuilder(entity_id=self.entity_id)

def clone(self: _ECE_TV, **updates: Any) -> _ECE_TV:
return attr.evolve(self, **updates)

# Keys section
def _get_key_root(self) -> str:
return self.key_prefix_tpl.format(entity_id=self.entity_id)

def _get_key_query_cache_entry(self, local_key_rep: LocalKeyRepresentation) -> str:
local_key_rep.validate()
# noinspection PyProtectedMember
return "{entity_root_key}/{local_key_rep_str}".format(
entity_root_key=self._get_key_root(),
local_key_rep_str=local_key_rep.key_parts_hash,
)

def _get_all_keys_pattern(self) -> str:
"""Returns pattern for all cached keys for the entity"""
return self._get_key_root() + self.data_keys_suffix + "*"

def _make_cache_update_request(
self,
local_key_rep: LocalKeyRepresentation,
@@ -591,7 +619,7 @@ def _make_cache_update_request(
if compress_alg is None:
compress_alg = self.DEFAULT_COMPRESS_ALG

full_key = self._get_key_query_cache_entry(local_key_rep=local_key_rep)
full_key = self._cache_key_builder.make_cache_key(local_key_rep=local_key_rep)

# noinspection PyProtectedMember
entry_data, details = ResultCacheEntry(
@@ -623,7 +651,7 @@ def _make_full_key_and_log_details(
local_key_rep: LocalKeyRepresentation,
new_ttl_sec: Optional[float] = None,
) -> Tuple[str, dict]:
full_key = self._get_key_query_cache_entry(local_key_rep)
full_key = self._cache_key_builder.make_cache_key(local_key_rep)

details = dict(
cache_key=full_key,
@@ -733,12 +761,7 @@ def as_lockable(self) -> EntityCacheEngineAsync:
Use the entry manager that synchronizes the data generation over a lock,
and a different key prefix just in case.
"""
return self.clone(
entity_cache_entry_manager_cls=EntityCacheEntryLockedManagerAsync,
# dl_core data_processing cache_engine locked
key_prefix_tpl="bic_dp_ce_l__{entity_id}__query_cache/",
data_keys_suffix="/data:*",
)
return self.clone(entity_cache_entry_manager_cls=EntityCacheEntryLockedManagerAsync)

# Redis helpers
@generic_profiler_async("qcache-invalidate") # type: ignore # TODO: fix
@@ -857,7 +880,7 @@ async def _redis_get(
return result

async def invalidate_all(self) -> None:
await self._delete_keys_by_pattern(self._get_all_keys_pattern())
await self._delete_keys_by_pattern(self._cache_key_builder.get_all_keys_pattern())

def get_cache_entry_manager(
self,

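As a rough illustration of what the separation enables, the sketch below shows a custom key scheme supplied without touching the cache engine itself. It is not from the commit: the AlternatePrefixKeyBuilder class, its prefix, and the clone() usage note are hypothetical, and it assumes the dl_core package is importable.

import attr

# Assumes dl_core is importable; CacheKeyBuilder is the ABC added in this commit.
from dl_core.data_processing.cache.engine import CacheKeyBuilder


@attr.s(frozen=True)
class AlternatePrefixKeyBuilder(CacheKeyBuilder):
    """Hypothetical builder with a different key prefix, for illustration only."""

    entity_id: str = attr.ib(kw_only=True)
    prefix_tpl: str = attr.ib(kw_only=True, default="my_cache_v2__{entity_id}/")

    def get_key_root(self) -> str:
        return self.prefix_tpl.format(entity_id=self.entity_id)

    def make_cache_key(self, local_key_rep) -> str:
        # local_key_rep is a LocalKeyRepresentation; only key_parts_hash is used here,
        # mirroring DefaultCacheKeyBuilder.make_cache_key in the diff.
        return "{root}/{key}".format(root=self.get_key_root(), key=local_key_rep.key_parts_hash)

    def get_all_keys_pattern(self) -> str:
        return self.get_key_root() + "*"


# A cache engine could then receive the custom builder through the new
# _cache_key_builder attribute, e.g. something like (hypothetical usage):
#   engine = engine.clone(cache_key_builder=AlternatePrefixKeyBuilder(entity_id=engine.entity_id))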