Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[WIP] Removed some of the old cache logic #250

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 15 additions & 136 deletions lib/dl_core/dl_core/data_processing/cache/utils.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,37 @@
from __future__ import annotations

import abc
import logging
from typing import (
TYPE_CHECKING,
Collection,
Optional,
)

import attr
from sqlalchemy.exc import DatabaseError

from dl_cache_engine.exc import CachePreparationFailed
from dl_cache_engine.primitives import (
BIQueryCacheOptions,
CacheTTLConfig,
CacheTTLInfo,
DataKeyPart,
LocalKeyRepresentation,
)
from dl_core.query.bi_query import QueryAndResultInfo
from dl_core.us_connection_base import ConnectionBase
from dl_model_tools.serialization import hashable_dumps


if TYPE_CHECKING:
from sqlalchemy.engine.default import DefaultDialect
from sqlalchemy.sql import Select

from dl_constants.enums import UserDataType
from dl_constants.types import TJSONExt
from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo
from dl_core.data_source.base import DataSource
from dl_core.us_manager.local_cache import USEntryBuffer


LOGGER = logging.getLogger(__name__)


@attr.s
class CacheOptionsBuilderBase:
class CacheOptionsBuilderBase(abc.ABC):
default_ttl_config: CacheTTLConfig = attr.ib(factory=CacheTTLConfig)
_is_bleeding_edge_user: bool = attr.ib(default=False)

