diff --git a/lib/dl_core/dl_core/data_processing/cache/utils.py b/lib/dl_core/dl_core/data_processing/cache/utils.py
index 3fbec2ddc..4fd13f436 100644
--- a/lib/dl_core/dl_core/data_processing/cache/utils.py
+++ b/lib/dl_core/dl_core/data_processing/cache/utils.py
@@ -50,7 +50,6 @@ class CacheOptionsBuilderBase:
     def get_actual_ttl_config(
         self,
         connection: ConnectionBase,
-        dataset: Optional[Dataset] = None,
     ) -> CacheTTLConfig:
         ctc = self.default_ttl_config
 
@@ -90,19 +89,13 @@ def config_to_ttl_info(ttl_config: CacheTTLConfig) -> CacheTTLInfo:
             refresh_ttl_on_read=False,
         )
 
-    def get_cache_ttl_info(
-        self,
-        data_source_list: Collection[DataSource],
-        # For future use
-        dataset: Optional[Dataset] = None,  # noqa
-    ) -> CacheTTLInfo:
+    def get_cache_ttl_info(self, data_source_list: Collection[DataSource]) -> CacheTTLInfo:
         # TODO FIX: Assert that there is no connection divergence or migrate to joint data source info
         assert data_source_list, "Cannot generate cache options for empty source list"
         actual_connection = next(iter({dsrc.connection for dsrc in data_source_list}))
 
         ttl_config = self.get_actual_ttl_config(
             connection=actual_connection,
-            dataset=dataset,
         )
         return self.config_to_ttl_info(ttl_config=ttl_config)
 
@@ -123,11 +116,7 @@ class DatasetOptionsBuilder(CacheOptionsBuilderBase):
     def get_cache_options(
         self,
         joint_dsrc_info: PreparedFromInfo,
-        query: Select,
-        user_types: list[UserDataType],
-        dataset: Dataset,
         data_key: LocalKeyRepresentation,
-        role: DataSourceRole = DataSourceRole.origin,
     ) -> BIQueryCacheOptions:
         raise NotImplementedError
 
@@ -139,16 +128,9 @@ class CompengOptionsBuilder(DatasetOptionsBuilder):  # TODO: Move to compeng pac
     def get_cache_options(
         self,
         joint_dsrc_info: PreparedFromInfo,
-        query: Select,
-        user_types: list[UserDataType],
-        dataset: Dataset,
         data_key: LocalKeyRepresentation,
-        role: DataSourceRole = DataSourceRole.origin,
     ) -> BIQueryCacheOptions:
-        ttl_info = self.get_cache_ttl_info(
-            data_source_list=joint_dsrc_info.data_source_list,
-            dataset=dataset,
-        )
+        ttl_info = self.get_cache_ttl_info(data_source_list=joint_dsrc_info.data_source_list)
         return BIQueryCacheOptions(
             cache_enabled=self.cache_enabled,
             key=data_key,
@@ -170,35 +152,15 @@ def get_cache_enabled(self, joint_dsrc_info: PreparedFromInfo) -> bool:
     def get_cache_options(
         self,
         joint_dsrc_info: PreparedFromInfo,
-        query: Select,
-        user_types: list[UserDataType],
-        dataset: Dataset,
         data_key: LocalKeyRepresentation,
-        role: DataSourceRole = DataSourceRole.origin,
     ) -> BIQueryCacheOptions:
         """Returns cache key, TTL for new entries, refresh TTL flag"""
 
-        compiled_query = self.get_query_str_for_cache(
-            query=query,
-            dialect=joint_dsrc_info.query_compiler.dialect,
-        )
-        local_key_rep: Optional[LocalKeyRepresentation] = self.make_data_select_cache_key(
-            from_info=joint_dsrc_info,
-            compiled_query=compiled_query,
-            user_types=user_types,
-            is_bleeding_edge_user=self._is_bleeding_edge_user,
-        )
-        ttl_info = self.get_cache_ttl_info(
-            data_source_list=joint_dsrc_info.data_source_list,
-            dataset=dataset,
-        )
+        ttl_info = self.get_cache_ttl_info(data_source_list=joint_dsrc_info.data_source_list)
         cache_enabled = self.get_cache_enabled(joint_dsrc_info=joint_dsrc_info)
-        if not cache_enabled:
-            local_key_rep = None
-
         return BIQueryCacheOptions(
             cache_enabled=cache_enabled,
-            key=local_key_rep,
+            key=data_key if cache_enabled else None,
             ttl_sec=ttl_info.ttl_sec,
             refresh_ttl_on_read=ttl_info.refresh_ttl_on_read,
         )
@@ -270,7 +232,7 @@ def get_cache_options(
         data_key: LocalKeyRepresentation = LocalKeyRepresentation(),
     ) -> BIQueryCacheOptions:
         cache_enabled = self.get_cache_enabled(conn=conn)
-        ttl_config = self.get_actual_ttl_config(connection=conn, dataset=None)
+        ttl_config = self.get_actual_ttl_config(connection=conn)
         ttl_info = self.config_to_ttl_info(ttl_config)
 
         local_key_rep: LocalKeyRepresentation = data_key.multi_extend(*conn.get_cache_key_part().key_parts)
diff --git a/lib/dl_core/dl_core/data_processing/processing/cache/exec_adapter.py b/lib/dl_core/dl_core/data_processing/processing/cache/exec_adapter.py
index 2c6b7ff2e..a29647ecf 100644
--- a/lib/dl_core/dl_core/data_processing/processing/cache/exec_adapter.py
+++ b/lib/dl_core/dl_core/data_processing/processing/cache/exec_adapter.py
@@ -71,9 +71,6 @@ async def _execute_and_fetch(
         # Resolve TTL info and save BIQueryCacheOptions object
         cache_options = self._cache_options_builder.get_cache_options(
             joint_dsrc_info=joint_dsrc_info,
-            query=query,
-            user_types=list(user_types),
-            dataset=self._dataset,
             data_key=data_key,
         )
 
diff --git a/lib/dl_core/dl_core/data_processing/processing/db_base/exec_adapter_base.py b/lib/dl_core/dl_core/data_processing/processing/db_base/exec_adapter_base.py
index 5b387c74c..68f37288f 100644
--- a/lib/dl_core/dl_core/data_processing/processing/db_base/exec_adapter_base.py
+++ b/lib/dl_core/dl_core/data_processing/processing/db_base/exec_adapter_base.py
@@ -151,6 +151,7 @@ def get_data_key(
         from_info: Optional[PreparedFromInfo] = None,
         base_key: LocalKeyRepresentation = LocalKeyRepresentation(),
     ) -> Optional[LocalKeyRepresentation]:
+        # TODO: Remove this method
         query_res_info = self._make_query_res_info(query=query, user_types=user_types)
         data_key = self._cache_options_builder.get_data_key(
             from_info=from_info,
diff --git a/lib/dl_core/dl_core/data_processing/processing/db_base/op_executors.py b/lib/dl_core/dl_core/data_processing/processing/db_base/op_executors.py
index a83f73d8a..dabab35c3 100644
--- a/lib/dl_core/dl_core/data_processing/processing/db_base/op_executors.py
+++ b/lib/dl_core/dl_core/data_processing/processing/db_base/op_executors.py
@@ -179,19 +179,33 @@ def get_from_info_from_stream(self, source_stream: AbstractStream) -> PreparedFr
     def make_data_key(self, op: BaseOp) -> LocalKeyRepresentation:
         assert isinstance(op, CalcOp)
         source_stream = self.ctx.get_stream(op.source_stream_id)
+
+        # TODO: Remove legacy version
+
+        # Legacy procedure
         from_info = self.get_from_info_from_stream(source_stream=source_stream)
         query_compiler = from_info.query_compiler
         query = query_compiler.compile_select(
             bi_query=op.bi_query,
             sql_source=from_info.sql_source,
         )
-        data_key = self.db_ex_adapter.get_data_key(
+        legacy_data_key = self.db_ex_adapter.get_data_key(
             query=query,
             user_types=source_stream.user_types,
             from_info=from_info,
             base_key=source_stream.data_key,
         )
-        return data_key
+
+        # New procedure
+        new_data_key = source_stream.data_key.extend("query", op.data_key_data)
+
+        LOGGER.info(
+            f"Preliminary cache key info for query: "
+            f"legacy key: {legacy_data_key.key_parts_hash} ; "
+            f"new key: {new_data_key.key_parts_hash}"
+        )
+
+        return legacy_data_key
 
     @log_op  # type: ignore  # TODO: fix
     async def execute(self, op: BaseOp) -> DataSourceVS:  # type: ignore  # TODO: fix
diff --git a/lib/dl_core/dl_core/data_processing/selectors/db.py b/lib/dl_core/dl_core/data_processing/selectors/db.py
index 56753cb36..93b714de7 100644
--- a/lib/dl_core/dl_core/data_processing/selectors/db.py
+++ b/lib/dl_core/dl_core/data_processing/selectors/db.py
@@ -29,6 +29,7 @@
 @attr.s
 class DatasetDbDataSelectorAsync(DatasetDataSelectorAsyncBase):
     """Async selector that fetches data from the database"""
+    # TODO: Merge all selector logic into data processors
 
     _active_queries: list[BIQueryExecutionContext] = attr.ib(init=False, factory=list)
 
diff --git a/lib/dl_core/dl_core/services_registry/selector_factory.py b/lib/dl_core/dl_core/services_registry/selector_factory.py
index db3c8493e..14849cd54 100644
--- a/lib/dl_core/dl_core/services_registry/selector_factory.py
+++ b/lib/dl_core/dl_core/services_registry/selector_factory.py
@@ -103,9 +103,9 @@ def _create_dataset_selector(
         allow_cache_usage: bool = True,
     ) -> DatasetDataSelectorAsyncBase:
         return DatasetDbDataSelectorAsync(
-                dataset=dataset,
-                service_registry=self.services_registry,
-                # allow_cache_usage=allow_cache_usage,
-                # is_bleeding_edge_user=self._is_bleeding_edge_user,
-                us_entry_buffer=us_entry_buffer,
-            )
+            dataset=dataset,
+            service_registry=self.services_registry,
+            # allow_cache_usage=allow_cache_usage,
+            # is_bleeding_edge_user=self._is_bleeding_edge_user,
+            us_entry_buffer=us_entry_buffer,
+        )