diff --git a/lib/dl_compeng_pg/dl_compeng_pg/compeng_aiopg/data_processor_service_aiopg.py b/lib/dl_compeng_pg/dl_compeng_pg/compeng_aiopg/data_processor_service_aiopg.py index 53b93627a..e65cce7d9 100644 --- a/lib/dl_compeng_pg/dl_compeng_pg/compeng_aiopg/data_processor_service_aiopg.py +++ b/lib/dl_compeng_pg/dl_compeng_pg/compeng_aiopg/data_processor_service_aiopg.py @@ -9,6 +9,7 @@ from dl_compeng_pg.compeng_pg_base.data_processor_service_pg import CompEngPgService from dl_compeng_pg.compeng_pg_base.pool_base import BasePgPoolWrapper from dl_core.data_processing.processing.processor import OperationProcessorAsyncBase +from dl_core.services_registry.top_level import ServicesRegistry @attr.s @@ -16,5 +17,13 @@ class AiopgCompEngService(CompEngPgService[AiopgPoolWrapper]): def _get_pool_wrapper_cls(self) -> Type[BasePgPoolWrapper]: return AiopgPoolWrapper - def get_data_processor(self) -> OperationProcessorAsyncBase: - return AiopgOperationProcessor(pg_pool=self.pool) + def get_data_processor( + self, + service_registry: ServicesRegistry, + reporting_enabled: bool, + ) -> OperationProcessorAsyncBase: + return AiopgOperationProcessor( + service_registry=service_registry, + pg_pool=self.pool, + reporting_enabled=reporting_enabled, + ) diff --git a/lib/dl_compeng_pg/dl_compeng_pg/compeng_aiopg/exec_adapter_aiopg.py b/lib/dl_compeng_pg/dl_compeng_pg/compeng_aiopg/exec_adapter_aiopg.py index cb283c37b..cce6d4484 100644 --- a/lib/dl_compeng_pg/dl_compeng_pg/compeng_aiopg/exec_adapter_aiopg.py +++ b/lib/dl_compeng_pg/dl_compeng_pg/compeng_aiopg/exec_adapter_aiopg.py @@ -4,6 +4,8 @@ from typing import ( TYPE_CHECKING, AsyncGenerator, + Awaitable, + Callable, ClassVar, Optional, Sequence, @@ -17,7 +19,9 @@ from dl_compeng_pg.compeng_pg_base.exec_adapter_base import PostgreSQLExecAdapterAsync from dl_constants.enums import UserDataType -from dl_core.data_processing.prepared_components.primitives import PreparedMultiFromInfo +from dl_core.data_processing.cache.primitives import LocalKeyRepresentation +from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo +from dl_core.data_processing.processing.context import OpExecutionContext from dl_core.data_processing.streaming import ( AsyncChunked, AsyncChunkedBase, @@ -49,12 +53,18 @@ async def _execute_ddl(self, query: Union[str, Executable]) -> None: async def _execute_and_fetch( self, *, - query: Union[Select, str], + query: Select | str, user_types: Sequence[UserDataType], chunk_size: int, - joint_dsrc_info: Optional[PreparedMultiFromInfo] = None, + joint_dsrc_info: Optional[PreparedFromInfo] = None, query_id: str, + ctx: OpExecutionContext, + data_key: LocalKeyRepresentation, + preparation_callback: Optional[Callable[[], Awaitable[None]]], ) -> AsyncChunkedBase[Sequence[TBIDataValue]]: + if preparation_callback is not None: + await preparation_callback() + async def chunked_data_gen() -> AsyncGenerator[list[list[TBIDataValue]], None]: """Fetch data in chunks""" diff --git a/lib/dl_compeng_pg/dl_compeng_pg/compeng_aiopg/processor_aiopg.py b/lib/dl_compeng_pg/dl_compeng_pg/compeng_aiopg/processor_aiopg.py index 5b687d907..3452a3b13 100644 --- a/lib/dl_compeng_pg/dl_compeng_pg/compeng_aiopg/processor_aiopg.py +++ b/lib/dl_compeng_pg/dl_compeng_pg/compeng_aiopg/processor_aiopg.py @@ -9,13 +9,18 @@ @attr.s -class AiopgOperationProcessor(PostgreSQLOperationProcessor[AiopgExecAdapter, AiopgPoolWrapper, aiopg.sa.SAConnection]): +class AiopgOperationProcessor(PostgreSQLOperationProcessor[AiopgPoolWrapper, aiopg.sa.SAConnection]): 
async def start(self) -> None: self._pg_conn = await self._pg_pool.pool.acquire() - self._pgex_adapter = AiopgExecAdapter(conn=self._pg_conn) # type: ignore # TODO: fix + self._db_ex_adapter = AiopgExecAdapter( + service_registry=self.service_registry, + reporting_enabled=self._reporting_enabled, + conn=self._pg_conn, + cache_options_builder=self._cache_options_builder, + ) # type: ignore # TODO: fix async def end(self) -> None: - self._pgex_adapter = None + self._db_ex_adapter = None if self._pg_conn is not None: await self._pg_conn.close() self._pg_conn = None diff --git a/lib/dl_compeng_pg/dl_compeng_pg/compeng_asyncpg/data_processor_service_asyncpg.py b/lib/dl_compeng_pg/dl_compeng_pg/compeng_asyncpg/data_processor_service_asyncpg.py index bc70240b1..a6f103151 100644 --- a/lib/dl_compeng_pg/dl_compeng_pg/compeng_asyncpg/data_processor_service_asyncpg.py +++ b/lib/dl_compeng_pg/dl_compeng_pg/compeng_asyncpg/data_processor_service_asyncpg.py @@ -9,6 +9,7 @@ from dl_compeng_pg.compeng_pg_base.data_processor_service_pg import CompEngPgService from dl_compeng_pg.compeng_pg_base.pool_base import BasePgPoolWrapper from dl_core.data_processing.processing.processor import OperationProcessorAsyncBase +from dl_core.services_registry.top_level import ServicesRegistry @attr.s @@ -16,5 +17,13 @@ class AsyncpgCompEngService(CompEngPgService[AsyncpgPoolWrapper]): def _get_pool_wrapper_cls(self) -> Type[BasePgPoolWrapper]: return AsyncpgPoolWrapper - def get_data_processor(self) -> OperationProcessorAsyncBase: - return AsyncpgOperationProcessor(pg_pool=self.pool) + def get_data_processor( + self, + service_registry: ServicesRegistry, + reporting_enabled: bool, + ) -> OperationProcessorAsyncBase: + return AsyncpgOperationProcessor( + service_registry=service_registry, + pg_pool=self.pool, + reporting_enabled=reporting_enabled, + ) diff --git a/lib/dl_compeng_pg/dl_compeng_pg/compeng_asyncpg/exec_adapter_asyncpg.py b/lib/dl_compeng_pg/dl_compeng_pg/compeng_asyncpg/exec_adapter_asyncpg.py index 572b68eca..c573c5389 100644 --- a/lib/dl_compeng_pg/dl_compeng_pg/compeng_asyncpg/exec_adapter_asyncpg.py +++ b/lib/dl_compeng_pg/dl_compeng_pg/compeng_asyncpg/exec_adapter_asyncpg.py @@ -5,6 +5,8 @@ from typing import ( TYPE_CHECKING, AsyncGenerator, + Awaitable, + Callable, ClassVar, Generator, Optional, @@ -20,7 +22,9 @@ from dl_compeng_pg.compeng_pg_base.exec_adapter_base import PostgreSQLExecAdapterAsync from dl_constants.enums import UserDataType from dl_core.connectors.base.error_transformer import DbErrorTransformer -from dl_core.data_processing.prepared_components.primitives import PreparedMultiFromInfo +from dl_core.data_processing.cache.primitives import LocalKeyRepresentation +from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo +from dl_core.data_processing.processing.context import OpExecutionContext from dl_core.data_processing.streaming import ( AsyncChunked, AsyncChunkedBase, @@ -89,11 +93,17 @@ async def _execute_and_fetch( # type: ignore # TODO: fix query: Union[str, sa.sql.selectable.Select], user_types: Sequence[UserDataType], chunk_size: int, - joint_dsrc_info: Optional[PreparedMultiFromInfo] = None, + joint_dsrc_info: Optional[PreparedFromInfo] = None, query_id: str, + ctx: OpExecutionContext, + data_key: LocalKeyRepresentation, + preparation_callback: Optional[Callable[[], Awaitable[None]]], ) -> AsyncChunked[list[TBIDataValue]]: query_text, params = self._compile_query(query) + if preparation_callback is not None: + await preparation_callback() + async def 
chunked_data_gen() -> AsyncGenerator[list[list[TBIDataValue]], None]: """Fetch data in chunks""" diff --git a/lib/dl_compeng_pg/dl_compeng_pg/compeng_asyncpg/processor_asyncpg.py b/lib/dl_compeng_pg/dl_compeng_pg/compeng_asyncpg/processor_asyncpg.py index d5e5ea982..5acd4b656 100644 --- a/lib/dl_compeng_pg/dl_compeng_pg/compeng_asyncpg/processor_asyncpg.py +++ b/lib/dl_compeng_pg/dl_compeng_pg/compeng_asyncpg/processor_asyncpg.py @@ -12,9 +12,7 @@ @attr.s -class AsyncpgOperationProcessor( - PostgreSQLOperationProcessor[AsyncpgExecAdapter, AsyncpgPoolWrapper, asyncpg.pool.PoolConnectionProxy] -): +class AsyncpgOperationProcessor(PostgreSQLOperationProcessor[AsyncpgPoolWrapper, asyncpg.pool.PoolConnectionProxy]): _cmstack: Optional[AsyncExitStack] = attr.ib(init=False, default=None) _timeout = 1.5 @@ -25,10 +23,17 @@ async def start(self) -> None: pg_conn = await cmstack.enter_async_context(self._pg_pool.pool.acquire(timeout=self._timeout)) self._pg_conn = pg_conn await cmstack.enter_async_context(pg_conn.transaction()) - self._pgex_adapter = AsyncpgExecAdapter(conn=pg_conn) + self._db_ex_adapter = AsyncpgExecAdapter( + service_registry=self.service_registry, + reporting_enabled=self._reporting_enabled, + conn=pg_conn, + cache_options_builder=self._cache_options_builder, + ) async def end(self) -> None: - self._pgex_adapter = None + assert self._db_ex_adapter is not None + assert self._cmstack is not None + self._db_ex_adapter = None await self._cmstack.aclose() # type: ignore # TODO: fix self._pg_conn = None self._cmstack = None diff --git a/lib/dl_compeng_pg/dl_compeng_pg/compeng_pg_base/exec_adapter_base.py b/lib/dl_compeng_pg/dl_compeng_pg/compeng_pg_base/exec_adapter_base.py index 3bb50450e..700baaaf6 100644 --- a/lib/dl_compeng_pg/dl_compeng_pg/compeng_pg_base/exec_adapter_base.py +++ b/lib/dl_compeng_pg/dl_compeng_pg/compeng_pg_base/exec_adapter_base.py @@ -17,6 +17,7 @@ from sqlalchemy.sql.base import Executable from dl_constants.enums import UserDataType +from dl_core.connectors.base.query_compiler import QueryCompiler from dl_core.data_processing.processing.db_base.exec_adapter_base import ProcessorDbExecAdapterBase from dl_core.data_processing.streaming import AsyncChunkedBase from dl_core.db.sa_types import make_sa_type @@ -69,13 +70,12 @@ async def create_table( table_name: str, names: Sequence[str], user_types: Sequence[UserDataType], - ) -> sa.sql.selectable.TableClause: + ) -> None: """Create table in database""" table = self._make_sa_table(table_name=table_name, names=names, user_types=user_types) self._log.info(f"Creating PG processor table {table_name}: {table}") await self._execute_ddl(sa.schema.CreateTable(table)) - return table async def _drop_table(self, table_name: str) -> None: await self._execute_ddl(sa.schema.DropTable(sa.table(table_name))) # type: ignore @@ -96,3 +96,6 @@ async def insert_data_into_table( data: AsyncChunkedBase, ) -> None: """,,,""" + + def get_query_compiler(self) -> QueryCompiler: + return QueryCompiler(dialect=self.dialect) diff --git a/lib/dl_compeng_pg/dl_compeng_pg/compeng_pg_base/op_executors.py b/lib/dl_compeng_pg/dl_compeng_pg/compeng_pg_base/op_executors.py deleted file mode 100644 index 8e9de8348..000000000 --- a/lib/dl_compeng_pg/dl_compeng_pg/compeng_pg_base/op_executors.py +++ /dev/null @@ -1,91 +0,0 @@ -from __future__ import annotations - -import attr -import shortuuid -import sqlalchemy as sa - -from dl_compeng_pg.compeng_pg_base.exec_adapter_base import PostgreSQLExecAdapterAsync -from dl_constants.enums import JoinType -from 
dl_core.connectors.base.query_compiler import QueryCompiler -from dl_core.data_processing.prepared_components.primitives import PreparedSingleFromInfo -from dl_core.data_processing.processing.db_base.op_executors import ( - OpExecutorAsync, - log_op, -) -from dl_core.data_processing.processing.operation import ( - BaseOp, - UploadOp, -) -from dl_core.data_processing.stream_base import ( - DataSourceVS, - DataStreamAsync, -) - - -COMPENG_SUPPORTED_JOIN_TYPES = frozenset( - { - JoinType.inner, - JoinType.left, - JoinType.right, - JoinType.full, - } -) - - -@attr.s -class PgOpExecutorAsync(OpExecutorAsync): - @property - def pgex_adapter(self) -> PostgreSQLExecAdapterAsync: - assert isinstance(self.db_ex_adapter, PostgreSQLExecAdapterAsync) - return self.db_ex_adapter - - -class UploadOpExecutorAsync(PgOpExecutorAsync): - """Dumps incoming stream to a database table""" - - @log_op # type: ignore # TODO: fix - async def execute(self, op: BaseOp) -> DataSourceVS: # type: ignore # TODO: fix - assert isinstance(op, UploadOp) - - source_stream = self.ctx.get_stream(op.source_stream_id) - assert isinstance(source_stream, DataStreamAsync) - - table_name = shortuuid.uuid() - await self.pgex_adapter.create_table( - table_name=table_name, - names=source_stream.names, - user_types=source_stream.user_types, - ) - await self.pgex_adapter.insert_data_into_table( - table_name=table_name, - names=source_stream.names, - user_types=source_stream.user_types, - data=source_stream.data, - ) - - alias = op.alias - sql_source = sa.alias(sa.table(table_name), alias) - - prep_src_info = PreparedSingleFromInfo( - id=op.result_id, - alias=alias, - query_compiler=QueryCompiler(dialect=self.pgex_adapter.dialect), - sql_source=sql_source, - col_names=source_stream.names, - user_types=source_stream.user_types, - data_source_list=None, - supported_join_types=COMPENG_SUPPORTED_JOIN_TYPES, - db_name=None, - connect_args={}, - pass_db_query_to_user=False, - target_connection_ref=None, - ) - - return DataSourceVS( - id=op.dest_stream_id, - result_id=prep_src_info.id, - prep_src_info=prep_src_info, - names=prep_src_info.col_names, - user_types=prep_src_info.user_types, - alias=op.alias, - ) diff --git a/lib/dl_compeng_pg/dl_compeng_pg/compeng_pg_base/processor_base.py b/lib/dl_compeng_pg/dl_compeng_pg/compeng_pg_base/processor_base.py index 25ef089fa..0f2e8c992 100644 --- a/lib/dl_compeng_pg/dl_compeng_pg/compeng_pg_base/processor_base.py +++ b/lib/dl_compeng_pg/dl_compeng_pg/compeng_pg_base/processor_base.py @@ -2,51 +2,51 @@ import abc from typing import ( - ClassVar, - Dict, Generic, Optional, - Type, TypeVar, ) import attr from dl_compeng_pg.compeng_pg_base.exec_adapter_base import PostgreSQLExecAdapterAsync -from dl_compeng_pg.compeng_pg_base.op_executors import UploadOpExecutorAsync from dl_compeng_pg.compeng_pg_base.pool_base import BasePgPoolWrapper from dl_constants.enums import UserDataType -from dl_core.data_processing.processing.context import OpExecutionContext -from dl_core.data_processing.processing.db_base.op_executors import ( - CalcOpExecutorAsync, - DownloadOpExecutorAsync, - JoinOpExecutorAsync, - OpExecutorAsync, -) -from dl_core.data_processing.processing.operation import ( - BaseOp, - CalcOp, - DownloadOp, - JoinOp, - UploadOp, +from dl_core.data_processing.cache.primitives import CacheTTLConfig +from dl_core.data_processing.cache.utils import ( + CompengOptionsBuilder, + DatasetOptionsBuilder, ) -from dl_core.data_processing.processing.processor import OperationProcessorAsyncBase -from 
dl_core.data_processing.stream_base import AbstractStream +from dl_core.data_processing.processing.context import OpExecutionContext +from dl_core.data_processing.processing.db_base.exec_adapter_base import ProcessorDbExecAdapterBase +from dl_core.data_processing.processing.db_base.processor_base import ExecutorBasedOperationProcessor -_ADAPTER_TV = TypeVar("_ADAPTER_TV", bound=PostgreSQLExecAdapterAsync) _POOL_TV = TypeVar("_POOL_TV", bound=BasePgPoolWrapper) _CONN_TV = TypeVar("_CONN_TV") @attr.s -class PostgreSQLOperationProcessor( - OperationProcessorAsyncBase, Generic[_ADAPTER_TV, _POOL_TV, _CONN_TV], metaclass=abc.ABCMeta -): +class PostgreSQLOperationProcessor(ExecutorBasedOperationProcessor, Generic[_POOL_TV, _CONN_TV], metaclass=abc.ABCMeta): _pg_pool: _POOL_TV = attr.ib() _task_timeout: Optional[int] = attr.ib(default=None) - _pgex_adapter: Optional[_ADAPTER_TV] = attr.ib(init=False, default=None) _pg_conn: Optional[_CONN_TV] = attr.ib(init=False, default=None) + _default_cache_ttl_config: CacheTTLConfig = attr.ib(factory=CacheTTLConfig) + + def _make_cache_options_builder(self) -> DatasetOptionsBuilder: + assert self._default_cache_ttl_config is not None + return CompengOptionsBuilder(default_ttl_config=self._default_cache_ttl_config) + + def _make_db_ex_adapter(self) -> Optional[ProcessorDbExecAdapterBase]: + # The adapter has to be initialized asynchronously, so don't create it here yet + return None + + @property + def db_ex_adapter(self) -> PostgreSQLExecAdapterAsync: + assert self._db_ex_adapter is not None + # Add the sub-type condition because it needs to support DDL actions + assert isinstance(self._db_ex_adapter, PostgreSQLExecAdapterAsync) + return self._db_ex_adapter @abc.abstractmethod async def start(self) -> None: @@ -57,20 +57,11 @@ async def end(self) -> None: """Cleanup.""" async def ping(self) -> Optional[int]: - assert self._pgex_adapter is not None - result = await self._pgex_adapter.scalar("select 1", user_type=UserDataType.integer) + ctx = OpExecutionContext(processing_id="", streams=[], operations=[]) + result = await self.db_ex_adapter.scalar( + "select 1", + user_type=UserDataType.integer, + ctx=ctx, + ) assert result is None or isinstance(result, int) return result - - _executors: ClassVar[Dict[Type[BaseOp], Type[OpExecutorAsync]]] = { - UploadOp: UploadOpExecutorAsync, - DownloadOp: DownloadOpExecutorAsync, - CalcOp: CalcOpExecutorAsync, - JoinOp: JoinOpExecutorAsync, - } - - async def execute_operation(self, op: BaseOp, ctx: OpExecutionContext) -> AbstractStream: - opex_cls: Type[OpExecutorAsync] = self._executors[type(op)] - assert self._pgex_adapter is not None - opex = opex_cls(db_ex_adapter=self._pgex_adapter, ctx=ctx) # type: ignore # TODO: fix - return await opex.execute(op) diff --git a/lib/dl_core/dl_core/aio/web_app_services/data_processing/data_processor.py b/lib/dl_core/dl_core/aio/web_app_services/data_processing/data_processor.py index 37475cd43..21c932d99 100644 --- a/lib/dl_core/dl_core/aio/web_app_services/data_processing/data_processor.py +++ b/lib/dl_core/dl_core/aio/web_app_services/data_processing/data_processor.py @@ -3,6 +3,7 @@ import abc import logging from typing import ( + TYPE_CHECKING, ClassVar, Type, TypeVar, @@ -12,7 +13,11 @@ from aiohttp import web import attr -from dl_core.data_processing.processing.processor import OperationProcessorAsyncBase +from dl_core.data_processing.processing.db_base.processor_base import ExecutorBasedOperationProcessor + + +if TYPE_CHECKING: + from dl_core.services_registry.top_level import 
ServicesRegistry LOGGER = logging.getLogger(__name__) @@ -65,7 +70,11 @@ def get_app_instance(cls: Type[_DATA_PROC_SRV_TV], app: web.Application) -> _DAT return service @abc.abstractmethod - def get_data_processor(self) -> OperationProcessorAsyncBase: + def get_data_processor( + self, + service_registry: ServicesRegistry, + reporting_enabled: bool, + ) -> ExecutorBasedOperationProcessor: raise NotImplementedError @classmethod diff --git a/lib/dl_core/dl_core/data_processing/cache/primitives.py b/lib/dl_core/dl_core/data_processing/cache/primitives.py index 50dbf63f1..ca4eca563 100644 --- a/lib/dl_core/dl_core/data_processing/cache/primitives.py +++ b/lib/dl_core/dl_core/data_processing/cache/primitives.py @@ -39,6 +39,9 @@ class BIQueryCacheOptions: ttl_sec: int refresh_ttl_on_read: bool + def clone(self, **kwargs: Any) -> BIQueryCacheOptions: + return attr.evolve(self, **kwargs) + class DataKeyPart(NamedTuple): part_type: str @@ -87,6 +90,7 @@ def key_parts_hash(self) -> str: return self._key_parts_hash def extend(self, part_type: str, part_content: Hashable) -> LocalKeyRepresentation: + assert part_content is not None new_part = DataKeyPart(part_type=part_type, part_content=part_content) return LocalKeyRepresentation(key_parts=self.key_parts + (new_part,)) diff --git a/lib/dl_core/dl_core/data_processing/cache/processing_helper.py b/lib/dl_core/dl_core/data_processing/cache/processing_helper.py index 6672102ad..3097b97e8 100644 --- a/lib/dl_core/dl_core/data_processing/cache/processing_helper.py +++ b/lib/dl_core/dl_core/data_processing/cache/processing_helper.py @@ -24,9 +24,7 @@ ) from dl_core.data_processing.cache.primitives import BIQueryCacheOptions from dl_core.data_processing.types import TJSONExtChunkStream - import dl_core.data_source # noqa - from dl_core.services_registry import ServicesRegistry # noqa - import dl_core.us_dataset # noqa + from dl_core.services_registry import ServicesRegistry LOGGER = logging.getLogger(__name__) @@ -171,6 +169,7 @@ async def run_with_cache( await cem.finalize( result=result_as_list, ) + LOGGER.info("Saved to cache") except Exception: LOGGER.error("Error during finalizing cache (after a generate success)", exc_info=True) diff --git a/lib/dl_core/dl_core/data_processing/cache/utils.py b/lib/dl_core/dl_core/data_processing/cache/utils.py index c3904af85..c0a199b22 100644 --- a/lib/dl_core/dl_core/data_processing/cache/utils.py +++ b/lib/dl_core/dl_core/data_processing/cache/utils.py @@ -19,6 +19,7 @@ DataKeyPart, LocalKeyRepresentation, ) +from dl_core.query.bi_query import QueryAndResultInfo from dl_core.serialization import hashable_dumps from dl_core.us_connection_base import ( ConnectionBase, @@ -32,7 +33,7 @@ from dl_constants.enums import UserDataType from dl_constants.types import TJSONExt - from dl_core.data_processing.prepared_components.primitives import PreparedMultiFromInfo + from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo from dl_core.data_processing.stream_base import DataStreamBase from dl_core.data_source.base import DataSource from dl_core.us_dataset import Dataset @@ -84,34 +85,14 @@ def get_query_str_for_cache(query: Select, dialect: DefaultDialect) -> str: ) @staticmethod - def config_to_ttl_info( - ttl_config: CacheTTLConfig, - is_materialized: bool, - data_source_list: Optional[Collection[DataSource]] = None, - ) -> CacheTTLInfo: - ttl_info = CacheTTLInfo( + def config_to_ttl_info(ttl_config: CacheTTLConfig) -> CacheTTLInfo: + return CacheTTLInfo( ttl_sec=ttl_config.ttl_sec_direct, 
refresh_ttl_on_read=False, ) - if not is_materialized: - return ttl_info - if data_source_list is None: - return ttl_info - - data_dump_id_list = [dsrc.data_dump_id for dsrc in data_source_list] - if not all(data_dump_id_list): - return ttl_info - - # Materialized - return ttl_info.clone( - ttl_sec=ttl_config.ttl_sec_materialized, - refresh_ttl_on_read=True, - ) - def get_cache_ttl_info( self, - is_materialized: bool, data_source_list: Collection[DataSource], # For future use dataset: Optional[Dataset] = None, # noqa @@ -124,22 +105,64 @@ def get_cache_ttl_info( connection=actual_connection, dataset=dataset, ) - return self.config_to_ttl_info( - ttl_config=ttl_config, - is_materialized=is_materialized, - data_source_list=data_source_list, - ) + return self.config_to_ttl_info(ttl_config=ttl_config) + + def get_data_key( + self, + *, + query_res_info: QueryAndResultInfo, + from_info: Optional[PreparedFromInfo] = None, + base_key: LocalKeyRepresentation = LocalKeyRepresentation(), + ) -> Optional[LocalKeyRepresentation]: + return base_key + + +@attr.s +class DatasetOptionsBuilder(CacheOptionsBuilderBase): + cache_enabled: bool = attr.ib(kw_only=True, default=True) + + def get_cache_options( + self, + joint_dsrc_info: PreparedFromInfo, + query: Select, + user_types: list[UserDataType], + dataset: Dataset, + data_key: LocalKeyRepresentation, + role: DataSourceRole = DataSourceRole.origin, + ) -> BIQueryCacheOptions: + raise NotImplementedError @attr.s -class CacheOptionsBuilderDataProcessor(CacheOptionsBuilderBase): +class CompengOptionsBuilder(DatasetOptionsBuilder): # TODO: Move to compeng package + cache_enabled: bool = attr.ib(kw_only=True, default=True) + + def get_cache_options( + self, + joint_dsrc_info: PreparedFromInfo, + query: Select, + user_types: list[UserDataType], + dataset: Dataset, + data_key: LocalKeyRepresentation, + role: DataSourceRole = DataSourceRole.origin, + ) -> BIQueryCacheOptions: + ttl_info = self.get_cache_ttl_info( + data_source_list=joint_dsrc_info.data_source_list, + dataset=dataset, + ) + return BIQueryCacheOptions( + cache_enabled=self.cache_enabled, + key=data_key, + ttl_sec=ttl_info.ttl_sec, + refresh_ttl_on_read=ttl_info.refresh_ttl_on_read, + ) + def get_cache_options_for_stream( self, stream: DataStreamBase, dataset: Optional[Dataset] = None, ) -> BIQueryCacheOptions: ttl_info = self.get_cache_ttl_info( - is_materialized=stream.meta.is_materialized, data_source_list=stream.meta.data_source_list, dataset=dataset, ) @@ -154,45 +177,37 @@ def get_cache_options_for_stream( @attr.s -class SelectorCacheOptionsBuilder(CacheOptionsBuilderBase): +class SelectorCacheOptionsBuilder(DatasetOptionsBuilder): _is_bleeding_edge_user: bool = attr.ib(default=False) _us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True) - def get_cache_enabled(self, joint_dsrc_info: PreparedMultiFromInfo) -> bool: + def get_cache_enabled(self, joint_dsrc_info: PreparedFromInfo) -> bool: assert joint_dsrc_info.data_source_list is not None cache_enabled = all(dsrc.cache_enabled for dsrc in joint_dsrc_info.data_source_list) return cache_enabled def get_cache_options( self, - role: DataSourceRole, - joint_dsrc_info: PreparedMultiFromInfo, + joint_dsrc_info: PreparedFromInfo, query: Select, user_types: list[UserDataType], dataset: Dataset, + data_key: LocalKeyRepresentation, + role: DataSourceRole = DataSourceRole.origin, ) -> BIQueryCacheOptions: """Returns cache key, TTL for new entries, refresh TTL flag""" - merged_data_dump_id = "N/A" - assert joint_dsrc_info.data_source_list is not None - 
if role != DataSourceRole.origin: - data_dump_id_list = [dsrc.data_dump_id for dsrc in joint_dsrc_info.data_source_list] - if all(data_dump_id_list): - merged_data_dump_id = "+".join(data_dump_id_list) # type: ignore - compiled_query = self.get_query_str_for_cache( query=query, dialect=joint_dsrc_info.query_compiler.dialect, ) local_key_rep: Optional[LocalKeyRepresentation] = self.make_data_select_cache_key( - joint_dsrc_info=joint_dsrc_info, + from_info=joint_dsrc_info, compiled_query=compiled_query, user_types=user_types, - data_dump_id=merged_data_dump_id, is_bleeding_edge_user=self._is_bleeding_edge_user, ) ttl_info = self.get_cache_ttl_info( - is_materialized=role != DataSourceRole.origin, data_source_list=joint_dsrc_info.data_source_list, dataset=dataset, ) @@ -209,27 +224,26 @@ def get_cache_options( def make_data_select_cache_key( self, - joint_dsrc_info: PreparedMultiFromInfo, + from_info: PreparedFromInfo, compiled_query: str, user_types: list[UserDataType], - data_dump_id: str, is_bleeding_edge_user: bool, + base_key: LocalKeyRepresentation = LocalKeyRepresentation(), ) -> LocalKeyRepresentation: - assert joint_dsrc_info.target_connection_ref is not None - target_connection = self._us_entry_buffer.get_entry(joint_dsrc_info.target_connection_ref) + assert from_info.target_connection_ref is not None + target_connection = self._us_entry_buffer.get_entry(from_info.target_connection_ref) assert isinstance(target_connection, ConnectionBase) connection_id = target_connection.uuid assert connection_id is not None - local_key_rep = target_connection.get_cache_key_part() - if joint_dsrc_info.db_name is not None: + local_key_rep = base_key.multi_extend(*target_connection.get_cache_key_part().key_parts) + if from_info.db_name is not None: # FIXME: Replace with key parts for every participating dsrc # For now db_name will be duplicated in some source types # (one from connection, one from source) - local_key_rep = local_key_rep.extend(part_type="db_name", part_content=joint_dsrc_info.db_name) + local_key_rep = local_key_rep.extend(part_type="db_name", part_content=from_info.db_name) local_key_rep = local_key_rep.extend(part_type="query", part_content=str(compiled_query)) local_key_rep = local_key_rep.extend(part_type="user_types", part_content=tuple(user_types or ())) - local_key_rep = local_key_rep.extend(part_type="data_dump_id", part_content=data_dump_id or "N/A") local_key_rep = local_key_rep.extend( part_type="is_bleeding_edge_user", part_content=is_bleeding_edge_user, @@ -237,6 +251,26 @@ def make_data_select_cache_key( return local_key_rep + def get_data_key( + self, + *, + query_res_info: QueryAndResultInfo, + from_info: Optional[PreparedFromInfo] = None, + base_key: LocalKeyRepresentation = LocalKeyRepresentation(), + ) -> Optional[LocalKeyRepresentation]: + compiled_query = self.get_query_str_for_cache( + query=query_res_info.query, + dialect=from_info.query_compiler.dialect, + ) + data_key: Optional[LocalKeyRepresentation] = self.make_data_select_cache_key( + base_key=base_key, + from_info=from_info, + compiled_query=compiled_query, + user_types=query_res_info.user_types, + is_bleeding_edge_user=self._is_bleeding_edge_user, + ) + return data_key + @attr.s class DashSQLCacheOptionsBuilder(CacheOptionsBuilderBase): @@ -252,12 +286,13 @@ def get_cache_options( params: TJSONExt, db_params: dict[str, str], connector_specific_params: TJSONExt, + data_key: LocalKeyRepresentation = LocalKeyRepresentation(), ) -> BIQueryCacheOptions: cache_enabled = self.get_cache_enabled(conn=conn) 
ttl_config = self.get_actual_ttl_config(connection=conn, dataset=None) - ttl_info = self.config_to_ttl_info(ttl_config, is_materialized=False, data_source_list=None) + ttl_info = self.config_to_ttl_info(ttl_config) - local_key_rep: Optional[LocalKeyRepresentation] = conn.get_cache_key_part() + local_key_rep: LocalKeyRepresentation = data_key.multi_extend(*conn.get_cache_key_part().key_parts) local_key_rep = local_key_rep.multi_extend( DataKeyPart(part_type="query", part_content=query_text), DataKeyPart(part_type="params", part_content=hashable_dumps(params)), diff --git a/lib/dl_core/dl_core/data_processing/prepared_components/default_manager.py b/lib/dl_core/dl_core/data_processing/prepared_components/default_manager.py index 1005b363e..7c43fa566 100644 --- a/lib/dl_core/dl_core/data_processing/prepared_components/default_manager.py +++ b/lib/dl_core/dl_core/data_processing/prepared_components/default_manager.py @@ -32,8 +32,8 @@ class DefaultPreparedComponentManager(PreparedComponentManagerBase): """ _dataset: Dataset = attr.ib(kw_only=True) - _role: DataSourceRole = attr.ib(kw_only=True) _us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True) + _role: DataSourceRole = attr.ib(kw_only=True, default=DataSourceRole.origin) _ds_accessor: DatasetComponentAccessor = attr.ib(init=False) _dsrc_coll_factory: DataSourceCollectionFactory = attr.ib(init=False) diff --git a/lib/dl_core/dl_core/data_processing/prepared_components/primitives.py b/lib/dl_core/dl_core/data_processing/prepared_components/primitives.py index 172c59eec..73b06cdf8 100644 --- a/lib/dl_core/dl_core/data_processing/prepared_components/primitives.py +++ b/lib/dl_core/dl_core/data_processing/prepared_components/primitives.py @@ -6,6 +6,7 @@ Collection, Optional, Sequence, + TypeVar, ) import attr @@ -23,6 +24,9 @@ import dl_core.data_source +_PREP_FROM_TV = TypeVar("_PREP_FROM_TV", bound="PreparedFromInfo") + + @attr.s(frozen=True) class PreparedFromInfo: """ @@ -36,7 +40,7 @@ class PreparedFromInfo: sql_source: Optional[SqlSourceType] = attr.ib(kw_only=True) query_compiler: QueryCompiler = attr.ib(kw_only=True) supported_join_types: Collection[JoinType] = attr.ib(kw_only=True) - data_source_list: Optional[tuple[dl_core.data_source.BaseSQLDataSource, ...]] = attr.ib(kw_only=True) + data_source_list: Optional[tuple[dl_core.data_source.DataSource, ...]] = attr.ib(kw_only=True) db_name: Optional[str] = attr.ib(kw_only=True) connect_args: dict[str, Any] = attr.ib(kw_only=True) pass_db_query_to_user: bool = attr.ib(kw_only=True) @@ -50,6 +54,9 @@ def non_null_sql_source(self) -> SqlSourceType: assert self.sql_source is not None return self.sql_source + def clone(self: _PREP_FROM_TV, **kwargs: Any) -> _PREP_FROM_TV: + return attr.evolve(self, **kwargs) + @attr.s(frozen=True) class PreparedMultiFromInfo(PreparedFromInfo): diff --git a/lib/dl_core/dl_core/data_processing/processing/cache/__init__.py b/lib/dl_core/dl_core/data_processing/processing/cache/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_core/dl_core/data_processing/processing/cache/exec_adapter.py b/lib/dl_core/dl_core/data_processing/processing/cache/exec_adapter.py new file mode 100644 index 000000000..2c6b7ff2e --- /dev/null +++ b/lib/dl_core/dl_core/data_processing/processing/cache/exec_adapter.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +import time +from typing import ( + TYPE_CHECKING, + Awaitable, + Callable, + Collection, + Optional, + Sequence, + Union, +) + +import attr + +from dl_api_commons.reporting.models import 
DataProcessingCacheInfoReportingRecord +from dl_constants.enums import ( + JoinType, + UserDataType, +) +from dl_core.connectors.base.query_compiler import QueryCompiler +from dl_core.data_processing.cache.primitives import LocalKeyRepresentation +from dl_core.data_processing.cache.processing_helper import ( + CacheProcessingHelper, + CacheSituation, +) +from dl_core.data_processing.processing.context import OpExecutionContext +from dl_core.data_processing.processing.db_base.exec_adapter_base import ProcessorDbExecAdapterBase +from dl_core.data_processing.processing.db_base.processor_base import ExecutorBasedOperationProcessor +from dl_core.data_processing.streaming import AsyncChunkedBase +from dl_core.us_dataset import Dataset + + +if TYPE_CHECKING: + from sqlalchemy.sql.selectable import Select + + from dl_core.base_models import ConnectionRef + from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo + from dl_core.data_processing.types import TValuesChunkStream + + +@attr.s +class CacheExecAdapter(ProcessorDbExecAdapterBase): # noqa + _dataset: Dataset = attr.ib(kw_only=True) + _main_processor: ExecutorBasedOperationProcessor = attr.ib(kw_only=True) + _use_cache: bool = attr.ib(kw_only=True) + _use_locked_cache: bool = attr.ib(kw_only=True) + + def _save_data_proc_cache_info_reporting_record(self, ctx: OpExecutionContext, cache_full_hit: bool) -> None: + data_proc_cache_record = DataProcessingCacheInfoReportingRecord( + timestamp=time.time(), + processing_id=ctx.processing_id, + cache_full_hit=cache_full_hit, + ) + self.add_reporting_record(data_proc_cache_record) + + async def _execute_and_fetch( + self, + *, + query: Union[str, Select], + user_types: Sequence[UserDataType], + chunk_size: int, + joint_dsrc_info: Optional[PreparedFromInfo] = None, + query_id: str, + ctx: OpExecutionContext, + data_key: LocalKeyRepresentation, + preparation_callback: Optional[Callable[[], Awaitable[None]]], + ) -> TValuesChunkStream: + # Ignore preparation_callback - we do not need to prepare real data here + + # Resolve TTL info and save BIQueryCacheOptions object + cache_options = self._cache_options_builder.get_cache_options( + joint_dsrc_info=joint_dsrc_info, + query=query, + user_types=list(user_types), + dataset=self._dataset, + data_key=data_key, + ) + + ds_id = self._dataset.uuid + cache_helper = CacheProcessingHelper( + entity_id=ds_id, # type: ignore # TODO: fix + service_registry=self._service_registry, + ) + + original_ctx = ctx.clone() + + async def _get_from_source() -> Optional[TValuesChunkStream]: + result_data = await self._main_processor.db_ex_adapter.fetch_data_from_select( + query=query, + user_types=user_types, + chunk_size=chunk_size, + joint_dsrc_info=joint_dsrc_info, + query_id=query_id, + ctx=original_ctx, + data_key=data_key, + preparation_callback=preparation_callback, + ) + return result_data + + cache_full_hit: Optional[bool] = None + try: + sit, result_iter = await cache_helper.run_with_cache( + allow_cache_read=self._use_cache, + generate_func=_get_from_source, # type: ignore # TODO: fix + cache_options=cache_options, + use_locked_cache=self._use_locked_cache, + ) + if sit == CacheSituation.full_hit: + cache_full_hit = True + elif sit == CacheSituation.generated: + cache_full_hit = False + finally: + self._save_data_proc_cache_info_reporting_record(ctx=ctx, cache_full_hit=cache_full_hit) + self._main_processor.db_ex_adapter.post_cache_usage(query_id=query_id, cache_full_hit=cache_full_hit) + + return result_iter + + async def create_table( + 
self, + *, + table_name: str, + names: Sequence[str], + user_types: Sequence[UserDataType], + ) -> None: + # Proxy the action to main processor + await self._main_processor.db_ex_adapter.create_table( + table_name=table_name, + names=names, + user_types=user_types, + ) + + async def insert_data_into_table( + self, + *, + table_name: str, + names: Sequence[str], + user_types: Sequence[UserDataType], + data: AsyncChunkedBase, + ) -> None: + # Proxy the action to main processor + await self._main_processor.db_ex_adapter.insert_data_into_table( + table_name=table_name, + names=names, + user_types=user_types, + data=data, + ) + + def get_query_compiler(self) -> QueryCompiler: + return self._main_processor.db_ex_adapter.get_query_compiler() + + def get_supported_join_types(self) -> Collection[JoinType]: + return self._main_processor.db_ex_adapter.get_supported_join_types() + + def pre_query_execute( + self, + query_id: str, + compiled_query: str, + target_connection_ref: Optional[ConnectionRef], + ) -> None: + self._main_processor.db_ex_adapter.pre_query_execute( + query_id=query_id, + compiled_query=compiled_query, + target_connection_ref=target_connection_ref, + ) + + def post_query_execute( + self, + query_id: str, + exec_exception: Optional[Exception], + ) -> None: + self._main_processor.db_ex_adapter.post_query_execute(query_id=query_id, exec_exception=exec_exception) diff --git a/lib/dl_core/dl_core/data_processing/processing/cache/processor.py b/lib/dl_core/dl_core/data_processing/processing/cache/processor.py new file mode 100644 index 000000000..98f62a3cd --- /dev/null +++ b/lib/dl_core/dl_core/data_processing/processing/cache/processor.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from typing import Optional + +import attr + +from dl_core.data_processing.cache.utils import CacheOptionsBuilderBase +from dl_core.data_processing.processing.cache.exec_adapter import CacheExecAdapter +from dl_core.data_processing.processing.db_base.exec_adapter_base import ProcessorDbExecAdapterBase +from dl_core.data_processing.processing.db_base.processor_base import ExecutorBasedOperationProcessor +from dl_core.data_processing.processing.processor import OperationProcessorAsyncBase +from dl_core.us_dataset import Dataset + + +@attr.s +class CacheOperationProcessor(ExecutorBasedOperationProcessor, OperationProcessorAsyncBase): + _dataset: Dataset = attr.ib(kw_only=True) + _main_processor: ExecutorBasedOperationProcessor = attr.ib(kw_only=True) + _use_cache: bool = attr.ib(kw_only=True, default=True) + _use_locked_cache: bool = attr.ib(kw_only=True, default=True) + + def _make_cache_options_builder(self) -> CacheOptionsBuilderBase: + return self._main_processor._cache_options_builder + + def _make_db_ex_adapter(self) -> Optional[ProcessorDbExecAdapterBase]: + return CacheExecAdapter( + service_registry=self._service_registry, + reporting_enabled=self._reporting_enabled, + cache_options_builder=self._cache_options_builder, + dataset=self._dataset, + main_processor=self._main_processor, + use_cache=self._use_cache, + use_locked_cache=self._use_locked_cache, + ) + + async def ping(self) -> Optional[int]: + return await self._main_processor.ping() + + async def start(self) -> None: + await self._main_processor.start() + + async def end(self) -> None: + await self._main_processor.end() diff --git a/lib/dl_core/dl_core/data_processing/processing/db_base/exec_adapter_base.py b/lib/dl_core/dl_core/data_processing/processing/db_base/exec_adapter_base.py index ea106a568..c71650fed 100644 --- 
a/lib/dl_core/dl_core/data_processing/processing/db_base/exec_adapter_base.py +++ b/lib/dl_core/dl_core/data_processing/processing/db_base/exec_adapter_base.py @@ -4,7 +4,10 @@ import logging from typing import ( TYPE_CHECKING, + Awaitable, + Callable, ClassVar, + Collection, Optional, Sequence, Union, @@ -14,15 +17,25 @@ import sqlalchemy as sa from sqlalchemy.sql.selectable import Select +from dl_constants.enums import JoinType from dl_constants.types import TBIDataValue +from dl_core.connectors.base.query_compiler import QueryCompiler +from dl_core.data_processing.cache.primitives import LocalKeyRepresentation +from dl_core.data_processing.cache.utils import DatasetOptionsBuilder +from dl_core.data_processing.processing.context import OpExecutionContext +from dl_core.data_processing.streaming import AsyncChunkedBase from dl_core.data_processing.types import TValuesChunkStream +from dl_core.query.bi_query import QueryAndResultInfo from dl_core.utils import make_id if TYPE_CHECKING: + from dl_api_commons.reporting.records import ReportingRecord + from dl_api_commons.reporting.registry import ReportingRegistry from dl_constants.enums import UserDataType - from dl_core.data_processing.cache.primitives import LocalKeyRepresentation - from dl_core.data_processing.prepared_components.primitives import PreparedMultiFromInfo + from dl_core.base_models import ConnectionRef + from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo + from dl_core.services_registry.top_level import ServicesRegistry LOGGER = logging.getLogger(__name__) @@ -33,16 +46,30 @@ class ProcessorDbExecAdapterBase(abc.ABC): _default_chunk_size: ClassVar[int] = 1000 _log: ClassVar[logging.Logger] = LOGGER.getChild("ProcessorDbExecAdapterBase") + _cache_options_builder: DatasetOptionsBuilder = attr.ib(kw_only=True) + _service_registry: ServicesRegistry = attr.ib(kw_only=True) + _reporting_enabled: bool = attr.ib(kw_only=True, default=True) + + @property + def _reporting_registry(self) -> ReportingRegistry: + return self._service_registry.get_reporting_registry() + + def add_reporting_record(self, record: ReportingRecord) -> None: + if self._reporting_enabled: + self._reporting_registry.save_reporting_record(record) @abc.abstractmethod async def _execute_and_fetch( self, *, - query: Union[Select, str], + query: Select | str, user_types: Sequence[UserDataType], chunk_size: int, - joint_dsrc_info: Optional[PreparedMultiFromInfo] = None, + joint_dsrc_info: Optional[PreparedFromInfo] = None, query_id: str, + ctx: OpExecutionContext, + data_key: LocalKeyRepresentation, + preparation_callback: Optional[Callable[[], Awaitable[None]]], ) -> TValuesChunkStream: """ Execute SELECT statement. 
@@ -50,13 +77,23 @@ async def _execute_and_fetch( """ raise NotImplementedError - async def scalar(self, query: Union[str, Select], user_type: UserDataType) -> TBIDataValue: + async def scalar( + self, + query: str | Select, + user_type: UserDataType, + ctx: OpExecutionContext, + data_key: LocalKeyRepresentation = LocalKeyRepresentation(), + preparation_callback: Optional[Callable[[], Awaitable[None]]] = None, + ) -> TBIDataValue: """Execute a statement returning a scalar value.""" data_stream = await self._execute_and_fetch( query_id=make_id(), query=query, user_types=[user_type], chunk_size=5, + ctx=ctx, + data_key=data_key, + preparation_callback=preparation_callback, ) data = await data_stream.all() assert len(data) == 1, f"Expected 1 entry, got {len(data)}" @@ -69,8 +106,11 @@ async def fetch_data_from_select( query: Union[str, sa.sql.selectable.Select], user_types: Sequence[UserDataType], chunk_size: Optional[int] = None, - joint_dsrc_info: Optional[PreparedMultiFromInfo] = None, + joint_dsrc_info: Optional[PreparedFromInfo] = None, query_id: str, + ctx: OpExecutionContext, + data_key: LocalKeyRepresentation = LocalKeyRepresentation(), + preparation_callback: Optional[Callable[[], Awaitable[None]]] = None, ) -> TValuesChunkStream: """Fetch data from a table""" @@ -83,14 +123,86 @@ async def fetch_data_from_select( chunk_size=chunk_size, joint_dsrc_info=joint_dsrc_info, query_id=query_id, + ctx=ctx, + data_key=data_key, + preparation_callback=preparation_callback, + ) + + def _make_query_res_info( + self, + query: Union[str, Select], + user_types: Sequence[UserDataType], + ) -> QueryAndResultInfo: + query_res_info = QueryAndResultInfo( + query=query, # type: ignore # TODO: fix + user_types=list(user_types), + # This is basically legacy and will be removed. + # col_names are not really used anywhere, just passed around a lot. 
+            # So we generate random ones here
+            col_names=[f"col_{i}" for i in range(len(user_types))],
+        )
+        return query_res_info
 
     def get_data_key(
         self,
         *,
-        query_id: str,
-        query: Union[str, Select],
+        query: str | Select,
         user_types: Sequence[UserDataType],
-        joint_dsrc_info: Optional[PreparedMultiFromInfo] = None,
+        from_info: Optional[PreparedFromInfo] = None,
+        base_key: LocalKeyRepresentation = LocalKeyRepresentation(),
     ) -> Optional[LocalKeyRepresentation]:
-        return None
+        query_res_info = self._make_query_res_info(query=query, user_types=user_types)
+        data_key = self._cache_options_builder.get_data_key(
+            from_info=from_info,
+            query_res_info=query_res_info,
+            base_key=base_key,
+        )
+        return data_key
+
+    async def create_table(
+        self,
+        *,
+        table_name: str,
+        names: Sequence[str],
+        user_types: Sequence[UserDataType],
+    ) -> None:
+        """Create table"""
+        raise NotImplementedError  # By default DDL is not supported
+
+    async def insert_data_into_table(
+        self,
+        *,
+        table_name: str,
+        names: Sequence[str],
+        user_types: Sequence[UserDataType],
+        data: AsyncChunkedBase,
+    ) -> None:
+        """Insert chunked data into an existing table"""
+        raise NotImplementedError  # By default DDL is not supported
+
+    def get_supported_join_types(self) -> Collection[JoinType]:
+        return frozenset(
+            {
+                JoinType.inner,
+                JoinType.left,
+                JoinType.right,
+                JoinType.full,
+            }
+        )
+
+    def get_query_compiler(self) -> QueryCompiler:
+        raise NotImplementedError  # By default no specific QueryCompiler is defined
+
+    def pre_query_execute(
+        self,
+        query_id: str,
+        compiled_query: str,
+        target_connection_ref: Optional[ConnectionRef],
+    ) -> None:
+        pass
+
+    def post_query_execute(self, query_id: str, exec_exception: Optional[Exception]) -> None:
+        pass
+
+    def post_cache_usage(self, query_id: str, cache_full_hit: bool) -> None:
+        pass
diff --git a/lib/dl_core/dl_core/data_processing/processing/db_base/op_executors.py b/lib/dl_core/dl_core/data_processing/processing/db_base/op_executors.py
index ce83567e6..a3154aa7b 100644
--- a/lib/dl_core/dl_core/data_processing/processing/db_base/op_executors.py
+++ b/lib/dl_core/dl_core/data_processing/processing/db_base/op_executors.py
@@ -1,14 +1,15 @@
 from __future__ import annotations
 
+import asyncio
 from functools import wraps
 import logging
 from typing import (
     Awaitable,
     Callable,
-    List,
 )
 
 import attr
+import shortuuid
 import sqlalchemy as sa
 from sqlalchemy.sql.selectable import (
     Alias,
@@ -16,9 +17,9 @@
     Subquery,
 )
 
+from dl_core.data_processing.cache.primitives import LocalKeyRepresentation
 from dl_core.data_processing.prepared_components.primitives import (
     PreparedFromInfo,
-    PreparedMultiFromInfo,
     PreparedSingleFromInfo,
 )
 from dl_core.data_processing.processing.context import OpExecutionContext
@@ -30,6 +31,7 @@
     JoinOp,
     MultiSourceOp,
     SingleSourceOp,
+    UploadOp,
 )
 from dl_core.data_processing.source_builder import SqlSourceBuilder
 from dl_core.data_processing.stream_base import (
@@ -100,34 +102,36 @@ async def execute(self, op: BaseOp) -> DataStreamAsync:  # type: ignore  # TODO:
             query = query.original
         assert isinstance(query, Select), f"Got type {type(query).__name__} for query, expected Select"
 
+        joint_dsrc_info = source_stream.prep_src_info.clone(sql_source=query)
         query_debug_str = compile_query_for_debug(query=query, dialect=query_compiler.dialect)
         LOGGER.info(f"Going to database with SQL query:\n{query_debug_str}")
-        joint_dsrc_info = PreparedMultiFromInfo(
-            sql_source=query,
-            data_source_list=source_stream.prep_src_info.data_source_list,  # type: ignore  # TODO: fix
-
db_name=source_stream.prep_src_info.db_name, - connect_args=source_stream.prep_src_info.connect_args, - query_compiler=source_stream.prep_src_info.query_compiler, - supported_join_types=source_stream.prep_src_info.supported_join_types, - pass_db_query_to_user=source_stream.prep_src_info.pass_db_query_to_user, - target_connection_ref=source_stream.prep_src_info.target_connection_ref, - ) # FIXME: replace usage with prep_src_info - query_id = make_id() + data_key = source_stream.data_key + + self.db_ex_adapter.pre_query_execute( + query_id=query_id, + compiled_query=query_debug_str, + target_connection_ref=joint_dsrc_info.target_connection_ref, + ) + data = await self.db_ex_adapter.fetch_data_from_select( query=query, user_types=source_stream.user_types, joint_dsrc_info=joint_dsrc_info, query_id=query_id, + ctx=self.ctx, + data_key=data_key, + preparation_callback=source_stream.prepare, ) if op.row_count_hard_limit is not None: data = data.limit(op.row_count_hard_limit) - data_key = self.db_ex_adapter.get_data_key( - query=query, user_types=source_stream.user_types, joint_dsrc_info=joint_dsrc_info, query_id=query_id + self.db_ex_adapter.post_query_execute( + query_id=query_id, + exec_exception=None, # FIXME ) pass_db_query_to_user = joint_dsrc_info.pass_db_query_to_user @@ -178,13 +182,15 @@ async def execute(self, op: BaseOp) -> DataSourceVS: # type: ignore # TODO: fi LOGGER.info(f"Generated lazy query: {query_debug_str}") - names = [] - user_types = [] - for expr_ctx in op.bi_query.select_expressions: - assert expr_ctx.alias is not None - assert expr_ctx.user_type is not None - names.append(expr_ctx.alias) - user_types.append(expr_ctx.user_type) + names = op.bi_query.get_names() + user_types = op.bi_query.get_user_types() + + data_key = self.db_ex_adapter.get_data_key( + query=query, + user_types=source_stream.user_types, + from_info=from_info, + base_key=source_stream.data_key, + ) alias = op.alias prep_src_info = PreparedSingleFromInfo( @@ -208,6 +214,9 @@ async def execute(self, op: BaseOp) -> DataSourceVS: # type: ignore # TODO: fi prep_src_info=prep_src_info, names=prep_src_info.col_names, user_types=user_types, + data_key=data_key, + meta=DataRequestMetaInfo(data_source_list=prep_src_info.data_source_list), + preparation_callback=source_stream.prepare, ) @@ -216,11 +225,21 @@ class JoinOpExecutorAsync(OpExecutorAsync): async def execute(self, op: BaseOp) -> JointDataSourceVS: # type: ignore # TODO: fix assert isinstance(op, JoinOp) - prepared_sources: List[PreparedSingleFromInfo] = [] + prepared_sources: list[PreparedSingleFromInfo] = [] + data_key = LocalKeyRepresentation() + callbacks: list[Callable[[], Awaitable[None]]] = [] for stream_id in op.source_stream_ids: stream = self.ctx.get_stream(stream_id=stream_id) assert isinstance(stream, DataSourceVS) prepared_sources.append(stream.prep_src_info) + data_key = data_key.multi_extend(*stream.data_key.key_parts) + callbacks.append(stream.prepare) + + async def joint_preparation_callback() -> None: + results = await asyncio.gather(*(cb() for cb in callbacks), return_exceptions=True) + for res in results: + if isinstance(res, Exception): + raise res from res source_builder = SqlSourceBuilder() @@ -231,5 +250,70 @@ async def execute(self, op: BaseOp) -> JointDataSourceVS: # type: ignore # TOD use_empty_source=op.use_empty_source, ) return JointDataSourceVS( - id=op.dest_stream_id, names=[], user_types=[], joint_dsrc_info=joint_dsrc_info # not used # not used + id=op.dest_stream_id, + names=[], + user_types=[], + 
joint_dsrc_info=joint_dsrc_info, # not used # not used + meta=DataRequestMetaInfo( + data_source_list=joint_dsrc_info.data_source_list, + ), + data_key=data_key, + preparation_callback=joint_preparation_callback, + ) + + +class UploadOpExecutorAsync(OpExecutorAsync): + """Dumps incoming stream to a database table""" + + @log_op # type: ignore # TODO: fix + async def execute(self, op: BaseOp) -> DataSourceVS: # type: ignore # TODO: fix + assert isinstance(op, UploadOp) + + source_stream = self.ctx.get_stream(op.source_stream_id) + assert isinstance(source_stream, DataStreamAsync) + + table_name = shortuuid.uuid() + + async def upload_data() -> None: + LOGGER.info(f"Uploading to table {table_name}") + await self.db_ex_adapter.create_table( + table_name=table_name, + names=source_stream.names, + user_types=source_stream.user_types, + ) + await self.db_ex_adapter.insert_data_into_table( + table_name=table_name, + names=source_stream.names, + user_types=source_stream.user_types, + data=source_stream.data, + ) + + alias = op.alias + sql_source = sa.alias(sa.table(table_name), alias) + + prep_src_info = PreparedSingleFromInfo( + id=op.result_id, + alias=alias, + query_compiler=self.db_ex_adapter.get_query_compiler(), + sql_source=sql_source, + col_names=source_stream.names, + user_types=source_stream.user_types, + data_source_list=tuple(source_stream.meta.data_source_list), + supported_join_types=self.db_ex_adapter.get_supported_join_types(), + db_name=None, + connect_args={}, + pass_db_query_to_user=False, + target_connection_ref=None, + ) + + return DataSourceVS( + id=op.dest_stream_id, + result_id=prep_src_info.id, + prep_src_info=prep_src_info, + names=prep_src_info.col_names, + user_types=prep_src_info.user_types, + alias=op.alias, + data_key=source_stream.data_key, + meta=DataRequestMetaInfo(data_source_list=prep_src_info.data_source_list), + preparation_callback=upload_data, ) diff --git a/lib/dl_core/dl_core/data_processing/processing/db_base/processor_base.py b/lib/dl_core/dl_core/data_processing/processing/db_base/processor_base.py new file mode 100644 index 000000000..60ea61555 --- /dev/null +++ b/lib/dl_core/dl_core/data_processing/processing/db_base/processor_base.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from typing import ( + ClassVar, + Optional, + Type, +) + +import attr + +from dl_core.data_processing.processing.context import OpExecutionContext +from dl_core.data_processing.processing.db_base.op_executors import ( + CalcOpExecutorAsync, + DownloadOpExecutorAsync, + JoinOpExecutorAsync, + OpExecutorAsync, + UploadOpExecutorAsync, +) +from dl_core.data_processing.processing.operation import ( + BaseOp, + CalcOp, + DownloadOp, + JoinOp, + UploadOp, +) +from dl_core.data_processing.processing.processor import OperationProcessorAsyncBase +from dl_core.data_processing.stream_base import AbstractStream + + +@attr.s +class ExecutorBasedOperationProcessor(OperationProcessorAsyncBase): + _executors: ClassVar[dict[Type[BaseOp], Type[OpExecutorAsync]]] = { + DownloadOp: DownloadOpExecutorAsync, + CalcOp: CalcOpExecutorAsync, + JoinOp: JoinOpExecutorAsync, + UploadOp: UploadOpExecutorAsync, + } + + async def ping(self) -> Optional[int]: + return 1 + + async def start(self) -> None: + pass + + async def stop(self) -> None: + pass + + def get_executor_class(self, op_type: Type[BaseOp]) -> Type[OpExecutorAsync]: + return self._executors[op_type] + + async def execute_operation(self, op: BaseOp, ctx: OpExecutionContext) -> AbstractStream: + opex_cls: Type[OpExecutorAsync] = 
self.get_executor_class(type(op)) + opex = opex_cls(db_ex_adapter=self.db_ex_adapter, ctx=ctx) # type: ignore # TODO: fix + return await opex.execute(op) diff --git a/lib/dl_core/dl_core/data_processing/processing/operation.py b/lib/dl_core/dl_core/data_processing/processing/operation.py index 6c250535a..aaa78a3fb 100644 --- a/lib/dl_core/dl_core/data_processing/processing/operation.py +++ b/lib/dl_core/dl_core/data_processing/processing/operation.py @@ -4,6 +4,7 @@ AbstractSet, ClassVar, Collection, + Hashable, Optional, Tuple, Type, @@ -79,6 +80,7 @@ class CalcOp(SingleSourceOp): result_id: AvatarId = attr.ib(kw_only=True) bi_query: BIQuery = attr.ib(kw_only=True) alias: str = attr.ib(kw_only=True) + data_key_data: Hashable = attr.ib(kw_only=True) # To be implemented: diff --git a/lib/dl_core/dl_core/data_processing/processing/processor.py b/lib/dl_core/dl_core/data_processing/processing/processor.py index 7aa07c9ec..63968d7e5 100644 --- a/lib/dl_core/dl_core/data_processing/processing/processor.py +++ b/lib/dl_core/dl_core/data_processing/processing/processor.py @@ -19,7 +19,9 @@ DataProcessingEndReportingRecord, DataProcessingStartReportingRecord, ) +from dl_core.data_processing.cache.utils import DatasetOptionsBuilder from dl_core.data_processing.processing.context import OpExecutionContext +from dl_core.data_processing.processing.db_base.exec_adapter_base import ProcessorDbExecAdapterBase from dl_core.data_processing.processing.operation import ( BaseOp, MultiSourceOp, @@ -32,8 +34,8 @@ if TYPE_CHECKING: - from dl_api_commons.reporting.registry import ReportingRegistry # noqa - from dl_core.services_registry import ServicesRegistry # noqa + from dl_api_commons.reporting.registry import ReportingRegistry + from dl_core.services_registry import ServicesRegistry LOGGER = logging.getLogger(__name__) @@ -42,7 +44,30 @@ _OP_PROC_TV = TypeVar("_OP_PROC_TV", bound="OperationProcessorAsyncBase") +@attr.s class OperationProcessorAsyncBase(abc.ABC): + _service_registry: ServicesRegistry = attr.ib(kw_only=True) # Service registry override + _reporting_enabled: bool = attr.ib(kw_only=True, default=True) + _cache_options_builder: DatasetOptionsBuilder = attr.ib(init=False) + _db_ex_adapter: Optional[ProcessorDbExecAdapterBase] = attr.ib(init=False, default=None) + + def __attrs_post_init__(self) -> None: + self._cache_options_builder = self._make_cache_options_builder() + self._db_ex_adapter = self._make_db_ex_adapter() + + @abc.abstractmethod + def _make_cache_options_builder(self) -> DatasetOptionsBuilder: + raise NotImplementedError + + @abc.abstractmethod + def _make_db_ex_adapter(self) -> Optional[ProcessorDbExecAdapterBase]: + raise NotImplementedError + + @property + def db_ex_adapter(self) -> ProcessorDbExecAdapterBase: + assert self._db_ex_adapter is not None + return self._db_ex_adapter + @abc.abstractmethod async def ping(self) -> Optional[int]: """Check processor readiness""" @@ -168,18 +193,13 @@ async def run( return result - -@attr.s -class SROperationProcessorAsyncBase(OperationProcessorAsyncBase): - _service_registry: "ServicesRegistry" = attr.ib(default=None, kw_only=True) # Service registry override - @property - def service_registry(self) -> "ServicesRegistry": - if self._service_registry is not None: - return self._service_registry + def service_registry(self) -> ServicesRegistry: + assert self._service_registry is not None + return self._service_registry @property - def _reporting_registry(self) -> "ReportingRegistry": + def _reporting_registry(self) -> ReportingRegistry: 
return self.service_registry.get_reporting_registry() def _save_start_exec_reporting_record(self, ctx: OpExecutionContext) -> None: diff --git a/lib/dl_core/dl_core/data_processing/processing/processor_dataset_cached.py b/lib/dl_core/dl_core/data_processing/processing/processor_dataset_cached.py deleted file mode 100644 index 5726e30ec..000000000 --- a/lib/dl_core/dl_core/data_processing/processing/processor_dataset_cached.py +++ /dev/null @@ -1,284 +0,0 @@ -from __future__ import annotations - -import logging -import time -from typing import ( - TYPE_CHECKING, - Collection, - List, - Optional, -) - -import attr -import sqlalchemy as sa -from sqlalchemy.dialects.postgresql.base import PGDialect - -from dl_api_commons.reporting.models import DataProcessingCacheInfoReportingRecord -from dl_core.connectors.base.query_compiler import QueryCompiler -from dl_core.data_processing.cache.primitives import LocalKeyRepresentation -from dl_core.data_processing.cache.processing_helper import ( - CacheProcessingHelper, - CacheSituation, -) -from dl_core.data_processing.cache.utils import CacheOptionsBuilderDataProcessor -from dl_core.data_processing.processing.operation import ( - BaseOp, - CalcOp, - MultiSourceOp, - SingleSourceOp, -) -from dl_core.data_processing.processing.processor import SROperationProcessorAsyncBase -from dl_core.data_processing.stream_base import ( - CacheVirtualStream, - DataRequestMetaInfo, - DataStreamAsync, - DataStreamBase, -) - - -if TYPE_CHECKING: - from dl_core.data_processing.processing.context import OpExecutionContext - from dl_core.data_processing.processing.processor import OperationProcessorAsyncBase - from dl_core.data_processing.stream_base import AbstractStream - from dl_core.data_processing.types import TValuesChunkStream - from dl_core.us_dataset import Dataset - - -LOGGER = logging.getLogger(__name__) - - -@attr.s -class SingleStreamCacheOperationProcessor(SROperationProcessorAsyncBase): - _dataset: Dataset = attr.ib(kw_only=True) - _query_compiler: QueryCompiler = attr.ib(kw_only=True) - _cache_options_builder: CacheOptionsBuilderDataProcessor = attr.ib(kw_only=True) - _main_processor: OperationProcessorAsyncBase = attr.ib(kw_only=True) - - _cache_helper: Optional[CacheProcessingHelper] = attr.ib(init=False) - - async def ping(self) -> Optional[int]: - return await self._main_processor.ping() # TODO: also ping cache engine? 
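The class being deleted here wrapped a "main" processor and consulted the cache before delegating to it; the new CacheOperationProcessor wired up further down in data_processor_factory.py appears to take over that role. Below is a minimal standalone sketch of the wrap-and-delegate caching pattern, assuming only that a cache miss falls through to the wrapped processor and the result is stored afterwards; CacheStore and CachingProcessor are invented illustration names, not dl_core types.

    import asyncio
    from typing import Any, Awaitable, Callable, Optional


    class CacheStore:
        """Toy in-memory cache standing in for the real cache engine."""

        def __init__(self) -> None:
            self._data: dict[str, Any] = {}

        async def get(self, key: str) -> Optional[Any]:
            return self._data.get(key)

        async def put(self, key: str, value: Any) -> None:
            self._data[key] = value


    class CachingProcessor:
        """Wraps a main processor callable: try the cache first, otherwise delegate and store."""

        def __init__(self, main_execute: Callable[[str], Awaitable[Any]], cache: CacheStore) -> None:
            self._main_execute = main_execute
            self._cache = cache

        async def execute(self, data_key: str) -> tuple[bool, Any]:
            cached = await self._cache.get(data_key)
            if cached is not None:
                return True, cached           # cache hit: the main processor never runs
            result = await self._main_execute(data_key)
            await self._cache.put(data_key, result)
            return False, result              # cache miss: generated and stored


    async def _demo() -> None:
        async def main_execute(key: str) -> list[int]:
            return [1, 2, 3]

        proc = CachingProcessor(main_execute, CacheStore())
        print(await proc.execute("k"))  # (False, [1, 2, 3]) -- generated
        print(await proc.execute("k"))  # (True, [1, 2, 3])  -- full hit


    if __name__ == "__main__":
        asyncio.run(_demo())

The same shape shows up in the factory changes later in this diff: the source-db or compeng processor is built first, then wrapped in CacheOperationProcessor when allow_cache_usage is set.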
- - @_query_compiler.default # noqa - def _default_query_compiler(self) -> QueryCompiler: - return QueryCompiler(dialect=PGDialect()) - - @_cache_options_builder.default # noqa - def _cache_options_builder_default(self) -> CacheOptionsBuilderDataProcessor: - return CacheOptionsBuilderDataProcessor(default_ttl_config=self._service_registry.default_cache_ttl_config) - - def __attrs_post_init__(self): # type: ignore # TODO: fix - # cache_engine_factory = self.service_registry.get_cache_engine_factory() - ds_id = self._dataset.uuid - self._cache_helper = CacheProcessingHelper( - entity_id=ds_id, # type: ignore # TODO: fix - service_registry=self.service_registry, - ) - - # TODO FIX: @altvod Move data key building to CacheOptionsBuilderDataProcessor - def _make_op_data_key(self, op: BaseOp, ctx: OpExecutionContext) -> Optional[LocalKeyRepresentation]: - """Generate new data key for operation""" - - base_key: Optional[LocalKeyRepresentation] - if isinstance(op, SingleSourceOp): - # inherit key from single source stream - stream = ctx.get_stream(op.source_stream_id) - assert isinstance(stream, DataStreamBase) - base_key = stream.data_key - elif isinstance(op, MultiSourceOp): - # combine key from multiple source streams - base_key = LocalKeyRepresentation() - for stream_id in op.source_stream_ids: - stream = ctx.get_stream(stream_id) - assert isinstance(stream, DataStreamBase) - if stream.data_key is None: - base_key = None - break - base_key = base_key.multi_extend(*stream.data_key.key_parts) - else: - raise TypeError(f"Invalid operation type {type(op)}") - - if base_key is None: - return None - - data_key = base_key.extend(part_type="operation", part_content=op.__class__.__name__) # type: ignore # TODO: fix - if isinstance(op, CalcOp): - data_key = data_key.extend( - part_type="query", - part_content=CacheOptionsBuilderDataProcessor.get_query_str_for_cache( - query=self._query_compiler.compile_select( - bi_query=op.bi_query, - # The real table name doesn't really matter here - sql_source=sa.table("table"), - ), - dialect=self._query_compiler.dialect, - ), - ) - - return data_key - - async def execute_operation(self, op: BaseOp, ctx: OpExecutionContext) -> CacheVirtualStream: - """ - Imitate execution by constructing a new virtual stream - with an updated data key for the cache engine - """ - - if isinstance(op, SingleSourceOp): - source_stream = ctx.get_stream(op.source_stream_id) - assert isinstance(source_stream, DataStreamBase) - user_types = source_stream.user_types - names = source_stream.names - stream_meta = source_stream.meta - elif isinstance(op, MultiSourceOp): - source_streams = [ctx.get_stream(source_stream_id) for source_stream_id in sorted(op.source_stream_ids)] - names = [] # not used - user_types = [] # not used - stream_meta = DataRequestMetaInfo( - query_id=None, # FIXME - query=None, - is_materialized=all( - stream.meta.is_materialized for stream in source_streams if isinstance(stream, DataStreamBase) - ), - data_source_list=[ - dsrc for s in source_streams if isinstance(s, DataStreamBase) for dsrc in s.meta.data_source_list - ], - ) - else: - raise TypeError(f"Invalid operation type {type(op)}") - - data_key = self._make_op_data_key(op=op, ctx=ctx) - - return CacheVirtualStream( - id=op.dest_stream_id, - names=names, - user_types=user_types, - data_key=data_key, - meta=stream_meta, - ) - - def _save_cache_info_reporting_record(self, ctx: OpExecutionContext, cache_full_hit: bool) -> None: - report = DataProcessingCacheInfoReportingRecord( - timestamp=time.time(), - 
processing_id=ctx.processing_id, - cache_full_hit=cache_full_hit, - ) - self._reporting_registry.save_reporting_record(report=report) - - async def execute_operations( - self, - ctx: OpExecutionContext, - output_stream_ids: Collection[str], - ) -> List[DataStreamAsync]: - """ - Turn all virtual output streams into real data-containing ones - by getting data from the cache or the source. - """ - if len(output_stream_ids) != 1: - raise ValueError("This cacher is only applicable for single-output streams") - - original_ctx = ctx.clone() - - # Uses the recursive logic to call `self.execute_operation` which converts - # the streams into `VirtualStream` objects (and likely mutates the `ctx`). - - # TODO: refactor to use an explicit self-method call instead of super() - output_streams = await super().execute_operations(ctx=ctx, output_stream_ids=output_stream_ids) - assert len(output_streams) == 1 - (virtual_stream,) = output_streams - assert isinstance(virtual_stream, CacheVirtualStream) - # Resolve TTL info and save BIQueryCacheOptions object - cache_options_builder = self._cache_options_builder - cache_options = cache_options_builder.get_cache_options_for_stream(virtual_stream, self._dataset) - - ds_id = self._dataset.uuid - cache_helper = CacheProcessingHelper( - entity_id=ds_id, # type: ignore # TODO: fix - service_registry=self.service_registry, - ) - - async def _get_from_source() -> Optional[TValuesChunkStream]: - streams = await self._main_processor.execute_operations( - ctx=original_ctx, - output_stream_ids=output_stream_ids, - ) - assert len(streams) == 1 - (stream,) = streams - return stream.data - - cache_full_hit = None - try: - sit, result_iter = await cache_helper.run_with_cache( - generate_func=_get_from_source, # type: ignore # TODO: fix - cache_options=cache_options, - ) - if sit == CacheSituation.full_hit: - cache_full_hit = True - elif sit == CacheSituation.generated: - cache_full_hit = False - finally: - self._save_cache_info_reporting_record( - ctx=ctx, - cache_full_hit=cache_full_hit, # type: ignore # TODO: fix - ) - - result_stream = DataStreamAsync( - id=virtual_stream.id, - names=virtual_stream.names, - user_types=virtual_stream.user_types, - data=result_iter, # type: ignore # TODO: fix - meta=virtual_stream.meta, - data_key=cache_options.key, - ) - return [result_stream] - - def _validate_input_stream(self, stream: AbstractStream, op: BaseOp) -> None: - if not isinstance(stream, CacheVirtualStream): - super()._validate_input_stream(stream, op) - - def _validate_output_stream(self, stream: AbstractStream, op: BaseOp) -> None: - if not isinstance(stream, CacheVirtualStream): - super()._validate_output_stream(stream, op) - - -@attr.s -class CachedDatasetProcessor(SROperationProcessorAsyncBase): # noqa - _dataset: Dataset = attr.ib(kw_only=True) - _main_processor: OperationProcessorAsyncBase = attr.ib(kw_only=True) - - _cache_processor: SingleStreamCacheOperationProcessor = attr.ib(init=False) - - def __attrs_post_init__(self) -> None: - self._cache_processor = SingleStreamCacheOperationProcessor( - dataset=self._dataset, - service_registry=self.service_registry, - main_processor=self._main_processor, - ) - - async def ping(self) -> Optional[int]: - return await self._main_processor.ping() # TODO: also ping cache engine? 
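The deleted execute_operations above leans on a (situation, data) contract from CacheProcessingHelper.run_with_cache and a try/finally block that reports cache_full_hit even when generation fails (leaving it as None). A compact standalone model of that contract is sketched here, under the assumption that the helper returns a CacheSituation plus the data; run_with_cache, execute_with_reporting, and the report callback are simplified stand-ins rather than the real helper's signature.

    import enum
    from typing import Awaitable, Callable, Optional, Sequence


    class CacheSituation(enum.Enum):
        full_hit = "full_hit"
        generated = "generated"


    async def run_with_cache(
        generate_func: Callable[[], Awaitable[Sequence[int]]],
        cache: dict[str, Sequence[int]],
        key: str,
    ) -> tuple[CacheSituation, Sequence[int]]:
        """Return cached data when present, otherwise generate it and store it."""
        if key in cache:
            return CacheSituation.full_hit, cache[key]
        data = await generate_func()
        cache[key] = data
        return CacheSituation.generated, data


    async def execute_with_reporting(
        generate_func: Callable[[], Awaitable[Sequence[int]]],
        cache: dict[str, Sequence[int]],
        key: str,
        report: Callable[[Optional[bool]], None],
    ) -> Sequence[int]:
        cache_full_hit: Optional[bool] = None
        try:
            sit, data = await run_with_cache(generate_func, cache, key)
            cache_full_hit = sit is CacheSituation.full_hit
            return data
        finally:
            # Reported even if generate_func raised, in which case cache_full_hit stays None.
            report(cache_full_hit)

Keeping the report call in finally is the important part: the cache-info record is written exactly once per execution, whether the data came from the cache, was generated, or the generation raised.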
- - async def execute_operations( - self, - ctx: OpExecutionContext, - output_stream_ids: Collection[str], - ) -> List[DataStreamAsync]: - if len(output_stream_ids) != 1: - LOGGER.warning("Cannot currently use cache for multiple (%d) output streams", len(output_stream_ids)) - return await self._main_processor.execute_operations( - ctx=ctx, - output_stream_ids=output_stream_ids, - ) - - cache_processor = self._cache_processor - return await cache_processor.execute_operations( - ctx=ctx, - output_stream_ids=output_stream_ids, - ) - - async def start(self) -> None: - await self._main_processor.start() - await self._cache_processor.start() - - async def end(self) -> None: - await self._cache_processor.end() - await self._main_processor.end() diff --git a/lib/dl_core/dl_core/data_processing/processing/source_db/processor.py b/lib/dl_core/dl_core/data_processing/processing/source_db/processor.py index 97db7b769..0133dd3a7 100644 --- a/lib/dl_core/dl_core/data_processing/processing/source_db/processor.py +++ b/lib/dl_core/dl_core/data_processing/processing/source_db/processor.py @@ -1,69 +1,48 @@ from __future__ import annotations -from typing import ( - ClassVar, - Dict, - Optional, - Type, -) +from typing import Optional import attr from dl_constants.enums import DataSourceRole -from dl_core.data_processing.processing.context import OpExecutionContext -from dl_core.data_processing.processing.db_base.exec_adapter_base import ProcessorDbExecAdapterBase -from dl_core.data_processing.processing.db_base.op_executors import ( - CalcOpExecutorAsync, - DownloadOpExecutorAsync, - JoinOpExecutorAsync, - OpExecutorAsync, -) -from dl_core.data_processing.processing.operation import ( - BaseOp, - CalcOp, - DownloadOp, - JoinOp, +from dl_core.data_processing.cache.primitives import CacheTTLConfig +from dl_core.data_processing.cache.utils import ( + DatasetOptionsBuilder, + SelectorCacheOptionsBuilder, ) -from dl_core.data_processing.processing.processor import OperationProcessorAsyncBase +from dl_core.data_processing.processing.db_base.exec_adapter_base import ProcessorDbExecAdapterBase +from dl_core.data_processing.processing.db_base.processor_base import ExecutorBasedOperationProcessor from dl_core.data_processing.processing.source_db.selector_exec_adapter import SourceDbExecAdapter from dl_core.data_processing.selectors.base import DataSelectorAsyncBase -from dl_core.data_processing.stream_base import AbstractStream from dl_core.us_dataset import Dataset from dl_core.us_manager.local_cache import USEntryBuffer @attr.s -class SourceDbOperationProcessor(OperationProcessorAsyncBase): +class SourceDbOperationProcessor(ExecutorBasedOperationProcessor): _role: DataSourceRole = attr.ib(kw_only=True) _dataset: Dataset = attr.ib(kw_only=True) _selector: DataSelectorAsyncBase = attr.ib(kw_only=True) _row_count_hard_limit: Optional[int] = attr.ib(kw_only=True, default=None) _us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True) + _is_bleeding_edge_user: bool = attr.ib(default=False) + _default_cache_ttl_config: CacheTTLConfig = attr.ib(default=None) - _db_ex_adapter: Optional[ProcessorDbExecAdapterBase] = attr.ib(init=False, default=None) - - _executors: ClassVar[Dict[Type[BaseOp], Type[OpExecutorAsync]]] = { - DownloadOp: DownloadOpExecutorAsync, - CalcOp: CalcOpExecutorAsync, - JoinOp: JoinOpExecutorAsync, - } - - async def ping(self) -> Optional[int]: - return 1 + def _make_cache_options_builder(self) -> DatasetOptionsBuilder: + return SelectorCacheOptionsBuilder( + 
default_ttl_config=self._default_cache_ttl_config, + is_bleeding_edge_user=self._is_bleeding_edge_user, + us_entry_buffer=self._us_entry_buffer, + ) - async def start(self) -> None: - self._db_ex_adapter = SourceDbExecAdapter( + def _make_db_ex_adapter(self) -> Optional[ProcessorDbExecAdapterBase]: + return SourceDbExecAdapter( + service_registry=self.service_registry, + reporting_enabled=self._reporting_enabled, role=self._role, dataset=self._dataset, selector=self._selector, row_count_hard_limit=self._row_count_hard_limit, us_entry_buffer=self._us_entry_buffer, + cache_options_builder=self._cache_options_builder, ) - - async def stop(self) -> None: - self._db_ex_adapter = None - - async def execute_operation(self, op: BaseOp, ctx: OpExecutionContext) -> AbstractStream: - opex_cls: Type[OpExecutorAsync] = self._executors[type(op)] - opex = opex_cls(db_ex_adapter=self._db_ex_adapter, ctx=ctx) # type: ignore # TODO: fix - return await opex.execute(op) diff --git a/lib/dl_core/dl_core/data_processing/processing/source_db/selector_exec_adapter.py b/lib/dl_core/dl_core/data_processing/processing/source_db/selector_exec_adapter.py index ceb460fbd..f99244002 100644 --- a/lib/dl_core/dl_core/data_processing/processing/source_db/selector_exec_adapter.py +++ b/lib/dl_core/dl_core/data_processing/processing/source_db/selector_exec_adapter.py @@ -1,7 +1,10 @@ from __future__ import annotations +import time from typing import ( TYPE_CHECKING, + Awaitable, + Callable, Optional, Sequence, Union, @@ -9,20 +12,28 @@ import attr +from dl_api_commons.reporting.models import ( + QueryExecutionCacheInfoReportingRecord, + QueryExecutionEndReportingRecord, + QueryExecutionStartReportingRecord, +) from dl_constants.enums import UserDataType +from dl_core.base_models import WorkbookEntryLocation from dl_core.data_processing.prepared_components.default_manager import DefaultPreparedComponentManager +from dl_core.data_processing.processing.context import OpExecutionContext from dl_core.data_processing.processing.db_base.exec_adapter_base import ProcessorDbExecAdapterBase -from dl_core.data_processing.selectors.dataset_base import DatasetDataSelectorAsyncBase -from dl_core.query.bi_query import QueryAndResultInfo +from dl_core.data_processing.selectors.utils import get_query_type +from dl_core.us_connection_base import ExecutorBasedMixin if TYPE_CHECKING: from sqlalchemy.sql.selectable import Select from dl_constants.enums import DataSourceRole + from dl_core.base_models import ConnectionRef from dl_core.data_processing.cache.primitives import LocalKeyRepresentation from dl_core.data_processing.prepared_components.manager_base import PreparedComponentManagerBase - from dl_core.data_processing.prepared_components.primitives import PreparedMultiFromInfo + from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo from dl_core.data_processing.selectors.base import DataSelectorAsyncBase from dl_core.data_processing.types import TValuesChunkStream from dl_core.us_dataset import Dataset @@ -50,33 +61,24 @@ def get_prep_component_manager(self) -> PreparedComponentManagerBase: assert self._prep_component_manager is not None return self._prep_component_manager - def _make_query_res_info( - self, - query: Union[str, Select], - user_types: Sequence[UserDataType], - ) -> QueryAndResultInfo: - query_res_info = QueryAndResultInfo( - query=query, # type: ignore # TODO: fix - user_types=list(user_types), - # This is basically legacy and will be removed. 
- # col_names are not really used anywhere, just passed around a lot. - # So we generate random ones here - col_names=[f"col_{i}" for i in range(len(user_types))], - ) - return query_res_info - async def _execute_and_fetch( self, *, query: Union[str, Select], user_types: Sequence[UserDataType], chunk_size: int, - joint_dsrc_info: Optional[PreparedMultiFromInfo] = None, + joint_dsrc_info: Optional[PreparedFromInfo] = None, query_id: str, + ctx: OpExecutionContext, + data_key: LocalKeyRepresentation, + preparation_callback: Optional[Callable[[], Awaitable[None]]], ) -> TValuesChunkStream: assert not isinstance(query, str), "String queries are not supported by source DB processor" assert joint_dsrc_info is not None, "joint_dsrc_info is required for source DB processor" + if preparation_callback is not None: + await preparation_callback() + query_res_info = self._make_query_res_info(query=query, user_types=user_types) data_stream = await self._selector.get_data_stream( query_id=query_id, @@ -87,22 +89,68 @@ async def _execute_and_fetch( ) return data_stream.data - def get_data_key( + def _save_start_exec_reporting_record( self, - *, query_id: str, - query: Union[str, Select], - user_types: Sequence[UserDataType], - joint_dsrc_info: Optional[PreparedMultiFromInfo] = None, - ) -> Optional[LocalKeyRepresentation]: - selector = self._selector - assert isinstance(selector, DatasetDataSelectorAsyncBase) - query_res_info = self._make_query_res_info(query=query, user_types=user_types) - query_execution_ctx = selector.build_query_execution_ctx( - query_res_info=query_res_info, - joint_dsrc_info=joint_dsrc_info, # type: ignore # TODO: fix - role=self._role, + compiled_query: str, + target_connection_ref: Optional[ConnectionRef], + ) -> None: + assert target_connection_ref is not None + target_connection = self._us_entry_buffer.get_entry(entry_id=target_connection_ref) + assert isinstance(target_connection, ExecutorBasedMixin) + + workbook_id = ( + target_connection.entry_key.workbook_id + if isinstance(target_connection.entry_key, WorkbookEntryLocation) + else None + ) + dataset_id = self._dataset.uuid + assert dataset_id is not None + record = QueryExecutionStartReportingRecord( + timestamp=time.time(), + query_id=query_id, + dataset_id=dataset_id, + query_type=get_query_type( + connection=target_connection, + conn_sec_mgr=self._service_registry.get_conn_executor_factory().conn_security_manager, + ), + connection_type=target_connection.conn_type, + conn_reporting_data=target_connection.get_conn_dto().conn_reporting_data(), + query=compiled_query, + workbook_id=workbook_id, + ) + self.add_reporting_record(record) + + def _save_end_exec_reporting_record(self, query_id: str, exec_exception: Optional[Exception]) -> None: + record = QueryExecutionEndReportingRecord( + timestamp=time.time(), + query_id=query_id, + exception=exec_exception, + ) + self.add_reporting_record(record) + + def _save_query_exec_cache_info_reporting_record(self, query_id: str, cache_full_hit: bool) -> None: + query_exec_cache_record = QueryExecutionCacheInfoReportingRecord( query_id=query_id, + cache_full_hit=cache_full_hit, + timestamp=time.time(), ) - data_key = selector.get_data_key(query_execution_ctx=query_execution_ctx) - return data_key + self.add_reporting_record(query_exec_cache_record) + + def pre_query_execute( + self, + query_id: str, + compiled_query: str, + target_connection_ref: Optional[ConnectionRef], + ) -> None: + self._save_start_exec_reporting_record( + query_id=query_id, + compiled_query=compiled_query, + 
target_connection_ref=target_connection_ref, + ) + + def post_query_execute(self, query_id: str, exec_exception: Optional[Exception]) -> None: + self._save_end_exec_reporting_record(query_id=query_id, exec_exception=exec_exception) + + def post_cache_usage(self, query_id: str, cache_full_hit: bool) -> None: + self._save_query_exec_cache_info_reporting_record(query_id=query_id, cache_full_hit=cache_full_hit) diff --git a/lib/dl_core/dl_core/data_processing/selectors/base.py b/lib/dl_core/dl_core/data_processing/selectors/base.py index e58521676..629d27b28 100644 --- a/lib/dl_core/dl_core/data_processing/selectors/base.py +++ b/lib/dl_core/dl_core/data_processing/selectors/base.py @@ -19,7 +19,7 @@ UserDataType, ) from dl_core.data_processing.cache.primitives import BIQueryCacheOptions - from dl_core.data_processing.prepared_components.primitives import PreparedMultiFromInfo + from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo from dl_core.data_processing.stream_base import DataStreamAsync from dl_core.query.bi_query import QueryAndResultInfo from dl_core.us_connection_base import ExecutorBasedMixin @@ -50,7 +50,7 @@ async def get_data_stream( query_id: Optional[str] = None, role: DataSourceRole, query_res_info: QueryAndResultInfo, - joint_dsrc_info: PreparedMultiFromInfo, + joint_dsrc_info: PreparedFromInfo, row_count_hard_limit: Optional[int] = None, stream_id: Optional[str] = None, ) -> DataStreamAsync: diff --git a/lib/dl_core/dl_core/data_processing/selectors/dataset_base.py b/lib/dl_core/dl_core/data_processing/selectors/dataset_base.py index 68ee917fc..7e98e804d 100644 --- a/lib/dl_core/dl_core/data_processing/selectors/dataset_base.py +++ b/lib/dl_core/dl_core/data_processing/selectors/dataset_base.py @@ -38,7 +38,7 @@ if TYPE_CHECKING: from dl_api_commons.reporting.registry import ReportingRegistry from dl_core.data_processing.cache.primitives import LocalKeyRepresentation - from dl_core.data_processing.prepared_components.primitives import PreparedMultiFromInfo + from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo from dl_core.data_processing.types import TValuesChunkStream from dl_core.services_registry import ServicesRegistry from dl_core.us_dataset import Dataset @@ -132,7 +132,7 @@ async def get_data_stream( query_id: Optional[str] = None, role: DataSourceRole, query_res_info: QueryAndResultInfo, - joint_dsrc_info: PreparedMultiFromInfo, + joint_dsrc_info: PreparedFromInfo, row_count_hard_limit: Optional[int] = None, stream_id: Optional[str] = None, ) -> DataStreamAsync: @@ -180,7 +180,6 @@ async def get_data_stream( meta=DataRequestMetaInfo( query_id=query_id, query=query_execution_ctx.compiled_query, - is_materialized=role != DataSourceRole.origin, data_source_list=data_source_list, ), data_key=data_key, @@ -200,7 +199,7 @@ def build_query_execution_ctx( query_id: str, query_res_info: QueryAndResultInfo, role: DataSourceRole, - joint_dsrc_info: PreparedMultiFromInfo, + joint_dsrc_info: PreparedFromInfo, ) -> BIQueryExecutionContext: compiled_query = utils.compile_query_for_debug(query_res_info.query, joint_dsrc_info.query_compiler.dialect) LOGGER.info(f"SQL query for dataset: {compiled_query}") diff --git a/lib/dl_core/dl_core/data_processing/selectors/dataset_cache_base.py b/lib/dl_core/dl_core/data_processing/selectors/dataset_cache_base.py index d997e4d33..98794ef05 100644 --- a/lib/dl_core/dl_core/data_processing/selectors/dataset_cache_base.py +++ 
b/lib/dl_core/dl_core/data_processing/selectors/dataset_cache_base.py @@ -10,7 +10,7 @@ from dl_core.data_processing.cache.exc import CachePreparationFailed from dl_core.data_processing.cache.primitives import LocalKeyRepresentation from dl_core.data_processing.cache.utils import SelectorCacheOptionsBuilder -from dl_core.data_processing.prepared_components.primitives import PreparedMultiFromInfo +from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo from dl_core.data_processing.selectors.base import BIQueryExecutionContext from dl_core.data_processing.selectors.dataset_base import DatasetDataSelectorAsyncBase from dl_core.query.bi_query import QueryAndResultInfo @@ -53,7 +53,7 @@ def build_query_execution_ctx( # type: ignore # TODO: fix query_id: str, query_res_info: QueryAndResultInfo, role: DataSourceRole, - joint_dsrc_info: PreparedMultiFromInfo, + joint_dsrc_info: PreparedFromInfo, ) -> BIQueryExecutionContext: q_exec_ctx: BIQueryExecutionContext = super().build_query_execution_ctx( query_id=query_id, @@ -69,6 +69,7 @@ def build_query_execution_ctx( # type: ignore # TODO: fix query=q_exec_ctx.query, user_types=q_exec_ctx.requested_bi_types, dataset=self.dataset, + data_key=LocalKeyRepresentation(), ) return attr.evolve( diff --git a/lib/dl_core/dl_core/data_processing/selectors/dataset_cached_lazy.py b/lib/dl_core/dl_core/data_processing/selectors/dataset_cached_lazy.py index 35f79c1b8..57a7f4a1f 100644 --- a/lib/dl_core/dl_core/data_processing/selectors/dataset_cached_lazy.py +++ b/lib/dl_core/dl_core/data_processing/selectors/dataset_cached_lazy.py @@ -12,7 +12,7 @@ from dl_constants.enums import DataSourceRole from dl_core.data_processing.selectors.base import BIQueryExecutionContext -from dl_core.data_processing.selectors.dataset_cached import CachedDatasetDataSelectorAsync +from dl_core.data_processing.selectors.db import DatasetDbDataSelectorAsync from dl_core.data_processing.streaming import ( AsyncChunked, AsyncChunkedBase, @@ -28,7 +28,7 @@ @attr.s -class LazyCachedDatasetDataSelectorAsync(CachedDatasetDataSelectorAsync): +class LazyCachedDatasetDataSelectorAsync(DatasetDbDataSelectorAsync): """ Lazy asynchronous cached dataset data selector """ diff --git a/lib/dl_core/dl_core/data_processing/stream_base.py b/lib/dl_core/dl_core/data_processing/stream_base.py index 3d7709e50..ede0a98e9 100644 --- a/lib/dl_core/dl_core/data_processing/stream_base.py +++ b/lib/dl_core/dl_core/data_processing/stream_base.py @@ -2,10 +2,14 @@ from typing import ( TYPE_CHECKING, + Any, + Awaitable, + Callable, Collection, Iterable, Optional, Sequence, + TypeVar, ) import attr @@ -24,26 +28,31 @@ import dl_core.data_source +_DATA_STREAM_TV = TypeVar("_DATA_STREAM_TV", bound="AbstractStream") + + @attr.s class AbstractStream: id: str = attr.ib(kw_only=True) names: Sequence[str] = attr.ib(kw_only=True) user_types: Sequence[UserDataType] = attr.ib(kw_only=True) + data_key: Optional[LocalKeyRepresentation] = attr.ib(kw_only=True) + meta: DataRequestMetaInfo = attr.ib(kw_only=True) + + def clone(self: _DATA_STREAM_TV, **kwargs: Any) -> _DATA_STREAM_TV: + return attr.evolve(self, **kwargs) @attr.s class DataRequestMetaInfo: query_id: Optional[str] = attr.ib(default=None, kw_only=True) query: Optional[str] = attr.ib(default=None, kw_only=True) - is_materialized: bool = attr.ib(default=False, kw_only=True) data_source_list: Collection[dl_core.data_source.DataSource] = attr.ib(default=(), kw_only=True) pass_db_query_to_user: bool = attr.ib(default=True, kw_only=True) @attr.s 
class DataStreamBase(AbstractStream): - meta: DataRequestMetaInfo = attr.ib(kw_only=True) - data_key: Optional[LocalKeyRepresentation] = attr.ib(kw_only=True) # data: either sync or async, defined in subclasses @property @@ -62,12 +71,19 @@ class DataStreamAsync(DataStreamBase): @attr.s -class VirtualStream(AbstractStream): +class AsyncVirtualStream(AbstractStream): """A representation of data that is being streamed in an external system (database)""" + _preparation_callback: Optional[Callable[[], Awaitable[None]]] = attr.ib(kw_only=True) + + async def prepare(self) -> None: + if self._preparation_callback is None: + return + await self._preparation_callback() + @attr.s -class DataSourceVS(VirtualStream): +class DataSourceVS(AsyncVirtualStream): """A data source avatar""" result_id: AvatarId = attr.ib(kw_only=True) @@ -76,12 +92,7 @@ class DataSourceVS(VirtualStream): @attr.s -class JointDataSourceVS(VirtualStream): +class JointDataSourceVS(AsyncVirtualStream): """Joint data source info""" joint_dsrc_info: PreparedMultiFromInfo = attr.ib(kw_only=True) - - -@attr.s -class CacheVirtualStream(DataStreamBase): - """Represents a virtual stream for cache processor. Has no real data""" diff --git a/lib/dl_core/dl_core/query/bi_query.py b/lib/dl_core/dl_core/query/bi_query.py index d40d9ed88..8fd2259c1 100644 --- a/lib/dl_core/dl_core/query/bi_query.py +++ b/lib/dl_core/dl_core/query/bi_query.py @@ -2,10 +2,8 @@ from itertools import chain from typing import ( - List, Optional, Sequence, - Set, Union, ) @@ -37,7 +35,21 @@ class BIQuery: limit: Optional[int] = None offset: Optional[int] = None - def get_required_avatar_ids(self) -> Set[str]: + def get_names(self) -> list[str]: + names: list[str] = [] + for expr_ctx in self.select_expressions: + assert expr_ctx.alias is not None + names.append(expr_ctx.alias) + return names + + def get_user_types(self) -> list[UserDataType]: + user_types: list[UserDataType] = [] + for expr_ctx in self.select_expressions: + assert expr_ctx.user_type is not None + user_types.append(expr_ctx.user_type) + return user_types + + def get_required_avatar_ids(self) -> set[str]: """Collect source avatar references from all expressions in the query.""" return set( @@ -62,5 +74,5 @@ def get_required_avatar_ids(self) -> Set[str]: @attr.s class QueryAndResultInfo: query: Select = attr.ib(kw_only=True) - user_types: List[UserDataType] = attr.ib(kw_only=True) - col_names: List[str] = attr.ib(kw_only=True) + user_types: list[UserDataType] = attr.ib(kw_only=True) + col_names: list[str] = attr.ib(kw_only=True) diff --git a/lib/dl_core/dl_core/services_registry/data_processor_factory.py b/lib/dl_core/dl_core/services_registry/data_processor_factory.py index d61ee52a6..77898e64a 100644 --- a/lib/dl_core/dl_core/services_registry/data_processor_factory.py +++ b/lib/dl_core/dl_core/services_registry/data_processor_factory.py @@ -13,13 +13,10 @@ ProcessorType, SelectorType, ) -from dl_core.data_processing.processing.processor import OperationProcessorAsyncBase -from dl_core.data_processing.processing.processor_dataset_cached import CachedDatasetProcessor +from dl_core.data_processing.processing.cache.processor import CacheOperationProcessor +from dl_core.data_processing.processing.db_base.processor_base import ExecutorBasedOperationProcessor from dl_core.data_processing.processing.source_db.processor import SourceDbOperationProcessor -from dl_core.services_registry.data_processor_factory_base import ( - BaseClosableDataProcessorFactory, - DataProcessorFactory, -) +from 
dl_core.services_registry.data_processor_factory_base import BaseClosableDataProcessorFactory from dl_core.us_dataset import Dataset from dl_core.us_manager.local_cache import USEntryBuffer @@ -29,6 +26,8 @@ @attr.s(frozen=True) class SourceDataProcessorFactory(BaseClosableDataProcessorFactory): + _is_bleeding_edge_user: bool = attr.ib(default=False) + def _create_data_processor( self, dataset: Dataset, @@ -36,26 +35,39 @@ def _create_data_processor( *, us_entry_buffer: USEntryBuffer, allow_cache_usage: bool = True, + reporting_enabled: bool = True, # SOURCE_DB-specific selector_type: Optional[SelectorType] = None, role: Optional[DataSourceRole] = None, **kwargs: Any, - ) -> OperationProcessorAsyncBase: + ) -> ExecutorBasedOperationProcessor: assert selector_type is not None assert role is not None selector_factory = self.services_registry.get_selector_factory() selector = selector_factory.get_dataset_selector( dataset=dataset, selector_type=selector_type, - allow_cache_usage=allow_cache_usage, + allow_cache_usage=False, # Use data processor-level cache us_entry_buffer=us_entry_buffer, ) processor = SourceDbOperationProcessor( + service_registry=self.services_registry, dataset=dataset, selector=selector, role=role, us_entry_buffer=us_entry_buffer, + is_bleeding_edge_user=self._is_bleeding_edge_user, + default_cache_ttl_config=self.services_registry.default_cache_ttl_config, ) + + if allow_cache_usage: + processor = CacheOperationProcessor( + service_registry=self.services_registry, + dataset=dataset, + main_processor=processor, + use_cache=allow_cache_usage, + ) + return processor @@ -68,28 +80,36 @@ def _create_data_processor( *, us_entry_buffer: USEntryBuffer, allow_cache_usage: bool = True, + reporting_enabled: bool = True, **kwargs: Any, - ) -> OperationProcessorAsyncBase: - processor: OperationProcessorAsyncBase + ) -> ExecutorBasedOperationProcessor: + processor: ExecutorBasedOperationProcessor dproc_srv_factory = self.services_registry.get_data_processor_service_factory() if dproc_srv_factory is None: raise ValueError("Processor factory was created without a PG pool. 
Cannot create a PG processor") data_proc_service = dproc_srv_factory(processor_type) - processor = data_proc_service.get_data_processor() + processor = data_proc_service.get_data_processor( + service_registry=self.services_registry, + reporting_enabled=reporting_enabled, + ) if allow_cache_usage: - processor = CachedDatasetProcessor( + processor = CacheOperationProcessor( service_registry=self.services_registry, dataset=dataset, main_processor=processor, + use_cache=allow_cache_usage, ) return processor @attr.s(frozen=True) -class DefaultDataProcessorFactory(DataProcessorFactory): +class DefaultDataProcessorFactory(BaseClosableDataProcessorFactory): + _is_bleeding_edge_user: bool = attr.ib(default=False) + + # internal props _source_data_processor_factory: SourceDataProcessorFactory = attr.ib(init=False) _compeng_data_processor_factory: CompengDataProcessorFactory = attr.ib(init=False) @@ -97,6 +117,7 @@ class DefaultDataProcessorFactory(DataProcessorFactory): def _make_source_data_processor_factory(self) -> SourceDataProcessorFactory: return SourceDataProcessorFactory( services_registry_ref=self._services_registry_ref, # type: ignore # mypy bug + is_bleeding_edge_user=self._is_bleeding_edge_user, ) @_compeng_data_processor_factory.default @@ -112,15 +133,17 @@ async def get_data_processor( *, us_entry_buffer: USEntryBuffer, allow_cache_usage: bool = True, + reporting_enabled: bool = True, **kwargs: Any, - ) -> OperationProcessorAsyncBase: - processor: OperationProcessorAsyncBase + ) -> ExecutorBasedOperationProcessor: + processor: ExecutorBasedOperationProcessor if processor_type == ProcessorType.SOURCE_DB: processor = await self._source_data_processor_factory.get_data_processor( dataset=dataset, processor_type=processor_type, us_entry_buffer=us_entry_buffer, allow_cache_usage=allow_cache_usage, + reporting_enabled=reporting_enabled, **kwargs, ) else: @@ -129,6 +152,7 @@ async def get_data_processor( processor_type=processor_type, us_entry_buffer=us_entry_buffer, allow_cache_usage=allow_cache_usage, + reporting_enabled=reporting_enabled, **kwargs, ) diff --git a/lib/dl_core/dl_core/services_registry/data_processor_factory_base.py b/lib/dl_core/dl_core/services_registry/data_processor_factory_base.py index 91304d6f9..2cb4d6f06 100644 --- a/lib/dl_core/dl_core/services_registry/data_processor_factory_base.py +++ b/lib/dl_core/dl_core/services_registry/data_processor_factory_base.py @@ -11,7 +11,7 @@ import attr from dl_constants.enums import ProcessorType -from dl_core.data_processing.processing.processor import OperationProcessorAsyncBase +from dl_core.data_processing.processing.db_base.processor_base import ExecutorBasedOperationProcessor from dl_core.us_dataset import Dataset from dl_core.us_manager.local_cache import USEntryBuffer from dl_core.utils import FutureRef @@ -40,8 +40,9 @@ async def get_data_processor( *, us_entry_buffer: USEntryBuffer, allow_cache_usage: bool = True, + reporting_enabled: bool = True, **kwargs: Any, - ) -> OperationProcessorAsyncBase: + ) -> ExecutorBasedOperationProcessor: pass @abc.abstractmethod @@ -51,7 +52,7 @@ async def close_async(self) -> None: @attr.s(frozen=True) class BaseClosableDataProcessorFactory(DataProcessorFactory): - _created_data_processors: list[OperationProcessorAsyncBase] = attr.ib(factory=list, init=False) + _created_data_processors: list[ExecutorBasedOperationProcessor] = attr.ib(factory=list, init=False) async def get_data_processor( self, @@ -60,13 +61,15 @@ async def get_data_processor( *, us_entry_buffer: USEntryBuffer, 
allow_cache_usage: bool = True, + reporting_enabled: bool = True, **kwargs: Any, - ) -> OperationProcessorAsyncBase: + ) -> ExecutorBasedOperationProcessor: processor = self._create_data_processor( dataset, processor_type, us_entry_buffer=us_entry_buffer, allow_cache_usage=allow_cache_usage, + reporting_enabled=reporting_enabled, **kwargs, ) self._created_data_processors.append(processor) @@ -74,7 +77,6 @@ async def get_data_processor( await processor.start() return processor - @abc.abstractmethod def _create_data_processor( # type: ignore # TODO: fix self, dataset: Dataset, @@ -82,12 +84,13 @@ def _create_data_processor( # type: ignore # TODO: fix *, us_entry_buffer: USEntryBuffer, allow_cache_usage: bool = True, + reporting_enabled: bool = True, **kwargs, - ) -> OperationProcessorAsyncBase: - pass + ) -> ExecutorBasedOperationProcessor: + raise NotImplementedError async def close_async(self) -> None: - async def close_processor(s: "OperationProcessorAsyncBase") -> None: + async def close_processor(s: ExecutorBasedOperationProcessor) -> None: # noinspection PyBroadException try: await s.end() diff --git a/lib/dl_core/dl_core/services_registry/selector_factory.py b/lib/dl_core/dl_core/services_registry/selector_factory.py index af3f52ef7..784e00bdb 100644 --- a/lib/dl_core/dl_core/services_registry/selector_factory.py +++ b/lib/dl_core/dl_core/services_registry/selector_factory.py @@ -35,7 +35,7 @@ def get_dataset_selector( selector_type: SelectorType, *, us_entry_buffer: USEntryBuffer, - allow_cache_usage: bool = True, + allow_cache_usage: bool = True, # TODO: Remove cache from selectors ) -> DatasetDataSelectorAsyncBase: pass @@ -65,7 +65,12 @@ def get_dataset_selector( us_entry_buffer: USEntryBuffer, allow_cache_usage: bool = True, ) -> DatasetDataSelectorAsyncBase: - selector = self._create_dataset_selector(dataset, selector_type, us_entry_buffer=us_entry_buffer) + selector = self._create_dataset_selector( + dataset=dataset, + selector_type=selector_type, + us_entry_buffer=us_entry_buffer, + allow_cache_usage=allow_cache_usage, + ) self._created_dataset_selectors.append(selector) return selector @@ -116,8 +121,8 @@ def _create_dataset_selector( return LazyCachedDatasetDataSelectorAsync( # type: ignore # TODO: fix dataset=dataset, service_registry=self.services_registry, - allow_cache_usage=allow_cache_usage, - is_bleeding_edge_user=self._is_bleeding_edge_user, + # allow_cache_usage=allow_cache_usage, + # is_bleeding_edge_user=self._is_bleeding_edge_user, us_entry_buffer=us_entry_buffer, ) else: diff --git a/lib/dl_core/dl_core/services_registry/sr_factories.py b/lib/dl_core/dl_core/services_registry/sr_factories.py index 0eacbe025..bf4652746 100644 --- a/lib/dl_core/dl_core/services_registry/sr_factories.py +++ b/lib/dl_core/dl_core/services_registry/sr_factories.py @@ -24,6 +24,7 @@ from dl_configs.enums import RequiredService from dl_constants.enums import ProcessorType from dl_core.services_registry.conn_executor_factory import DefaultConnExecutorFactory +from dl_core.services_registry.data_processor_factory import DefaultDataProcessorFactory from dl_core.services_registry.file_uploader_client_factory import ( FileUploaderClientFactory, FileUploaderSettings, @@ -162,6 +163,10 @@ def make_service_registry( services_registry_ref=sr_ref, is_bleeding_edge_user=self.is_bleeding_edge_user(request_context_info), ), + data_processor_factory=DefaultDataProcessorFactory( + services_registry_ref=sr_ref, + is_bleeding_edge_user=self.is_bleeding_edge_user(request_context_info), + ), 
file_uploader_client_factory=FileUploaderClientFactory( self.file_uploader_settings, ) diff --git a/lib/dl_core_testing/dl_core_testing/data.py b/lib/dl_core_testing/dl_core_testing/data.py index 1b3d78848..829328540 100644 --- a/lib/dl_core_testing/dl_core_testing/data.py +++ b/lib/dl_core_testing/dl_core_testing/data.py @@ -18,6 +18,7 @@ ) from dl_core.components.accessor import DatasetComponentAccessor from dl_core.components.ids import AvatarId +from dl_core.data_processing.cache.primitives import LocalKeyRepresentation from dl_core.data_processing.prepared_components.default_manager import DefaultPreparedComponentManager from dl_core.data_processing.processing.operation import ( CalcOp, @@ -26,6 +27,7 @@ ) from dl_core.data_processing.selectors.base import DataSelectorAsyncBase from dl_core.data_processing.stream_base import ( + DataRequestMetaInfo, DataSourceVS, DataStream, DataStreamAsync, @@ -89,6 +91,7 @@ def _get_avatar_virtual_data_stream( prep_src_info = prep_component_manager.get_prepared_source( avatar_id=avatar_id, alias=alias, from_subquery=from_subquery, subquery_limit=subquery_limit ) + data_key = LocalKeyRepresentation().extend(part_type="avatar_id", part_content=avatar_id) return DataSourceVS( id=stream_id, alias=alias, @@ -96,6 +99,9 @@ def _get_avatar_virtual_data_stream( names=prep_src_info.col_names, user_types=prep_src_info.user_types, prep_src_info=prep_src_info, + data_key=data_key, + meta=DataRequestMetaInfo(data_source_list=prep_src_info.data_source_list), + preparation_callback=None, ) async def get_data_stream_async( @@ -109,6 +115,7 @@ async def get_data_stream_async( join_on_expressions: Collection[JoinOnExpressionCtx] = (), from_subquery: bool = False, subquery_limit: Optional[int] = None, + allow_cache_usage: bool = True, ) -> DataStreamAsync: if root_avatar_id is None: root_avatar_id = self._ds_accessor.get_root_avatar_strict().id @@ -122,6 +129,7 @@ async def get_data_stream_async( selector_type=self._selector_type, role=role, us_entry_buffer=self._us_entry_buffer, + allow_cache_usage=allow_cache_usage, ) streams = [ self._get_avatar_virtual_data_stream( @@ -146,6 +154,7 @@ async def get_data_stream_async( result_id="res", bi_query=bi_query, alias="res", + data_key_data="__qwerty", # just a random hashable ), DownloadOp( source_stream_id="calc_0", @@ -173,6 +182,7 @@ def get_data_stream( join_on_expressions: Collection[JoinOnExpressionCtx] = (), from_subquery: bool = False, subquery_limit: Optional[int] = None, + allow_cache_usage: bool = True, ) -> DataStream: async_data_stream = await_sync( self.get_data_stream_async( @@ -184,6 +194,7 @@ def get_data_stream( join_on_expressions=join_on_expressions, from_subquery=from_subquery, subquery_limit=subquery_limit, + allow_cache_usage=allow_cache_usage, ) ) return DataStream( diff --git a/lib/dl_query_processing/dl_query_processing/execution/executor.py b/lib/dl_query_processing/dl_query_processing/execution/executor.py index 6912a459c..b7e09c048 100644 --- a/lib/dl_query_processing/dl_query_processing/execution/executor.py +++ b/lib/dl_query_processing/dl_query_processing/execution/executor.py @@ -21,6 +21,7 @@ ) from dl_constants.types import TBIDataRow from dl_core.components.ids import AvatarId +from dl_core.data_processing.cache.primitives import LocalKeyRepresentation from dl_core.data_processing.prepared_components.default_manager import DefaultPreparedComponentManager from dl_core.data_processing.processing.operation import ( BaseOp, @@ -32,6 +33,7 @@ from dl_core.data_processing.processing.processor 
import OperationProcessorAsyncBase from dl_core.data_processing.stream_base import ( AbstractStream, + DataRequestMetaInfo, DataSourceVS, DataStreamAsync, ) @@ -234,6 +236,7 @@ async def _process_multi_query( alias=translated_flat_query.alias, bi_query=bi_query, dest_stream_id=result_to_stream_id_map[result_id], + data_key_data=translated_flat_query.extract, ) assert isinstance(op, CalcOp) # for typing operations.append(op) @@ -298,6 +301,7 @@ def _make_source_db_input_streams( subquery_limit=subquery_limit, ) result_id = avatar_id + data_key = LocalKeyRepresentation().extend(part_type="avatar_id", part_content=avatar_id) stream = DataSourceVS( id=make_id(), alias=alias, @@ -305,6 +309,9 @@ def _make_source_db_input_streams( names=prep_src_info.col_names, user_types=prep_src_info.user_types, prep_src_info=prep_src_info, + data_key=data_key, + meta=DataRequestMetaInfo(data_source_list=prep_src_info.data_source_list), + preparation_callback=None, ) streams_by_result_id[result_id] = stream stream_aliases[stream.id] = stream.alias
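Stepping back from the individual hunks: the refactor moves two responsibilities onto the streams themselves. Each stream now carries a data_key that is extended step by step (avatar id, operation class, query text via CalcOp.data_key_data), so the cache layer can key results without the deleted per-dataset cache processor, and virtual streams carry a preparation_callback that defers side effects (for example, UploadOpExecutorAsync creating and filling its temporary table) until the data is actually needed. The sketch below is a simplified standalone model of those two ideas; DataKey and VirtualStream loosely mirror LocalKeyRepresentation and AsyncVirtualStream from the diff but are not the dl_core classes.

    import asyncio
    from dataclasses import dataclass, replace
    from typing import Awaitable, Callable, Hashable, Optional


    @dataclass(frozen=True)
    class DataKey:
        """Ordered, hashable key parts; every processing step extends the key."""

        parts: tuple[tuple[str, Hashable], ...] = ()

        def extend(self, part_type: str, part_content: Hashable) -> "DataKey":
            return replace(self, parts=self.parts + ((part_type, part_content),))


    @dataclass
    class VirtualStream:
        """Data that still lives in the database; prepare() runs any deferred setup."""

        id: str
        data_key: DataKey
        preparation_callback: Optional[Callable[[], Awaitable[None]]] = None

        async def prepare(self) -> None:
            if self.preparation_callback is None:
                return
            await self.preparation_callback()


    async def _demo() -> None:
        async def upload_data() -> None:
            print("creating temporary table and inserting rows")

        key = DataKey().extend("avatar_id", "ava_1").extend("operation", "CalcOp")
        stream = VirtualStream(id="calc_0", data_key=key, preparation_callback=upload_data)
        await stream.prepare()        # the side effect happens here, not at construction
        print(stream.data_key.parts)  # the cache layer can build its key from these parts


    if __name__ == "__main__":
        asyncio.run(_demo())

In this model, as in the diff, constructing a stream is cheap and free of side effects; anything that touches the database happens in prepare(), which an executor can call right before fetching the data.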