def get_actual_ttl_config(
self,
Expand All @@ -54,28 +45,6 @@ def get_actual_ttl_config(

return ctc

@staticmethod
def get_query_str_for_cache(query: Select, dialect: DefaultDialect) -> str:
    """Render *query* into a deterministic string suitable as a cache-key part.

    :param query: SQLAlchemy select to be compiled.
    :param dialect: dialect used for compilation.
    :raises CachePreparationFailed: if compilation raises a ``DatabaseError``.
    """
    try:
        compiled = query.compile(dialect=dialect)
    except DatabaseError as err:
        raise CachePreparationFailed from err

    params = compiled.params
    if isinstance(params, dict):
        # Sort by parameter name so that dict ordering can never change the key.
        params = sorted(params.items(), key=lambda kv: kv[0])

    return ";".join((str(compiled), str(params)))

@staticmethod
def config_to_ttl_info(ttl_config: CacheTTLConfig) -> CacheTTLInfo:
return CacheTTLInfo(
Expand All @@ -93,133 +62,43 @@ def get_cache_ttl_info(self, data_source_list: Collection[DataSource]) -> CacheT
)
return self.config_to_ttl_info(ttl_config=ttl_config)

def get_data_key(
    self,
    *,
    query_res_info: QueryAndResultInfo,
    from_info: Optional[PreparedFromInfo] = None,
    base_key: LocalKeyRepresentation = LocalKeyRepresentation(),  # noqa: B008
) -> Optional[LocalKeyRepresentation]:
    """Return the cache data key for a query.

    Base implementation: returns *base_key* unchanged.  Subclasses extend
    it with query-specific parts; *query_res_info* and *from_info* are
    accepted here only for interface compatibility with those overrides.
    """
    return base_key


@attr.s
class DatasetOptionsBuilder(CacheOptionsBuilderBase):
cache_enabled: bool = attr.ib(kw_only=True, default=True)

def get_cache_options(
self,
joint_dsrc_info: PreparedFromInfo,
data_key: LocalKeyRepresentation,
) -> BIQueryCacheOptions:
@abc.abstractmethod
def get_cache_enabled(self, joint_dsrc_info: PreparedFromInfo) -> bool:
raise NotImplementedError


@attr.s
class CompengOptionsBuilder(DatasetOptionsBuilder): # TODO: Move to compeng package
cache_enabled: bool = attr.ib(kw_only=True, default=True)

def get_cache_options(
self,
joint_dsrc_info: PreparedFromInfo,
data_key: LocalKeyRepresentation,
) -> BIQueryCacheOptions:
ttl_info = self.get_cache_ttl_info(data_source_list=joint_dsrc_info.data_source_list) # type: ignore # 2024-01-24 # TODO: Argument "data_source_list" to "get_cache_ttl_info" of "CacheOptionsBuilderBase" has incompatible type "tuple[DataSource, ...] | None"; expected "Collection[DataSource]" [arg-type]
cache_enabled = self.get_cache_enabled(joint_dsrc_info=joint_dsrc_info)
return BIQueryCacheOptions(
cache_enabled=self.cache_enabled,
key=data_key,
cache_enabled=cache_enabled,
key=data_key if cache_enabled else None,
ttl_sec=ttl_info.ttl_sec,
refresh_ttl_on_read=ttl_info.refresh_ttl_on_read,
)

def get_data_key(
    self,
    *,
    query_res_info: QueryAndResultInfo,
    from_info: Optional[PreparedFromInfo] = None,
    base_key: LocalKeyRepresentation = LocalKeyRepresentation(),  # noqa: B008
) -> Optional[LocalKeyRepresentation]:
    """Extend *base_key* with the compiled query text.

    TODO: Remove after switching to new cache keys.
    """
    # NOTE(review): assumes from_info is always passed by callers — TODO confirm.
    dialect = from_info.query_compiler.dialect  # type: ignore # 2024-01-24 # TODO: Item "None" of "PreparedFromInfo | None" has no attribute "query_compiler" [union-attr]
    query_str = self.get_query_str_for_cache(query=query_res_info.query, dialect=dialect)
    return base_key.extend(part_type="query", part_content=query_str)


@attr.s
class SelectorCacheOptionsBuilder(DatasetOptionsBuilder):
_is_bleeding_edge_user: bool = attr.ib(default=False)
_us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True)
class CompengOptionsBuilder(DatasetOptionsBuilder): # TODO: Move to compeng package
cache_enabled: bool = attr.ib(kw_only=True, default=True)

def get_cache_enabled(self, joint_dsrc_info: PreparedFromInfo) -> bool:
    """Caching for compeng is governed solely by the builder's own flag."""
    del joint_dsrc_info  # unused; kept for interface compatibility
    return self.cache_enabled


@attr.s
class SelectorCacheOptionsBuilder(DatasetOptionsBuilder): # TODO: Rename to SourceDbCacheOptionsBuilder
def get_cache_enabled(self, joint_dsrc_info: PreparedFromInfo) -> bool:
    """Caching is enabled only if every underlying data source allows it."""
    sources = joint_dsrc_info.data_source_list
    assert sources is not None
    return all(src.cache_enabled for src in sources)

def get_cache_options(
    self,
    joint_dsrc_info: PreparedFromInfo,
    data_key: LocalKeyRepresentation,
) -> BIQueryCacheOptions:
    """Return cache key, TTL for new entries, and the refresh-TTL flag."""
    enabled = self.get_cache_enabled(joint_dsrc_info=joint_dsrc_info)
    ttl_info = self.get_cache_ttl_info(
        data_source_list=joint_dsrc_info.data_source_list,  # type: ignore # 2024-01-24 # TODO: Argument "data_source_list" to "get_cache_ttl_info" of "CacheOptionsBuilderBase" has incompatible type "tuple[DataSource, ...] | None"; expected "Collection[DataSource]" [arg-type]
    )
    return BIQueryCacheOptions(
        cache_enabled=enabled,
        # No key when caching is disabled, so nothing can be stored under it.
        key=data_key if enabled else None,
        ttl_sec=ttl_info.ttl_sec,
        refresh_ttl_on_read=ttl_info.refresh_ttl_on_read,
    )

def make_data_select_cache_key(
    self,
    from_info: PreparedFromInfo,
    compiled_query: str,
    user_types: list[UserDataType],
    is_bleeding_edge_user: bool,
    base_key: LocalKeyRepresentation = LocalKeyRepresentation(),  # noqa: B008
) -> LocalKeyRepresentation:
    """Build the legacy cache key for a data-select query.

    TODO: Remove after switching to new cache keys,
    but put the db_name + target_connection.get_cache_key_part() parts somewhere.
    """
    # Validate that the target connection exists and is persisted; its id is
    # not (yet) mixed into the key — see the TODO above.
    assert from_info.target_connection_ref is not None
    target_connection = self._us_entry_buffer.get_entry(from_info.target_connection_ref)
    assert isinstance(target_connection, ConnectionBase)
    assert target_connection.uuid is not None

    return (
        base_key.extend(part_type="query", part_content=str(compiled_query))
        .extend(part_type="user_types", part_content=tuple(user_types or ()))
        .extend(part_type="is_bleeding_edge_user", part_content=is_bleeding_edge_user)
    )

def get_data_key(
    self,
    *,
    query_res_info: QueryAndResultInfo,
    from_info: Optional[PreparedFromInfo] = None,
    base_key: LocalKeyRepresentation = LocalKeyRepresentation(),  # noqa: B008
) -> Optional[LocalKeyRepresentation]:
    """Produce the legacy data-select cache key for the given query.

    TODO: Remove after switching to new cache keys.
    """
    query_str = self.get_query_str_for_cache(
        query=query_res_info.query,
        dialect=from_info.query_compiler.dialect,  # type: ignore # 2024-01-24 # TODO: Item "None" of "PreparedFromInfo | None" has no attribute "query_compiler" [union-attr]
    )
    return self.make_data_select_cache_key(
        base_key=base_key,
        from_info=from_info,  # type: ignore # 2024-01-24 # TODO: Argument "from_info" to "make_data_select_cache_key" of "SelectorCacheOptionsBuilder" has incompatible type "PreparedFromInfo | None"; expected "PreparedFromInfo" [arg-type]
        compiled_query=query_str,
        user_types=query_res_info.user_types,
        is_bleeding_edge_user=self._is_bleeding_edge_user,
    )


@attr.s
class DashSQLCacheOptionsBuilder(CacheOptionsBuilderBase):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,23 +138,6 @@ def _make_query_res_info(
)
return query_res_info

def get_data_key(
    self,
    *,
    query: str | Select,
    user_types: Sequence[UserDataType],
    from_info: Optional[PreparedFromInfo] = None,
    base_key: LocalKeyRepresentation = LocalKeyRepresentation(),  # noqa: B008
) -> Optional[LocalKeyRepresentation]:
    """Delegate cache-key construction to the cache options builder.

    TODO: Remove this method.
    """
    return self._cache_options_builder.get_data_key(
        from_info=from_info,
        query_res_info=self._make_query_res_info(query=query, user_types=user_types),
        base_key=base_key,
    )

async def create_table(
self,
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,36 +176,12 @@ def get_from_info_from_stream(self, source_stream: AbstractStream) -> PreparedFr
def make_data_key(self, op: BaseOp) -> LocalKeyRepresentation:
assert isinstance(op, CalcOp)
source_stream = self.ctx.get_stream(op.source_stream_id)

# TODO: Remove legacy version

# Legacy procedure
from_info = self.get_from_info_from_stream(source_stream=source_stream) # type: ignore # 2024-01-24 # TODO: Argument "source_stream" to "get_from_info_from_stream" of "CalcOpExecutorAsync" has incompatible type "AbstractStream | None"; expected "AbstractStream" [arg-type]
query_compiler = from_info.query_compiler
query = query_compiler.compile_select(
bi_query=op.bi_query,
# The info about the real source is already contained in the previous key parts,
# and, also, we want to avoid the randomized table names (in compeng) to appear in the key.
# So just use a fake table here.
sql_source=sa.table("table"),
)
legacy_data_key = self.db_ex_adapter.get_data_key(
query=query,
user_types=source_stream.user_types, # type: ignore # 2024-01-24 # TODO: Item "None" of "AbstractStream | None" has no attribute "user_types" [union-attr]
from_info=from_info,
base_key=source_stream.data_key, # type: ignore # 2024-01-24 # TODO: Item "None" of "AbstractStream | None" has no attribute "data_key" [union-attr]
)

# New procedure
new_data_key = source_stream.data_key.extend("query", op.data_key_data) # type: ignore # 2024-01-24 # TODO: Item "None" of "AbstractStream | None" has no attribute "data_key" [union-attr]

LOGGER.info(
f"Preliminary cache key info for query: "
f"legacy key: {legacy_data_key.key_parts_hash} ; " # type: ignore # 2024-01-24 # TODO: Item "None" of "LocalKeyRepresentation | None" has no attribute "key_parts_hash" [union-attr]
f"new key: {new_data_key.key_parts_hash}"
)

return new_data_key
assert source_stream is not None
base_data_key = source_stream.data_key
assert base_data_key is not None
data_key = base_data_key.extend("query", op.data_key_data)
LOGGER.info(f"Preliminary cache key info for query: " f"key: {data_key.key_parts_hash}")
return data_key

@log_op # type: ignore # TODO: fix
async def execute(self, op: BaseOp) -> DataSourceVS: # type: ignore # TODO: fix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,13 @@ class SourceDbOperationProcessor(ExecutorBasedOperationProcessor):
_dataset: Dataset = attr.ib(kw_only=True)
_row_count_hard_limit: Optional[int] = attr.ib(kw_only=True, default=None)
_us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True)
_is_bleeding_edge_user: bool = attr.ib(default=False)
_default_cache_ttl_config: CacheTTLConfig = attr.ib(default=None)
_ce_factory: ConnExecutorFactory = attr.ib(kw_only=True)
_rci: RequestContextInfo = attr.ib(kw_only=True)

def _make_cache_options_builder(self) -> DatasetOptionsBuilder:
return SelectorCacheOptionsBuilder(
default_ttl_config=self._default_cache_ttl_config,
is_bleeding_edge_user=self._is_bleeding_edge_user,
us_entry_buffer=self._us_entry_buffer,
)

def _make_db_ex_adapter(self) -> Optional[ProcessorDbExecAdapterBase]:
Expand Down
Loading