Refactored cache key logic some more
altvod committed Nov 8, 2023
1 parent afcf4bb commit 7c64a82
Showing 6 changed files with 29 additions and 54 deletions.
48 changes: 5 additions & 43 deletions lib/dl_core/dl_core/data_processing/cache/utils.py
@@ -50,7 +50,6 @@ class CacheOptionsBuilderBase:
     def get_actual_ttl_config(
         self,
         connection: ConnectionBase,
-        dataset: Optional[Dataset] = None,
     ) -> CacheTTLConfig:
         ctc = self.default_ttl_config

@@ -90,19 +89,13 @@ def config_to_ttl_info(ttl_config: CacheTTLConfig) -> CacheTTLInfo:
             refresh_ttl_on_read=False,
         )

-    def get_cache_ttl_info(
-        self,
-        data_source_list: Collection[DataSource],
-        # For future use
-        dataset: Optional[Dataset] = None,  # noqa
-    ) -> CacheTTLInfo:
+    def get_cache_ttl_info(self, data_source_list: Collection[DataSource]) -> CacheTTLInfo:
         # TODO FIX: Assert that there is no connection divergence or migrate to joint data source info
         assert data_source_list, "Cannot generate cache options for empty source list"
         actual_connection = next(iter({dsrc.connection for dsrc in data_source_list}))

         ttl_config = self.get_actual_ttl_config(
             connection=actual_connection,
-            dataset=dataset,
         )
         return self.config_to_ttl_info(ttl_config=ttl_config)

@@ -123,11 +116,7 @@ class DatasetOptionsBuilder(CacheOptionsBuilderBase):
     def get_cache_options(
         self,
         joint_dsrc_info: PreparedFromInfo,
-        query: Select,
-        user_types: list[UserDataType],
-        dataset: Dataset,
         data_key: LocalKeyRepresentation,
-        role: DataSourceRole = DataSourceRole.origin,
     ) -> BIQueryCacheOptions:
         raise NotImplementedError

@@ -139,16 +128,9 @@ class CompengOptionsBuilder(DatasetOptionsBuilder):  # TODO: Move to compeng package
     def get_cache_options(
         self,
         joint_dsrc_info: PreparedFromInfo,
-        query: Select,
-        user_types: list[UserDataType],
-        dataset: Dataset,
         data_key: LocalKeyRepresentation,
-        role: DataSourceRole = DataSourceRole.origin,
     ) -> BIQueryCacheOptions:
-        ttl_info = self.get_cache_ttl_info(
-            data_source_list=joint_dsrc_info.data_source_list,
-            dataset=dataset,
-        )
+        ttl_info = self.get_cache_ttl_info(data_source_list=joint_dsrc_info.data_source_list)
         return BIQueryCacheOptions(
             cache_enabled=self.cache_enabled,
             key=data_key,
@@ -170,35 +152,15 @@ def get_cache_enabled(self, joint_dsrc_info: PreparedFromInfo) -> bool:
     def get_cache_options(
         self,
         joint_dsrc_info: PreparedFromInfo,
-        query: Select,
-        user_types: list[UserDataType],
-        dataset: Dataset,
         data_key: LocalKeyRepresentation,
-        role: DataSourceRole = DataSourceRole.origin,
     ) -> BIQueryCacheOptions:
         """Returns cache key, TTL for new entries, refresh TTL flag"""

-        compiled_query = self.get_query_str_for_cache(
-            query=query,
-            dialect=joint_dsrc_info.query_compiler.dialect,
-        )
-        local_key_rep: Optional[LocalKeyRepresentation] = self.make_data_select_cache_key(
-            from_info=joint_dsrc_info,
-            compiled_query=compiled_query,
-            user_types=user_types,
-            is_bleeding_edge_user=self._is_bleeding_edge_user,
-        )
-        ttl_info = self.get_cache_ttl_info(
-            data_source_list=joint_dsrc_info.data_source_list,
-            dataset=dataset,
-        )
+        ttl_info = self.get_cache_ttl_info(data_source_list=joint_dsrc_info.data_source_list)
         cache_enabled = self.get_cache_enabled(joint_dsrc_info=joint_dsrc_info)
-        if not cache_enabled:
-            local_key_rep = None
-
         return BIQueryCacheOptions(
             cache_enabled=cache_enabled,
-            key=local_key_rep,
+            key=data_key if cache_enabled else None,
             ttl_sec=ttl_info.ttl_sec,
             refresh_ttl_on_read=ttl_info.refresh_ttl_on_read,
         )
@@ -270,7 +232,7 @@ def get_cache_options(
         data_key: LocalKeyRepresentation = LocalKeyRepresentation(),
     ) -> BIQueryCacheOptions:
         cache_enabled = self.get_cache_enabled(conn=conn)
-        ttl_config = self.get_actual_ttl_config(connection=conn, dataset=None)
+        ttl_config = self.get_actual_ttl_config(connection=conn)
        ttl_info = self.config_to_ttl_info(ttl_config)

         local_key_rep: LocalKeyRepresentation = data_key.multi_extend(*conn.get_cache_key_part().key_parts)
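After this refactor the builders derive cache options from just the connection-level TTL config and an externally supplied data key. The following is a minimal, self-contained sketch of the resulting assembly; only the type and field names are taken from the diff, and the simplified bodies are assumptions for illustration, not dl_core's actual implementation:

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class CacheTTLInfo:
    ttl_sec: int
    refresh_ttl_on_read: bool


@dataclass(frozen=True)
class BIQueryCacheOptions:
    cache_enabled: bool
    key: Optional[str]
    ttl_sec: int
    refresh_ttl_on_read: bool


def build_cache_options(cache_enabled: bool, data_key: str, ttl_info: CacheTTLInfo) -> BIQueryCacheOptions:
    # A cache key is only meaningful when caching is enabled; drop it otherwise.
    return BIQueryCacheOptions(
        cache_enabled=cache_enabled,
        key=data_key if cache_enabled else None,
        ttl_sec=ttl_info.ttl_sec,
        refresh_ttl_on_read=ttl_info.refresh_ttl_on_read,
    )


print(build_cache_options(True, "deadbeef", CacheTTLInfo(ttl_sec=60, refresh_ttl_on_read=False)))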
3 changes: 0 additions & 3 deletions (file path not shown)
@@ -71,9 +71,6 @@ async def _execute_and_fetch(
         # Resolve TTL info and save BIQueryCacheOptions object
         cache_options = self._cache_options_builder.get_cache_options(
             joint_dsrc_info=joint_dsrc_info,
-            query=query,
-            user_types=list(user_types),
-            dataset=self._dataset,
             data_key=data_key,
         )

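The selector now treats the cache key as an input rather than deriving it from the query, user types, and dataset. A hedged sketch of that inversion, with hypothetical stand-in types to keep it self-contained:

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class CacheOptions:
    cache_enabled: bool
    key: Optional[str]


class OptionsBuilder:
    """Illustrative stand-in for the dl_core cache options builder."""

    def get_cache_options(self, joint_dsrc_info: str, data_key: str) -> CacheOptions:
        # The real builder also resolves TTL from the connection (see cache/utils.py above).
        return CacheOptions(cache_enabled=True, key=data_key)


# The key arrives fully built from upstream; the selector no longer
# needs the query, user types, or dataset to obtain cache options.
options = OptionsBuilder().get_cache_options(joint_dsrc_info="prepared-from-info", data_key="abc123")
print(options)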
1 change: 1 addition & 0 deletions (file path not shown)
@@ -151,6 +151,7 @@ def get_data_key(
         from_info: Optional[PreparedFromInfo] = None,
         base_key: LocalKeyRepresentation = LocalKeyRepresentation(),
     ) -> Optional[LocalKeyRepresentation]:
+        # TODO: Remove this method
         query_res_info = self._make_query_res_info(query=query, user_types=user_types)
         data_key = self._cache_options_builder.get_data_key(
             from_info=from_info,
18 changes: 16 additions & 2 deletions (file path not shown)
@@ -179,19 +179,33 @@ def get_from_info_from_stream(self, source_stream: AbstractStream) -> PreparedFromInfo:
     def make_data_key(self, op: BaseOp) -> LocalKeyRepresentation:
         assert isinstance(op, CalcOp)
         source_stream = self.ctx.get_stream(op.source_stream_id)
+
+        # TODO: Remove legacy version
+
+        # Legacy procedure
         from_info = self.get_from_info_from_stream(source_stream=source_stream)
         query_compiler = from_info.query_compiler
         query = query_compiler.compile_select(
             bi_query=op.bi_query,
             sql_source=from_info.sql_source,
         )
-        data_key = self.db_ex_adapter.get_data_key(
+        legacy_data_key = self.db_ex_adapter.get_data_key(
             query=query,
             user_types=source_stream.user_types,
             from_info=from_info,
             base_key=source_stream.data_key,
         )
-        return data_key
+
+        # New procedure
+        new_data_key = source_stream.data_key.extend("query", op.data_key_data)
+
+        LOGGER.info(
+            f"Preliminary cache key info for query: "
+            f"legacy key: {legacy_data_key.key_parts_hash} ; "
+            f"new key: {new_data_key.key_parts_hash}"
+        )
+
+        return legacy_data_key

     @log_op  # type: ignore  # TODO: fix
     async def execute(self, op: BaseOp) -> DataSourceVS:  # type: ignore  # TODO: fix
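The interesting part of this file is the migration strategy: both the legacy key (derived from the compiled SQL) and the new key (derived from op-level key data) are computed, their hashes are logged side by side, and the legacy key is still the one returned, so the two schemes can be compared in production before cutting over. A self-contained sketch of the pattern, with a simplified KeyRep standing in for the real LocalKeyRepresentation:

import hashlib
import logging
from typing import Any, Tuple

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)


class KeyRep:
    """Illustrative stand-in for LocalKeyRepresentation."""

    def __init__(self, parts: Tuple[Any, ...] = ()):
        self.parts = parts

    def extend(self, part_type: str, data: Any) -> "KeyRep":
        # Extensions are non-mutating: each returns a new key representation.
        return KeyRep(self.parts + ((part_type, data),))

    @property
    def key_parts_hash(self) -> str:
        return hashlib.md5(repr(self.parts).encode()).hexdigest()


def make_data_key(base_key: KeyRep, compiled_sql: str, op_key_data: Any) -> KeyRep:
    legacy_key = base_key.extend("query", compiled_sql)  # old scheme: keyed by compiled SQL text
    new_key = base_key.extend("query", op_key_data)      # new scheme: keyed by op-level key data
    LOGGER.info(
        "Preliminary cache key info for query: legacy key: %s ; new key: %s",
        legacy_key.key_parts_hash,
        new_key.key_parts_hash,
    )
    return legacy_key  # keep serving the legacy key until the new one is validated in logs


print(make_data_key(KeyRep(), "SELECT 1", {"op": "calc", "exprs": ("a+b",)}).key_parts_hash)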
1 change: 1 addition & 0 deletions lib/dl_core/dl_core/data_processing/selectors/db.py
@@ -29,6 +29,7 @@
 @attr.s
 class DatasetDbDataSelectorAsync(DatasetDataSelectorAsyncBase):
     """Async selector that fetches data from the database"""
+
     # TODO: Merge all selector logic into data processors

     _active_queries: list[BIQueryExecutionContext] = attr.ib(init=False, factory=list)
12 changes: 6 additions & 6 deletions lib/dl_core/dl_core/services_registry/selector_factory.py
@@ -103,9 +103,9 @@ def _create_dataset_selector(
         allow_cache_usage: bool = True,
     ) -> DatasetDataSelectorAsyncBase:
         return DatasetDbDataSelectorAsync(
-            dataset=dataset,
-            service_registry=self.services_registry,
-            # allow_cache_usage=allow_cache_usage,
-            # is_bleeding_edge_user=self._is_bleeding_edge_user,
-            us_entry_buffer=us_entry_buffer,
-        )
+            dataset=dataset,
+            service_registry=self.services_registry,
+            # allow_cache_usage=allow_cache_usage,
+            # is_bleeding_edge_user=self._is_bleeding_edge_user,
+            us_entry_buffer=us_entry_buffer,
+        )
(the removed and added blocks are identical apart from whitespace, which the diff view did not preserve)
