Skip to content

Commit

Permalink
Add some profiling to mutation caches; propagate allow_slave up to da…
Browse files Browse the repository at this point in the history
…taset preparer (#378)

* Add some profiling to mutation caches; propagate allow_slave up to dataset preparer

* fix profilers; fix mypy

* mypy ignore (i'm sorry)

* remove some mypy ignores
  • Loading branch information
KonstantAnxiety authored Mar 19, 2024
1 parent 94fb9b9 commit 3b2c693
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 12 deletions.
16 changes: 12 additions & 4 deletions lib/dl_api_lib/dl_api_lib/app/data_api/resources/dataset/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@
FieldAction,
)
from dl_api_lib.service_registry.service_registry import ApiServiceRegistry
from dl_app_tools.profiling_base import GenericProfiler
from dl_app_tools.profiling_base import (
GenericProfiler,
generic_profiler,
generic_profiler_async,
)
from dl_constants.enums import DataSourceRole
from dl_core.components.accessor import DatasetComponentAccessor
from dl_core.data_source.base import DataSource
Expand Down Expand Up @@ -200,14 +204,15 @@ def try_get_mutation_key(self, updates: List[Action]) -> Optional[MutationKey]:
return UpdateDatasetMutationKey.create(self.dataset.revision_id, updates) # type: ignore # 2024-01-30 # TODO: Argument 2 to "create" of "UpdateDatasetMutationKey" has incompatible type "list[Action]"; expected "list[FieldAction]" [arg-type]
return None

def try_get_cache(self) -> Optional[USEntryMutationCache]:
@generic_profiler("mutation-cache-init")
def try_get_cache(self, allow_slave: bool) -> Optional[USEntryMutationCache]:
try:
mc_factory = self.dl_request.services_registry.get_mutation_cache_factory()
if mc_factory is None:
LOGGER.debug("Mutation cache is disabled")
return None
mce_factory = self.dl_request.services_registry.get_mutation_cache_engine_factory(RedisCacheEngine)
cache_engine = mce_factory.get_cache_engine()
cache_engine = mce_factory.get_cache_engine(allow_slave)
mutation_cache = mc_factory.get_mutation_cache(
usm=self.dl_request.us_manager,
engine=cache_engine,
Expand All @@ -217,6 +222,7 @@ def try_get_cache(self) -> Optional[USEntryMutationCache]:
LOGGER.error("Mutation cache error", exc_info=True)
return None

@generic_profiler_async("mutation-cache-get") # type: ignore # TODO: fix
async def try_get_dataset_from_cache(
self,
mutation_cache: Optional[USEntryMutationCache],
Expand All @@ -241,6 +247,7 @@ async def try_get_dataset_from_cache(
assert isinstance(cached_dataset, Dataset)
return cached_dataset

@generic_profiler_async("mutation-cache-save") # type: ignore # TODO: fix
async def try_save_dataset_to_cache(
self,
mutation_cache: Optional[USEntryMutationCache],
Expand All @@ -266,7 +273,8 @@ async def prepare_dataset_for_request(

with GenericProfiler("dataset-prepare"):
if enable_mutation_caching:
mutation_cache = self.try_get_cache()
mutation_cache = self.try_get_cache(allow_slave=False)
# TODO consider: ^ analyze profiling & maybe use allow_slave=True when reading from cache
mutation_key = self.try_get_mutation_key(req_model.updates)
cached_dataset = await self.try_get_dataset_from_cache(mutation_cache, mutation_key)
if cached_dataset:
Expand Down
8 changes: 5 additions & 3 deletions lib/dl_app_tools/dl_app_tools/profiling_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,8 @@ def _update_profile_result(self, exc_info: Optional[TExcInfo] = None, log_data:

def generic_profiler(
stage: str,
extra_data: dict = None, # type: ignore # 2024-01-24 # TODO: Incompatible default for argument "extra_data" (default has type "None", argument has type "dict[Any, Any]") [assignment]
logger: logging.Logger = None, # type: ignore # 2024-01-24 # TODO: Incompatible default for argument "logger" (default has type "None", argument has type "Logger") [assignment]
extra_data: Optional[dict] = None,
logger: Optional[logging.Logger] = None,
) -> Callable[[_GP_FUNC_T], _GP_FUNC_T]:
def generic_profiler_wrap(func: _GP_FUNC_T) -> _GP_FUNC_T:
if inspect.iscoroutinefunction(func):
Expand All @@ -265,7 +265,9 @@ def generic_profiler_wrapper(*args: Any, **kwargs: Any) -> _GP_FUNC_RET_TV: # t


def generic_profiler_async(
stage: str, extra_data: dict = None, logger: logging.Logger = None # type: ignore # 2024-01-24 # TODO: Incompatible default for argument "extra_data" (default has type "None", argument has type "dict[Any, Any]") [assignment]
stage: str,
extra_data: Optional[dict] = None,
logger: Optional[logging.Logger] = None,
) -> Callable[[_GPA_CORO_TV], _GPA_CORO_TV]:
def generic_profiler_wrap_async(coro: _GPA_CORO_TV) -> _GPA_CORO_TV:
if not inspect.iscoroutinefunction(coro):
Expand Down
10 changes: 5 additions & 5 deletions lib/dl_core/dl_core/us_manager/mutation_cache/engine_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class CacheInitializationError(Exception):

class MutationCacheEngineFactory(metaclass=abc.ABCMeta):
@abc.abstractmethod
def get_cache_engine(self) -> GenericCacheEngine:
def get_cache_engine(self, allow_slave: bool) -> GenericCacheEngine:
pass


Expand All @@ -48,20 +48,20 @@ def _get_memory_cache_engine_singleton(cls) -> MemoryCacheEngine:
def service_registry(self) -> "ServicesRegistry":
return self._services_registry_ref.ref

def _get_redis_cache_engine(self) -> Optional[RedisCacheEngine]:
def _get_redis_cache_engine(self, allow_slave: bool) -> Optional[RedisCacheEngine]:
try:
redis_client = self.service_registry.get_mutations_redis_client()
redis_client = self.service_registry.get_mutations_redis_client(allow_slave)
if redis_client is None:
return None
return RedisCacheEngine(redis_client)
except ValueError:
return None

def get_cache_engine(self) -> GenericCacheEngine:
def get_cache_engine(self, allow_slave: bool) -> GenericCacheEngine:
if self.cache_type == MemoryCacheEngine:
return self._get_memory_cache_engine_singleton()
elif self.cache_type == RedisCacheEngine:
redis_cache_engine = self._get_redis_cache_engine()
redis_cache_engine = self._get_redis_cache_engine(allow_slave)
if redis_cache_engine:
return redis_cache_engine
LOGGER.info("Can not create mutation cache engine: service registry did not return a Redis client")
Expand Down

0 comments on commit 3b2c693

Please sign in to comment.