Skip to content

Commit

Permalink
Refactored cache key logic some more
Browse files Browse the repository at this point in the history
  • Loading branch information
altvod committed Nov 8, 2023
1 parent afcf4bb commit b6c7452
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 48 deletions.
48 changes: 5 additions & 43 deletions lib/dl_core/dl_core/data_processing/cache/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ class CacheOptionsBuilderBase:
def get_actual_ttl_config(
self,
connection: ConnectionBase,
dataset: Optional[Dataset] = None,
) -> CacheTTLConfig:
ctc = self.default_ttl_config

Expand Down Expand Up @@ -90,19 +89,13 @@ def config_to_ttl_info(ttl_config: CacheTTLConfig) -> CacheTTLInfo:
refresh_ttl_on_read=False,
)

def get_cache_ttl_info(self, data_source_list: Collection[DataSource]) -> CacheTTLInfo:
    """Resolve cache TTL settings for a collection of data sources.

    All sources are assumed to share a single connection; the TTL
    configuration is derived from that connection.

    :param data_source_list: non-empty collection of data sources.
    :return: TTL info (TTL in seconds and the refresh-on-read flag).
    :raises AssertionError: if ``data_source_list`` is empty.
    """
    # TODO FIX: Assert that there is no connection divergence or migrate to joint data source info
    assert data_source_list, "Cannot generate cache options for empty source list"
    # Collapse to one connection; divergence across sources is not checked yet (see TODO above).
    actual_connection = next(iter({dsrc.connection for dsrc in data_source_list}))

    ttl_config = self.get_actual_ttl_config(connection=actual_connection)
    return self.config_to_ttl_info(ttl_config=ttl_config)

Expand All @@ -123,11 +116,7 @@ class DatasetOptionsBuilder(CacheOptionsBuilderBase):
def get_cache_options(
    self,
    joint_dsrc_info: PreparedFromInfo,
    data_key: LocalKeyRepresentation,
) -> BIQueryCacheOptions:
    """Build cache options (key, TTL, flags) for the given source info.

    Abstract: subclasses must implement.

    :param joint_dsrc_info: prepared joint data-source info for the query.
    :param data_key: precomputed local cache key representation.
    :return: assembled ``BIQueryCacheOptions``.
    :raises NotImplementedError: always, in this base class.
    """
    raise NotImplementedError

Expand All @@ -139,16 +128,9 @@ class CompengOptionsBuilder(DatasetOptionsBuilder): # TODO: Move to compeng pac
def get_cache_options(
self,
joint_dsrc_info: PreparedFromInfo,
query: Select,
user_types: list[UserDataType],
dataset: Dataset,
data_key: LocalKeyRepresentation,
role: DataSourceRole = DataSourceRole.origin,
) -> BIQueryCacheOptions:
ttl_info = self.get_cache_ttl_info(
data_source_list=joint_dsrc_info.data_source_list,
dataset=dataset,
)
ttl_info = self.get_cache_ttl_info(data_source_list=joint_dsrc_info.data_source_list)
return BIQueryCacheOptions(
cache_enabled=self.cache_enabled,
key=data_key,
Expand All @@ -170,35 +152,15 @@ def get_cache_enabled(self, joint_dsrc_info: PreparedFromInfo) -> bool:
def get_cache_options(
    self,
    joint_dsrc_info: PreparedFromInfo,
    data_key: LocalKeyRepresentation,
) -> BIQueryCacheOptions:
    """Returns cache key, TTL for new entries, refresh TTL flag"""

    ttl_info = self.get_cache_ttl_info(data_source_list=joint_dsrc_info.data_source_list)
    cache_enabled = self.get_cache_enabled(joint_dsrc_info=joint_dsrc_info)
    return BIQueryCacheOptions(
        cache_enabled=cache_enabled,
        # BUG FIX: the condition was inverted (`if not cache_enabled`),
        # which handed out a cache key exactly when caching was disabled.
        # The key must be present only when the cache is enabled,
        # matching the previous `if not cache_enabled: key = None` logic.
        key=data_key if cache_enabled else None,
        ttl_sec=ttl_info.ttl_sec,
        refresh_ttl_on_read=ttl_info.refresh_ttl_on_read,
    )
Expand Down Expand Up @@ -270,7 +232,7 @@ def get_cache_options(
data_key: LocalKeyRepresentation = LocalKeyRepresentation(),
) -> BIQueryCacheOptions:
cache_enabled = self.get_cache_enabled(conn=conn)
ttl_config = self.get_actual_ttl_config(connection=conn, dataset=None)
ttl_config = self.get_actual_ttl_config(connection=conn)
ttl_info = self.config_to_ttl_info(ttl_config)

local_key_rep: LocalKeyRepresentation = data_key.multi_extend(*conn.get_cache_key_part().key_parts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ async def _execute_and_fetch(
# Resolve TTL info and save BIQueryCacheOptions object
cache_options = self._cache_options_builder.get_cache_options(
joint_dsrc_info=joint_dsrc_info,
query=query,
user_types=list(user_types),
dataset=self._dataset,
data_key=data_key,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def get_data_key(
from_info: Optional[PreparedFromInfo] = None,
base_key: LocalKeyRepresentation = LocalKeyRepresentation(),
) -> Optional[LocalKeyRepresentation]:
# TODO: Remove this method
query_res_info = self._make_query_res_info(query=query, user_types=user_types)
data_key = self._cache_options_builder.get_data_key(
from_info=from_info,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,19 +179,33 @@ def get_from_info_from_stream(self, source_stream: AbstractStream) -> PreparedFr
def make_data_key(self, op: BaseOp) -> LocalKeyRepresentation:
    """Build the cache key for a ``CalcOp`` over its source stream.

    Computes the key two ways — the legacy procedure (compile the query
    and derive the key from its text) and the new procedure (extend the
    stream's key with the op's key data) — logs both hashes so they can
    be compared in production, and returns the legacy key, which is
    still the one actually used.

    :param op: the operation; must be a ``CalcOp``.
    :return: the legacy cache key representation.
    :raises AssertionError: if ``op`` is not a ``CalcOp``.
    """
    assert isinstance(op, CalcOp)
    source_stream = self.ctx.get_stream(op.source_stream_id)

    # TODO: Remove legacy version

    # Legacy procedure: key is derived from the compiled query text.
    from_info = self.get_from_info_from_stream(source_stream=source_stream)
    query_compiler = from_info.query_compiler
    query = query_compiler.compile_select(
        bi_query=op.bi_query,
        sql_source=from_info.sql_source,
    )
    legacy_data_key = self.db_ex_adapter.get_data_key(
        query=query,
        user_types=source_stream.user_types,
        from_info=from_info,
        base_key=source_stream.data_key,
    )

    # New procedure: key is the stream key extended with the op's key data.
    new_data_key = source_stream.data_key.extend('query', op.data_key_data)

    # Log both hashes so legacy/new key divergence can be monitored
    # before the switch-over.
    LOGGER.info(
        f'Preliminary cache key info for query: '
        f'legacy key: {legacy_data_key.key_parts_hash} ; '
        f'new key: {new_data_key.key_parts_hash}'
    )

    return legacy_data_key

@log_op # type: ignore # TODO: fix
async def execute(self, op: BaseOp) -> DataSourceVS: # type: ignore # TODO: fix
Expand Down

0 comments on commit b6c7452

Please sign in to comment.