diff --git a/lib/dl_core/dl_core/data_processing/cache/utils.py b/lib/dl_core/dl_core/data_processing/cache/utils.py
index 3fbecdb8c..e716b2d22 100644
--- a/lib/dl_core/dl_core/data_processing/cache/utils.py
+++ b/lib/dl_core/dl_core/data_processing/cache/utils.py
@@ -1,17 +1,14 @@
 from __future__ import annotations
 
+import abc
 import logging
 from typing import (
     TYPE_CHECKING,
     Collection,
-    Optional,
 )
 
 import attr
-from sqlalchemy.exc import DatabaseError
 
-from dl_constants.enums import DataSourceRole
-from dl_core.data_processing.cache.exc import CachePreparationFailed
 from dl_core.data_processing.cache.primitives import (
     BIQueryCacheOptions,
     CacheTTLConfig,
@@ -19,7 +16,6 @@
     DataKeyPart,
     LocalKeyRepresentation,
 )
-from dl_core.query.bi_query import QueryAndResultInfo
 from dl_core.serialization import hashable_dumps
 from dl_core.us_connection_base import (
     ConnectionBase,
@@ -28,24 +24,17 @@
 
 
 if TYPE_CHECKING:
-    from sqlalchemy.engine.default import DefaultDialect
-    from sqlalchemy.sql import Select
-
-    from dl_constants.enums import UserDataType
     from dl_constants.types import TJSONExt
     from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo
     from dl_core.data_source.base import DataSource
-    from dl_core.us_dataset import Dataset
-    from dl_core.us_manager.local_cache import USEntryBuffer
 
 
 LOGGER = logging.getLogger(__name__)
 
 
 @attr.s
-class CacheOptionsBuilderBase:
+class CacheOptionsBuilderBase(abc.ABC):
     default_ttl_config: CacheTTLConfig = attr.ib(factory=CacheTTLConfig)
-    _is_bleeding_edge_user: bool = attr.ib(default=False)
 
     def get_actual_ttl_config(
         self,
@@ -60,28 +49,6 @@ def get_actual_ttl_config(
 
         return ctc
 
-    @staticmethod
-    def get_query_str_for_cache(query: Select, dialect: DefaultDialect) -> str:
-        try:
-            compiled_query = query.compile(dialect=dialect)
-        except DatabaseError as err:
-            raise CachePreparationFailed from err
-
-        if isinstance(compiled_query.params, dict):
-            ordered_params = sorted(
-                compiled_query.params.items(),
-                key=lambda item: item[0],
-            )
-        else:
-            ordered_params = compiled_query.params
-
-        return ";".join(
-            (
-                str(compiled_query),
-                str(ordered_params),
-            )
-        )
-
     @staticmethod
     def config_to_ttl_info(ttl_config: CacheTTLConfig) -> CacheTTLInfo:
         return CacheTTLInfo(
@@ -99,133 +66,44 @@ def get_cache_ttl_info(self, data_source_list: Collection[DataSource]) -> CacheT
         )
         return self.config_to_ttl_info(ttl_config=ttl_config)
 
-    def get_data_key(
-        self,
-        *,
-        query_res_info: QueryAndResultInfo,
-        from_info: Optional[PreparedFromInfo] = None,
-        base_key: LocalKeyRepresentation = LocalKeyRepresentation(),
-    ) -> Optional[LocalKeyRepresentation]:
-        return base_key
-
 
 @attr.s
 class DatasetOptionsBuilder(CacheOptionsBuilderBase):
-    cache_enabled: bool = attr.ib(kw_only=True, default=True)
-
-    def get_cache_options(
-        self,
-        joint_dsrc_info: PreparedFromInfo,
-        data_key: LocalKeyRepresentation,
-    ) -> BIQueryCacheOptions:
+    @abc.abstractmethod
+    def get_cache_enabled(self, joint_dsrc_info: PreparedFromInfo) -> bool:
         raise NotImplementedError
 
-
-@attr.s
-class CompengOptionsBuilder(DatasetOptionsBuilder):  # TODO: Move to compeng package
-    cache_enabled: bool = attr.ib(kw_only=True, default=True)
-
     def get_cache_options(
         self,
        joint_dsrc_info: PreparedFromInfo,
         data_key: LocalKeyRepresentation,
     ) -> BIQueryCacheOptions:
         ttl_info = self.get_cache_ttl_info(data_source_list=joint_dsrc_info.data_source_list)
+        cache_enabled = self.get_cache_enabled(joint_dsrc_info=joint_dsrc_info)
         return BIQueryCacheOptions(
-            cache_enabled=self.cache_enabled,
-            key=data_key,
+            cache_enabled=cache_enabled,
+            key=data_key if cache_enabled else None,
             ttl_sec=ttl_info.ttl_sec,
             refresh_ttl_on_read=ttl_info.refresh_ttl_on_read,
         )
 
-    def get_data_key(
-        self,
-        *,
-        query_res_info: QueryAndResultInfo,
-        from_info: Optional[PreparedFromInfo] = None,
-        base_key: LocalKeyRepresentation = LocalKeyRepresentation(),
-    ) -> Optional[LocalKeyRepresentation]:
-        # TODO: Remove after switching to new cache keys
-        compiled_query = self.get_query_str_for_cache(
-            query=query_res_info.query,
-            dialect=from_info.query_compiler.dialect,
-        )
-        return base_key.extend(part_type="query", part_content=compiled_query)
-
 
 @attr.s
-class SelectorCacheOptionsBuilder(DatasetOptionsBuilder):
-    _is_bleeding_edge_user: bool = attr.ib(default=False)
-    _us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True)
+class CompengOptionsBuilder(DatasetOptionsBuilder):  # TODO: Move to compeng package
+    cache_enabled: bool = attr.ib(kw_only=True, default=True)
+
+    def get_cache_enabled(self, joint_dsrc_info: PreparedFromInfo) -> bool:
+        return self.cache_enabled
+
+
+@attr.s
+class SelectorCacheOptionsBuilder(DatasetOptionsBuilder):  # TODO: Rename to SourceDbCacheOptionsBuilder
 
     def get_cache_enabled(self, joint_dsrc_info: PreparedFromInfo) -> bool:
         assert joint_dsrc_info.data_source_list is not None
         cache_enabled = all(dsrc.cache_enabled for dsrc in joint_dsrc_info.data_source_list)
         return cache_enabled
 
-    def get_cache_options(
-        self,
-        joint_dsrc_info: PreparedFromInfo,
-        data_key: LocalKeyRepresentation,
-    ) -> BIQueryCacheOptions:
-        """Returns cache key, TTL for new entries, refresh TTL flag"""
-
-        ttl_info = self.get_cache_ttl_info(data_source_list=joint_dsrc_info.data_source_list)
-        cache_enabled = self.get_cache_enabled(joint_dsrc_info=joint_dsrc_info)
-        return BIQueryCacheOptions(
-            cache_enabled=cache_enabled,
-            key=data_key if cache_enabled else None,
-            ttl_sec=ttl_info.ttl_sec,
-            refresh_ttl_on_read=ttl_info.refresh_ttl_on_read,
-        )
-
-    def make_data_select_cache_key(
-        self,
-        from_info: PreparedFromInfo,
-        compiled_query: str,
-        user_types: list[UserDataType],
-        is_bleeding_edge_user: bool,
-        base_key: LocalKeyRepresentation = LocalKeyRepresentation(),
-    ) -> LocalKeyRepresentation:
-        # TODO: Remove after switching to new cache keys,
-        #  but put the db_name + target_connection.get_cache_key_part() parts somewhere
-        assert from_info.target_connection_ref is not None
-        target_connection = self._us_entry_buffer.get_entry(from_info.target_connection_ref)
-        assert isinstance(target_connection, ConnectionBase)
-        connection_id = target_connection.uuid
-        assert connection_id is not None
-
-        local_key_rep = base_key
-        local_key_rep = local_key_rep.extend(part_type="query", part_content=str(compiled_query))
-        local_key_rep = local_key_rep.extend(part_type="user_types", part_content=tuple(user_types or ()))
-        local_key_rep = local_key_rep.extend(
-            part_type="is_bleeding_edge_user",
-            part_content=is_bleeding_edge_user,
-        )
-
-        return local_key_rep
-
-    def get_data_key(
-        self,
-        *,
-        query_res_info: QueryAndResultInfo,
-        from_info: Optional[PreparedFromInfo] = None,
-        base_key: LocalKeyRepresentation = LocalKeyRepresentation(),
-    ) -> Optional[LocalKeyRepresentation]:
-        # TODO: Remove after switching to new cache keys
-        compiled_query = self.get_query_str_for_cache(
-            query=query_res_info.query,
-            dialect=from_info.query_compiler.dialect,
-        )
-        data_key: Optional[LocalKeyRepresentation] = self.make_data_select_cache_key(
-            base_key=base_key,
-            from_info=from_info,
-            compiled_query=compiled_query,
-            user_types=query_res_info.user_types,
-            is_bleeding_edge_user=self._is_bleeding_edge_user,
-        )
-        return data_key
-
 
 @attr.s
 class DashSQLCacheOptionsBuilder(CacheOptionsBuilderBase):
diff --git a/lib/dl_core/dl_core/data_processing/processing/db_base/exec_adapter_base.py b/lib/dl_core/dl_core/data_processing/processing/db_base/exec_adapter_base.py
index 68f37288f..eec4488e9 100644
--- a/lib/dl_core/dl_core/data_processing/processing/db_base/exec_adapter_base.py
+++ b/lib/dl_core/dl_core/data_processing/processing/db_base/exec_adapter_base.py
@@ -143,23 +143,6 @@ def _make_query_res_info(
         )
         return query_res_info
 
-    def get_data_key(
-        self,
-        *,
-        query: str | Select,
-        user_types: Sequence[UserDataType],
-        from_info: Optional[PreparedFromInfo] = None,
-        base_key: LocalKeyRepresentation = LocalKeyRepresentation(),
-    ) -> Optional[LocalKeyRepresentation]:
-        # TODO: Remove this method
-        query_res_info = self._make_query_res_info(query=query, user_types=user_types)
-        data_key = self._cache_options_builder.get_data_key(
-            from_info=from_info,
-            query_res_info=query_res_info,
-            base_key=base_key,
-        )
-        return data_key
-
     async def create_table(
         self,
         *,
diff --git a/lib/dl_core/dl_core/data_processing/processing/db_base/op_executors.py b/lib/dl_core/dl_core/data_processing/processing/db_base/op_executors.py
index b11321c9e..b7cdb6016 100644
--- a/lib/dl_core/dl_core/data_processing/processing/db_base/op_executors.py
+++ b/lib/dl_core/dl_core/data_processing/processing/db_base/op_executors.py
@@ -173,32 +173,12 @@ def make_data_key(self, op: BaseOp) -> LocalKeyRepresentation:
         assert isinstance(op, CalcOp)
 
         source_stream = self.ctx.get_stream(op.source_stream_id)
-        # TODO: Remove legacy version
-
-        # Legacy procedure
-        from_info = self.get_from_info_from_stream(source_stream=source_stream)
-        query_compiler = from_info.query_compiler
-        query = query_compiler.compile_select(
-            bi_query=op.bi_query,
-            # The info about the real source is already contained in the previous key parts,
-            # and, also, we want to avoid the randomized table names (in compeng) to appear in the key.
-            # So just use a fake table here.
-            sql_source=sa.table("table"),
-        )
-        legacy_data_key = self.db_ex_adapter.get_data_key(
-            query=query,
-            user_types=source_stream.user_types,
-            from_info=from_info,
-            base_key=source_stream.data_key,
-        )
-
         # New procedure
         new_data_key = source_stream.data_key.extend("query", op.data_key_data)
 
         LOGGER.info(
             f"Preliminary cache key info for query: "
-            f"legacy key: {legacy_data_key.key_parts_hash} ; "
-            f"new key: {new_data_key.key_parts_hash}"
+            f"key: {new_data_key.key_parts_hash}"
         )
 
         return new_data_key
diff --git a/lib/dl_core/dl_core/data_processing/processing/source_db/processor.py b/lib/dl_core/dl_core/data_processing/processing/source_db/processor.py
index c95221de4..fab30e007 100644
--- a/lib/dl_core/dl_core/data_processing/processing/source_db/processor.py
+++ b/lib/dl_core/dl_core/data_processing/processing/source_db/processor.py
@@ -23,14 +23,11 @@ class SourceDbOperationProcessor(ExecutorBasedOperationProcessor):
     _dataset: Dataset = attr.ib(kw_only=True)
     _row_count_hard_limit: Optional[int] = attr.ib(kw_only=True, default=None)
     _us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True)
-    _is_bleeding_edge_user: bool = attr.ib(default=False)
     _default_cache_ttl_config: CacheTTLConfig = attr.ib(default=None)
 
     def _make_cache_options_builder(self) -> DatasetOptionsBuilder:
         return SelectorCacheOptionsBuilder(
             default_ttl_config=self._default_cache_ttl_config,
-            is_bleeding_edge_user=self._is_bleeding_edge_user,
-            us_entry_buffer=self._us_entry_buffer,
         )
 
     def _make_db_ex_adapter(self) -> Optional[ProcessorDbExecAdapterBase]:
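
Note: the net effect of the utils.py change is a template-method refactor. DatasetOptionsBuilder now owns the single get_cache_options() implementation and defers only the enablement decision to an abstract get_cache_enabled() hook, so a disabled cache also drops the key (key=None). Below is a minimal self-contained sketch of that pattern; the Fake* stand-in types and the fixed ttl_sec value are illustrative assumptions, not dl_core API — only the method names and control flow mirror the diff.

# Sketch only: stand-in types instead of dl_core's PreparedFromInfo,
# BIQueryCacheOptions, DataSource, and LocalKeyRepresentation.
from __future__ import annotations

import abc
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class FakeDataSource:
    cache_enabled: bool


@dataclass
class FakeFromInfo:
    data_source_list: Optional[Sequence[FakeDataSource]]


@dataclass
class FakeCacheOptions:
    cache_enabled: bool
    key: Optional[str]
    ttl_sec: int


class OptionsBuilder(abc.ABC):
    """Shared implementation; subclasses decide only whether caching is on."""

    @abc.abstractmethod
    def get_cache_enabled(self, joint_dsrc_info: FakeFromInfo) -> bool:
        raise NotImplementedError

    def get_cache_options(self, joint_dsrc_info: FakeFromInfo, data_key: str) -> FakeCacheOptions:
        cache_enabled = self.get_cache_enabled(joint_dsrc_info=joint_dsrc_info)
        # As in the diff: no cache key is emitted when caching is disabled.
        return FakeCacheOptions(
            cache_enabled=cache_enabled,
            key=data_key if cache_enabled else None,
            ttl_sec=60,  # illustrative constant; dl_core derives this from CacheTTLConfig
        )


class CompengLikeBuilder(OptionsBuilder):
    """Mirrors CompengOptionsBuilder: a plain configured flag."""

    def __init__(self, cache_enabled: bool = True) -> None:
        self.cache_enabled = cache_enabled

    def get_cache_enabled(self, joint_dsrc_info: FakeFromInfo) -> bool:
        return self.cache_enabled


class SourceDbLikeBuilder(OptionsBuilder):
    """Mirrors SelectorCacheOptionsBuilder: every source must allow caching."""

    def get_cache_enabled(self, joint_dsrc_info: FakeFromInfo) -> bool:
        assert joint_dsrc_info.data_source_list is not None
        return all(dsrc.cache_enabled for dsrc in joint_dsrc_info.data_source_list)


if __name__ == "__main__":
    mixed = FakeFromInfo(data_source_list=[FakeDataSource(True), FakeDataSource(False)])
    opts = SourceDbLikeBuilder().get_cache_options(joint_dsrc_info=mixed, data_key="k1")
    assert opts.cache_enabled is False and opts.key is None
    opts = CompengLikeBuilder().get_cache_options(joint_dsrc_info=mixed, data_key="k1")
    assert opts.cache_enabled is True and opts.key == "k1"

Centralizing key construction in the base class is what allows the legacy get_data_key()/get_query_str_for_cache() paths (and the bleeding-edge-user key part) to be deleted from the other three files.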