From f11d9f36af41f528b8bd0f3175a3de2566be141a Mon Sep 17 00:00:00 2001 From: Grigory Statsenko Date: Thu, 21 Dec 2023 14:43:53 +0100 Subject: [PATCH] Removed selectors (#179) --- .../processing/source_db/processor.py | 3 - .../source_db/selector_exec_adapter.py | 88 ++++++- .../data_processing/selectors/__init__.py | 0 .../dl_core/data_processing/selectors/base.py | 72 ------ .../data_processing/selectors/dataset_base.py | 231 ------------------ .../dl_core/data_processing/selectors/db.py | 89 ------- .../data_processing/selectors/utils.py | 72 ------ .../data_processor_factory.py | 7 - .../services_registry/selector_factory.py | 111 --------- .../dl_core/services_registry/sr_factories.py | 5 - .../dl_core/services_registry/top_level.py | 23 -- 11 files changed, 79 insertions(+), 622 deletions(-) delete mode 100644 lib/dl_core/dl_core/data_processing/selectors/__init__.py delete mode 100644 lib/dl_core/dl_core/data_processing/selectors/base.py delete mode 100644 lib/dl_core/dl_core/data_processing/selectors/dataset_base.py delete mode 100644 lib/dl_core/dl_core/data_processing/selectors/db.py delete mode 100644 lib/dl_core/dl_core/data_processing/selectors/utils.py delete mode 100644 lib/dl_core/dl_core/services_registry/selector_factory.py diff --git a/lib/dl_core/dl_core/data_processing/processing/source_db/processor.py b/lib/dl_core/dl_core/data_processing/processing/source_db/processor.py index 0133dd3a7..c95221de4 100644 --- a/lib/dl_core/dl_core/data_processing/processing/source_db/processor.py +++ b/lib/dl_core/dl_core/data_processing/processing/source_db/processor.py @@ -13,7 +13,6 @@ from dl_core.data_processing.processing.db_base.exec_adapter_base import ProcessorDbExecAdapterBase from dl_core.data_processing.processing.db_base.processor_base import ExecutorBasedOperationProcessor from dl_core.data_processing.processing.source_db.selector_exec_adapter import SourceDbExecAdapter -from dl_core.data_processing.selectors.base import DataSelectorAsyncBase from dl_core.us_dataset import Dataset from dl_core.us_manager.local_cache import USEntryBuffer @@ -22,7 +21,6 @@ class SourceDbOperationProcessor(ExecutorBasedOperationProcessor): _role: DataSourceRole = attr.ib(kw_only=True) _dataset: Dataset = attr.ib(kw_only=True) - _selector: DataSelectorAsyncBase = attr.ib(kw_only=True) _row_count_hard_limit: Optional[int] = attr.ib(kw_only=True, default=None) _us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True) _is_bleeding_edge_user: bool = attr.ib(default=False) @@ -41,7 +39,6 @@ def _make_db_ex_adapter(self) -> Optional[ProcessorDbExecAdapterBase]: reporting_enabled=self._reporting_enabled, role=self._role, dataset=self._dataset, - selector=self._selector, row_count_hard_limit=self._row_count_hard_limit, us_entry_buffer=self._us_entry_buffer, cache_options_builder=self._cache_options_builder, diff --git a/lib/dl_core/dl_core/data_processing/processing/source_db/selector_exec_adapter.py b/lib/dl_core/dl_core/data_processing/processing/source_db/selector_exec_adapter.py index f53669cf8..f9ce59f61 100644 --- a/lib/dl_core/dl_core/data_processing/processing/source_db/selector_exec_adapter.py +++ b/lib/dl_core/dl_core/data_processing/processing/source_db/selector_exec_adapter.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging import time from typing import ( TYPE_CHECKING, @@ -17,34 +18,62 @@ QueryExecutionEndReportingRecord, QueryExecutionStartReportingRecord, ) -from dl_constants.enums import UserDataType +from dl_constants.enums import ( + ReportingQueryType, + 
UserDataType, +) +from dl_core import utils from dl_core.base_models import WorkbookEntryLocation +from dl_core.connection_executors import ConnExecutorQuery from dl_core.data_processing.prepared_components.default_manager import DefaultPreparedComponentManager from dl_core.data_processing.processing.context import OpExecutionContext from dl_core.data_processing.processing.db_base.exec_adapter_base import ProcessorDbExecAdapterBase -from dl_core.data_processing.selectors.utils import get_query_type -from dl_core.us_connection_base import ExecutorBasedMixin +from dl_core.data_processing.streaming import ( + AsyncChunked, + AsyncChunkedBase, + LazyAsyncChunked, +) +from dl_core.query.bi_query import QueryAndResultInfo +from dl_core.us_connection_base import ( + ClassicConnectionSQL, + ConnectionBase, + ExecutorBasedMixin, +) if TYPE_CHECKING: from sqlalchemy.sql.selectable import Select from dl_constants.enums import DataSourceRole + from dl_constants.types import TBIDataValue from dl_core.base_models import ConnectionRef + from dl_core.connections_security.base import ConnectionSecurityManager from dl_core.data_processing.cache.primitives import LocalKeyRepresentation from dl_core.data_processing.prepared_components.manager_base import PreparedComponentManagerBase from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo - from dl_core.data_processing.selectors.base import DataSelectorAsyncBase from dl_core.data_processing.types import TValuesChunkStream from dl_core.us_dataset import Dataset from dl_core.us_manager.local_cache import USEntryBuffer +LOGGER = logging.getLogger(__name__) + + +def get_query_type(connection: ConnectionBase, conn_sec_mgr: ConnectionSecurityManager) -> ReportingQueryType: + if connection.is_always_internal_source: + return ReportingQueryType.internal + + if isinstance(connection, ClassicConnectionSQL): + if conn_sec_mgr.is_internal_connection(connection.get_conn_dto()): + return ReportingQueryType.internal + + return ReportingQueryType.external + + @attr.s class SourceDbExecAdapter(ProcessorDbExecAdapterBase): # noqa _role: DataSourceRole = attr.ib(kw_only=True) _dataset: Dataset = attr.ib(kw_only=True) - _selector: DataSelectorAsyncBase = attr.ib(kw_only=True) _prep_component_manager: Optional[PreparedComponentManagerBase] = attr.ib(kw_only=True, default=None) _row_count_hard_limit: Optional[int] = attr.ib(kw_only=True, default=None) _us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True) @@ -61,6 +90,49 @@ def get_prep_component_manager(self) -> PreparedComponentManagerBase: assert self._prep_component_manager is not None return self._prep_component_manager + async def _get_data_stream_from_source( + self, + *, + query_res_info: QueryAndResultInfo, + joint_dsrc_info: PreparedFromInfo, + row_count_hard_limit: Optional[int] = None, + ) -> TValuesChunkStream: + """Generate data stream from a data source""" + + compiled_query = utils.compile_query_for_debug(query_res_info.query, joint_dsrc_info.query_compiler.dialect) + LOGGER.info(f"SQL query for dataset: {compiled_query}") + + assert joint_dsrc_info.target_connection_ref is not None + target_connection = self._us_entry_buffer.get_entry(joint_dsrc_info.target_connection_ref) + assert isinstance(target_connection, ExecutorBasedMixin) + + ce_factory = self._service_registry.get_conn_executor_factory() + ce = ce_factory.get_async_conn_executor(target_connection) + + exec_result = await ce.execute( + ConnExecutorQuery( + query=query_res_info.query, + db_name=joint_dsrc_info.db_name, + 
user_types=query_res_info.user_types, + debug_compiled_query=compiled_query, + chunk_size=None, + ) + ) + wrapped_result_iter = AsyncChunked(chunked_data=exec_result.result) + + async def initialize_data_stream() -> AsyncChunkedBase[list[TBIDataValue]]: + return wrapped_result_iter # type: ignore # TODO: fix + + async def finalize_data_stream() -> None: + pass + + result_iter = LazyAsyncChunked(initializer=initialize_data_stream, finalizer=finalize_data_stream) + + if row_count_hard_limit is not None: + result_iter = result_iter.limit(max_count=row_count_hard_limit) + + return result_iter + async def _execute_and_fetch( self, *, @@ -80,14 +152,12 @@ async def _execute_and_fetch( await preparation_callback() query_res_info = self._make_query_res_info(query=query, user_types=user_types) - data_stream = await self._selector.get_data_stream( - query_id=query_id, - role=self._role, + data_stream_data = await self._get_data_stream_from_source( joint_dsrc_info=joint_dsrc_info, query_res_info=query_res_info, row_count_hard_limit=self._row_count_hard_limit, ) - return data_stream.data + return data_stream_data def _save_start_exec_reporting_record( self, diff --git a/lib/dl_core/dl_core/data_processing/selectors/__init__.py b/lib/dl_core/dl_core/data_processing/selectors/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/lib/dl_core/dl_core/data_processing/selectors/base.py b/lib/dl_core/dl_core/data_processing/selectors/base.py deleted file mode 100644 index 629d27b28..000000000 --- a/lib/dl_core/dl_core/data_processing/selectors/base.py +++ /dev/null @@ -1,72 +0,0 @@ -from __future__ import annotations - -import abc -from typing import ( - TYPE_CHECKING, - List, - Optional, - Sequence, -) - -import attr - - -if TYPE_CHECKING: - from sqlalchemy.sql.selectable import Select - - from dl_constants.enums import ( - DataSourceRole, - UserDataType, - ) - from dl_core.data_processing.cache.primitives import BIQueryCacheOptions - from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo - from dl_core.data_processing.stream_base import DataStreamAsync - from dl_core.query.bi_query import QueryAndResultInfo - from dl_core.us_connection_base import ExecutorBasedMixin - - -class NoData(Exception): - pass - - -@attr.s(frozen=True, auto_attribs=True) -class BIQueryExecutionContext: - query_id: str - query: Select - compiled_query: str # for logs only - target_connection: ExecutorBasedMixin - requested_bi_types: List[UserDataType] - result_col_names: Sequence[str] - target_db_name: Optional[str] = attr.ib(default=None) - cache_options: Optional[BIQueryCacheOptions] = attr.ib(default=None) - connect_args: dict = attr.ib(default=None) - - -class DataSelectorAsyncBase(abc.ABC): - @abc.abstractmethod - async def get_data_stream( - self, - *, - query_id: Optional[str] = None, - role: DataSourceRole, - query_res_info: QueryAndResultInfo, - joint_dsrc_info: PreparedFromInfo, - row_count_hard_limit: Optional[int] = None, - stream_id: Optional[str] = None, - ) -> DataStreamAsync: - """ - Fetch data from the database. - Return SQL query compiled as a string and an iterable or result rows. - - :param role: data source role to use - :param query_res_info: an object containing a compiled SA ``Select`` and info about result columns - :param joint_dsrc_info: represents the combined SQL data source (joined tables) - :param row_count_hard_limit: Result rows count limit. Will raise ResultRowCountLimitExceeded if receive more. 
- :return: ``DataStreamAsync`` instance containing an async iterable of lists - and some metadata such as compiled query, column names and data types - """ - - raise NotImplementedError - - async def close(self) -> None: - pass diff --git a/lib/dl_core/dl_core/data_processing/selectors/dataset_base.py b/lib/dl_core/dl_core/data_processing/selectors/dataset_base.py deleted file mode 100644 index 7e98e804d..000000000 --- a/lib/dl_core/dl_core/data_processing/selectors/dataset_base.py +++ /dev/null @@ -1,231 +0,0 @@ -from __future__ import annotations - -import abc -from contextlib import contextmanager -import logging -import time -from typing import ( - TYPE_CHECKING, - Generator, - Optional, -) - -import attr - -from dl_api_commons.reporting.models import ( - QueryExecutionEndReportingRecord, - QueryExecutionStartReportingRecord, -) -from dl_constants.enums import DataSourceRole -from dl_core import utils -from dl_core.base_models import WorkbookEntryLocation -from dl_core.data_processing.selectors.base import ( - BIQueryExecutionContext, - DataSelectorAsyncBase, - NoData, -) -from dl_core.data_processing.selectors.utils import get_query_type -from dl_core.data_processing.stream_base import ( - DataRequestMetaInfo, - DataStreamAsync, -) -import dl_core.exc as exc -from dl_core.query.bi_query import QueryAndResultInfo -from dl_core.us_connection_base import ExecutorBasedMixin -from dl_core.utils import make_id - - -if TYPE_CHECKING: - from dl_api_commons.reporting.registry import ReportingRegistry - from dl_core.data_processing.cache.primitives import LocalKeyRepresentation - from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo - from dl_core.data_processing.types import TValuesChunkStream - from dl_core.services_registry import ServicesRegistry - from dl_core.us_dataset import Dataset - from dl_core.us_manager.local_cache import USEntryBuffer - - -LOGGER = logging.getLogger(__name__) - - -# noinspection PyDataclass -@attr.s -class DatasetDataSelectorAsyncBase(DataSelectorAsyncBase, metaclass=abc.ABCMeta): - dataset: Dataset = attr.ib(kw_only=True) - _us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True) - _service_registry: ServicesRegistry = attr.ib(kw_only=True) # Service registry override - - """ - Base class for dataset-dependent asynchronous data selectors - :param dataset: the dataset to operate on - """ - - def _save_start_exec_reporting_record( - self, - query_execution_ctx: BIQueryExecutionContext, - ) -> None: - connection = query_execution_ctx.target_connection - workbook_id = ( - connection.entry_key.workbook_id if isinstance(connection.entry_key, WorkbookEntryLocation) else None - ) - report = QueryExecutionStartReportingRecord( - timestamp=time.time(), - query_id=query_execution_ctx.query_id, - dataset_id=self.dataset.uuid, # type: ignore # TODO: fix - query_type=get_query_type( - connection=connection, - conn_sec_mgr=self.service_registry.get_conn_executor_factory().conn_security_manager, - ), - connection_type=connection.conn_type, - conn_reporting_data=connection.get_conn_dto().conn_reporting_data(), - query=query_execution_ctx.compiled_query, - workbook_id=workbook_id, - ) - self.reporting_registry.save_reporting_record(report=report) - - def _save_end_exec_reporting_record( - self, - query_execution_ctx: BIQueryExecutionContext, - exec_exception: Optional[Exception], - ) -> None: - report = QueryExecutionEndReportingRecord( - timestamp=time.time(), - query_id=query_execution_ctx.query_id, - exception=exec_exception, - ) - 
self.reporting_registry.save_reporting_record(report=report) - - def pre_exec(self, query_execution_ctx: BIQueryExecutionContext) -> None: - self._save_start_exec_reporting_record(query_execution_ctx) - - def post_exec( - self, - query_execution_ctx: BIQueryExecutionContext, - exec_exception: Optional[Exception], - ) -> None: - self._save_end_exec_reporting_record(query_execution_ctx, exec_exception) - - @contextmanager - def _execution_cm( - self, - query_execution_ctx: BIQueryExecutionContext, - ) -> Generator[None, None, None]: - self.pre_exec(query_execution_ctx=query_execution_ctx) - exec_exception: Optional[Exception] = None - try: - yield - except Exception as fired_exception: - exec_exception = fired_exception - raise - finally: - self.post_exec(query_execution_ctx=query_execution_ctx, exec_exception=exec_exception) - - def get_data_key( - self, - query_execution_ctx: BIQueryExecutionContext, - ) -> Optional[LocalKeyRepresentation]: - return None - - async def get_data_stream( - self, - *, - query_id: Optional[str] = None, - role: DataSourceRole, - query_res_info: QueryAndResultInfo, - joint_dsrc_info: PreparedFromInfo, - row_count_hard_limit: Optional[int] = None, - stream_id: Optional[str] = None, - ) -> DataStreamAsync: - """Generate data stream from a data source""" - - query_id = query_id or make_id() - query_execution_ctx = self.build_query_execution_ctx( - query_id=query_id, - query_res_info=query_res_info, - role=role, - joint_dsrc_info=joint_dsrc_info, - ) - - if not isinstance(query_execution_ctx.target_connection, ExecutorBasedMixin): - raise exc.NotAvailableError( - f"Connection {type(query_execution_ctx.target_connection).__qualname__}" - f" does not support async data selection" - ) - - with self._execution_cm(query_execution_ctx): - result_iter = await self.execute_query_context( - role=role, - query_execution_ctx=query_execution_ctx, - row_count_hard_limit=row_count_hard_limit, - ) - if result_iter is None: - raise NoData("Got no data from selector") - - if row_count_hard_limit is not None: - result_iter = result_iter.limit(max_count=row_count_hard_limit) - - data_key = self.get_data_key(query_execution_ctx=query_execution_ctx) - stream_id = stream_id or make_id() - LOGGER.info("Making data stream %s,", stream_id, extra=dict(data_key=str(data_key))) - assert stream_id is not None - - data_source_list = joint_dsrc_info.data_source_list - assert data_source_list is not None - - return DataStreamAsync( - id=stream_id, - user_types=query_execution_ctx.requested_bi_types, - names=query_execution_ctx.result_col_names, - data=result_iter, - meta=DataRequestMetaInfo( - query_id=query_id, - query=query_execution_ctx.compiled_query, - data_source_list=data_source_list, - ), - data_key=data_key, - ) - - @property - def service_registry(self) -> ServicesRegistry: - return self._service_registry - - @property - def reporting_registry(self) -> ReportingRegistry: - return self.service_registry.get_reporting_registry() - - def build_query_execution_ctx( - self, - *, - query_id: str, - query_res_info: QueryAndResultInfo, - role: DataSourceRole, - joint_dsrc_info: PreparedFromInfo, - ) -> BIQueryExecutionContext: - compiled_query = utils.compile_query_for_debug(query_res_info.query, joint_dsrc_info.query_compiler.dialect) - LOGGER.info(f"SQL query for dataset: {compiled_query}") - - assert joint_dsrc_info.target_connection_ref is not None - target_connection = self._us_entry_buffer.get_entry(joint_dsrc_info.target_connection_ref) - assert isinstance(target_connection, 
ExecutorBasedMixin) - - return BIQueryExecutionContext( - query_id=query_id, - query=query_res_info.query, - compiled_query=compiled_query, - target_connection=target_connection, - target_db_name=joint_dsrc_info.db_name, - requested_bi_types=query_res_info.user_types, - result_col_names=query_res_info.col_names, - cache_options=None, - connect_args=joint_dsrc_info.connect_args, - ) - - @abc.abstractmethod - async def execute_query_context( - self, - role: DataSourceRole, - query_execution_ctx: BIQueryExecutionContext, - row_count_hard_limit: Optional[int] = None, - ) -> Optional[TValuesChunkStream]: - """Get data using info in ``query_execution_ctx``""" - raise NotImplementedError diff --git a/lib/dl_core/dl_core/data_processing/selectors/db.py b/lib/dl_core/dl_core/data_processing/selectors/db.py deleted file mode 100644 index 93b714de7..000000000 --- a/lib/dl_core/dl_core/data_processing/selectors/db.py +++ /dev/null @@ -1,89 +0,0 @@ -from __future__ import annotations - -from typing import ( - TYPE_CHECKING, - Optional, - Sequence, -) - -import attr - -from dl_constants.enums import DataSourceRole -from dl_core.connection_executors import ConnExecutorQuery -from dl_core.data_processing.selectors.dataset_base import DatasetDataSelectorAsyncBase -from dl_core.data_processing.selectors.utils import select_data_context -from dl_core.data_processing.streaming import ( - AsyncChunked, - AsyncChunkedBase, - LazyAsyncChunked, -) -import dl_core.exc as exc -from dl_core.us_connection_base import ExecutorBasedMixin - - -if TYPE_CHECKING: - from dl_constants.types import TBIDataValue - from dl_core.data_processing.selectors.base import BIQueryExecutionContext - - -@attr.s -class DatasetDbDataSelectorAsync(DatasetDataSelectorAsyncBase): - """Async selector that fetches data from the database""" - - # TODO: Merge all selector logic into data processors - - _active_queries: list[BIQueryExecutionContext] = attr.ib(init=False, factory=list) - - def post_exec( - self, - query_execution_ctx: BIQueryExecutionContext, - exec_exception: Optional[Exception], - ) -> None: - """Lazy behaviour: since the query has not even been executed at this point, do nothing.""" - - async def close(self) -> None: - """Close all active queries""" - while self._active_queries: - await self._close_and_remove_active_query(query_execution_ctx=self._active_queries[-1]) - - async def _close_and_remove_active_query(self, query_execution_ctx: BIQueryExecutionContext): # type: ignore # TODO: fix - if query_execution_ctx in self._active_queries: - self._active_queries.remove(query_execution_ctx) - super().post_exec(query_execution_ctx=query_execution_ctx, exec_exception=None) - - async def execute_query_context( - self, - role: DataSourceRole, - query_execution_ctx: BIQueryExecutionContext, - row_count_hard_limit: Optional[int] = None, - ) -> Optional[AsyncChunkedBase[Sequence[TBIDataValue]]]: - if not isinstance(query_execution_ctx.target_connection, ExecutorBasedMixin): - raise exc.NotAvailableError( - f"Connection {type(query_execution_ctx.target_connection).__qualname__}" - f" does not support async data selection" - ) - - with select_data_context(role=role): - ce_factory = self.service_registry.get_conn_executor_factory() - ce = ce_factory.get_async_conn_executor(query_execution_ctx.target_connection) - - exec_result = await ce.execute( - ConnExecutorQuery( - query=query_execution_ctx.query, - db_name=query_execution_ctx.target_db_name, - user_types=query_execution_ctx.requested_bi_types, - # 
connect_args=query_execution_ctx.connect_args, - debug_compiled_query=query_execution_ctx.compiled_query, - chunk_size=None, - ) - ) - wrapped_result_iter = AsyncChunked(chunked_data=exec_result.result) - - async def initialize_data_stream() -> AsyncChunkedBase[list[TBIDataValue]]: - self._active_queries.append(query_execution_ctx) - return wrapped_result_iter # type: ignore # TODO: fix - - async def finalize_data_stream() -> None: - await self._close_and_remove_active_query(query_execution_ctx=query_execution_ctx) - - return LazyAsyncChunked(initializer=initialize_data_stream, finalizer=finalize_data_stream) diff --git a/lib/dl_core/dl_core/data_processing/selectors/utils.py b/lib/dl_core/dl_core/data_processing/selectors/utils.py deleted file mode 100644 index 8480290b4..000000000 --- a/lib/dl_core/dl_core/data_processing/selectors/utils.py +++ /dev/null @@ -1,72 +0,0 @@ -from __future__ import annotations - -from contextlib import contextmanager -import logging -from typing import ( - TYPE_CHECKING, - Generator, - Sequence, -) - -import sqlalchemy as sa - -from dl_constants.enums import ( - DataSourceRole, - ReportingQueryType, -) -import dl_core.exc as exc -from dl_core.query.bi_query import BIQuery -from dl_core.query.expression import ExpressionCtx -from dl_core.us_connection_base import ClassicConnectionSQL - - -if TYPE_CHECKING: - from dl_core.connections_security.base import ConnectionSecurityManager - from dl_core.us_connection_base import ConnectionBase - - -LOGGER = logging.getLogger(__name__) - - -@contextmanager -def select_data_context(role: DataSourceRole) -> Generator[None, None, None]: - """ - Re-raise database-related errors taking dataset settings into account - """ - try: - yield - except exc.SourceDoesNotExist as err: - if role == DataSourceRole.materialization: - raise exc.MaterializationNotFinished(db_message=err.db_message, query=err.query) - raise - - -def get_value_range_query(expression: ExpressionCtx, dimension_filters: Sequence[ExpressionCtx] = ()) -> BIQuery: - return BIQuery( - select_expressions=[ - ExpressionCtx( - expression=sa.func.min(expression.expression), - alias="min_val", - user_type=expression.user_type, - avatar_ids=expression.avatar_ids, - ), - ExpressionCtx( - expression=sa.func.max(expression.expression), - alias="max_val", - user_type=expression.user_type, - avatar_ids=expression.avatar_ids, - ), - ], - dimension_filters=dimension_filters, - ) - - -def get_query_type(connection: ConnectionBase, conn_sec_mgr: ConnectionSecurityManager) -> ReportingQueryType: - if connection.is_always_internal_source: - return ReportingQueryType.internal - - if isinstance(connection, ClassicConnectionSQL): - if conn_sec_mgr.is_internal_connection(connection.get_conn_dto()): - return ReportingQueryType.internal - - return ReportingQueryType.external diff --git a/lib/dl_core/dl_core/services_registry/data_processor_factory.py b/lib/dl_core/dl_core/services_registry/data_processor_factory.py index 5da50f310..b6adfa9a7 100644 --- a/lib/dl_core/dl_core/services_registry/data_processor_factory.py +++ b/lib/dl_core/dl_core/services_registry/data_processor_factory.py @@ -40,16 +40,9 @@ def _create_data_processor( **kwargs: Any, ) -> ExecutorBasedOperationProcessor: assert role is not None - selector_factory = self.services_registry.get_selector_factory() - selector = selector_factory.get_dataset_selector( - dataset=dataset, - allow_cache_usage=False, # Use data processor-level cache - us_entry_buffer=us_entry_buffer, - ) processor = SourceDbOperationProcessor( 
service_registry=self.services_registry, dataset=dataset, - selector=selector, role=role, us_entry_buffer=us_entry_buffer, is_bleeding_edge_user=self._is_bleeding_edge_user, diff --git a/lib/dl_core/dl_core/services_registry/selector_factory.py b/lib/dl_core/dl_core/services_registry/selector_factory.py deleted file mode 100644 index 14849cd54..000000000 --- a/lib/dl_core/dl_core/services_registry/selector_factory.py +++ /dev/null @@ -1,111 +0,0 @@ -from __future__ import annotations - -import abc -import asyncio -import logging -from typing import ( - TYPE_CHECKING, - List, -) - -import attr - -from dl_api_commons.base_models import RequestContextInfo -from dl_core.data_processing.selectors.dataset_base import DatasetDataSelectorAsyncBase -from dl_core.data_processing.selectors.db import DatasetDbDataSelectorAsync -from dl_core.us_dataset import Dataset -from dl_core.us_manager.local_cache import USEntryBuffer -from dl_core.utils import FutureRef - - -if TYPE_CHECKING: - from dl_core.services_registry.top_level import ServicesRegistry - - -LOGGER = logging.getLogger(__name__) - - -class SelectorFactory(metaclass=abc.ABCMeta): - @abc.abstractmethod - def get_dataset_selector( - self, - dataset: Dataset, - *, - us_entry_buffer: USEntryBuffer, - allow_cache_usage: bool = True, # TODO: Remove cache from selectors - ) -> DatasetDataSelectorAsyncBase: - pass - - @abc.abstractmethod - async def close_async(self) -> None: - pass - - -@attr.s(frozen=True) -class BaseClosableSelectorFactory(SelectorFactory, metaclass=abc.ABCMeta): - _services_registry_ref: FutureRef[ServicesRegistry] = attr.ib() - _created_dataset_selectors: List[DatasetDataSelectorAsyncBase] = attr.ib(factory=list) - - @property - def services_registry(self) -> ServicesRegistry: - return self._services_registry_ref.ref - - @property - def rci(self) -> RequestContextInfo: - return self.services_registry.rci - - def get_dataset_selector( - self, - dataset: Dataset, - *, - us_entry_buffer: USEntryBuffer, - allow_cache_usage: bool = True, - ) -> DatasetDataSelectorAsyncBase: - selector = self._create_dataset_selector( - dataset=dataset, - us_entry_buffer=us_entry_buffer, - allow_cache_usage=allow_cache_usage, - ) - self._created_dataset_selectors.append(selector) - - return selector - - @abc.abstractmethod - def _create_dataset_selector( - self, - dataset: Dataset, - *, - us_entry_buffer: USEntryBuffer, - allow_cache_usage: bool = True, - ) -> DatasetDataSelectorAsyncBase: - pass - - async def close_async(self) -> None: - async def close_selector(s: "DatasetDataSelectorAsyncBase") -> None: - # noinspection PyBroadException - try: - await s.close() - except Exception: - LOGGER.exception("Error during selector closing") - - await asyncio.gather(*[close_selector(selector) for selector in self._created_dataset_selectors]) - - -@attr.s(frozen=True) -class DefaultSelectorFactory(BaseClosableSelectorFactory): - _is_bleeding_edge_user: bool = attr.ib(default=False) - - def _create_dataset_selector( - self, - dataset: Dataset, - *, - us_entry_buffer: USEntryBuffer, - allow_cache_usage: bool = True, - ) -> DatasetDataSelectorAsyncBase: - return DatasetDbDataSelectorAsync( - dataset=dataset, - service_registry=self.services_registry, - # allow_cache_usage=allow_cache_usage, - # is_bleeding_edge_user=self._is_bleeding_edge_user, - us_entry_buffer=us_entry_buffer, - ) diff --git a/lib/dl_core/dl_core/services_registry/sr_factories.py b/lib/dl_core/dl_core/services_registry/sr_factories.py index bf4652746..4e6ff638c 100644 --- 
a/lib/dl_core/dl_core/services_registry/sr_factories.py +++ b/lib/dl_core/dl_core/services_registry/sr_factories.py @@ -30,7 +30,6 @@ FileUploaderSettings, ) from dl_core.services_registry.rqe_caches import RQECachesSetting -from dl_core.services_registry.selector_factory import DefaultSelectorFactory from dl_core.services_registry.top_level import ( DefaultServicesRegistry, ServicesRegistry, @@ -159,10 +158,6 @@ def make_service_registry( data_processor_service_factory=data_processor_service_factory, connectors_settings=self.connectors_settings, reporting_registry=reporting_registry, - selector_factory=DefaultSelectorFactory( - services_registry_ref=sr_ref, - is_bleeding_edge_user=self.is_bleeding_edge_user(request_context_info), - ), data_processor_factory=DefaultDataProcessorFactory( services_registry_ref=sr_ref, is_bleeding_edge_user=self.is_bleeding_edge_user(request_context_info), diff --git a/lib/dl_core/dl_core/services_registry/top_level.py b/lib/dl_core/dl_core/services_registry/top_level.py index 6fbfc23ac..a9b81a8b2 100644 --- a/lib/dl_core/dl_core/services_registry/top_level.py +++ b/lib/dl_core/dl_core/services_registry/top_level.py @@ -28,10 +28,6 @@ from dl_core.services_registry.data_processor_factory_base import BaseClosableDataProcessorFactory from dl_core.services_registry.inst_specific_sr import InstallationSpecificServiceRegistry from dl_core.services_registry.rqe_caches import RQECachesSetting -from dl_core.services_registry.selector_factory import ( - DefaultSelectorFactory, - SelectorFactory, -) from dl_core.us_manager.mutation_cache.engine_factory import ( DefaultMutationCacheEngineFactory, MutationCacheEngineFactory, @@ -96,10 +92,6 @@ def get_reporting_registry(self) -> ReportingRegistry: def get_compute_executor(self) -> ComputeExecutor: pass - @abc.abstractmethod - def get_selector_factory(self) -> SelectorFactory: - pass - @abc.abstractmethod def get_data_processor_service_factory(self) -> Optional[Callable[[ProcessorType], DataProcessorService]]: pass @@ -165,7 +157,6 @@ class DefaultServicesRegistry(ServicesRegistry): # type: ignore # TODO: fix _conn_exec_factory: Optional[ConnExecutorFactory] = attr.ib(default=None) _caches_redis_client_factory: Optional[Callable[[bool], Optional[redis.asyncio.Redis]]] = attr.ib(default=None) _compute_executor: ComputeExecutor = attr.ib() - _selector_factory: SelectorFactory = attr.ib() _cache_engine_factory: CacheEngineFactory = attr.ib() _mutation_cache_engine_factory: MutationCacheEngineFactory = attr.ib(default=None) _data_processor_service_factory: Optional[Callable[[ProcessorType], DataProcessorService]] = attr.ib(default=None) @@ -181,10 +172,6 @@ class DefaultServicesRegistry(ServicesRegistry): # type: ignore # TODO: fix def _default_compute_executor(self) -> ComputeExecutor: return ComputeExecutorTPE() # type: ignore # Incompatible return value type (got "ComputeExecutorTPE", expected "ComputeExecutor") - @_selector_factory.default # noqa - def _default_selector_factory(self) -> SelectorFactory: - return DefaultSelectorFactory(services_registry_ref=FutureRef.fulfilled(self)) - @_cache_engine_factory.default # noqa def _default_cache_engine_factory(self) -> CacheEngineFactory: return DefaultCacheEngineFactory(services_registry_ref=FutureRef.fulfilled(self)) @@ -226,9 +213,6 @@ def get_reporting_registry(self) -> ReportingRegistry: def get_compute_executor(self) -> ComputeExecutor: return self._compute_executor - def get_selector_factory(self) -> SelectorFactory: - return self._selector_factory - def 
get_cache_engine_factory(self) -> Optional[CacheEngineFactory]: # type: ignore # TODO: fix return self._cache_engine_factory @@ -288,10 +272,6 @@ async def close_async(self) -> None: # Processor factory must be closed before selectors # because processors might use selectors while closing await self._data_processor_factory.close_async() - if self._selector_factory is not None: - # Should not parallelize with selectors factory closing - # because its selectors may use CE in closing procedure - await self._selector_factory.close_async() if self._conn_exec_factory is not None: await self._conn_exec_factory.close_async() if self._task_processor_factory is not None: @@ -338,9 +318,6 @@ def get_reporting_registry(self) -> ReportingRegistry: def get_compute_executor(self) -> ComputeExecutor: raise NotImplementedError(self.NOT_IMPLEMENTED_MSG) - def get_selector_factory(self) -> SelectorFactory: - raise NotImplementedError(self.NOT_IMPLEMENTED_MSG) - def get_cache_engine_factory(self) -> Optional[CacheEngineFactory]: # type: ignore # TODO: fix raise NotImplementedError(self.NOT_IMPLEMENTED_MSG)
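
The core of the new code path above is SourceDbExecAdapter._get_data_stream_from_source: it compiles the query for debug logging, runs it through an async connection executor, wraps the result in a lazily-initialized chunked stream, and applies the optional hard row limit itself, which is what lets the selector classes and the per-request SelectorFactory bookkeeping go away. For readers unfamiliar with that lazy-stream pattern, below is a minimal self-contained sketch in plain asyncio. The names lazy_limited_stream, initialize, finalize and the fake chunk data are illustrative stand-ins, not the real LazyAsyncChunked/AsyncChunked classes or connection-executor API from dl_core.

    import asyncio
    from typing import AsyncIterator, Awaitable, Callable, List, Optional


    async def lazy_limited_stream(
        initializer: Callable[[], Awaitable[AsyncIterator[List[int]]]],
        finalizer: Callable[[], Awaitable[None]],
        row_limit: Optional[int] = None,
    ) -> AsyncIterator[List[int]]:
        """Defer the query until first iteration, always run the finalizer,
        and optionally cap the total number of rows yielded."""
        chunks = await initializer()  # the "query" only runs once iteration starts
        seen = 0
        try:
            async for chunk in chunks:
                if row_limit is not None:
                    chunk = chunk[: row_limit - seen]
                    if not chunk:
                        break
                seen += len(chunk)
                yield chunk
                if row_limit is not None and seen >= row_limit:
                    break
        finally:
            await finalizer()


    async def main() -> None:
        async def fake_result() -> AsyncIterator[List[int]]:
            # stands in for the chunked rows returned by the connection executor
            for chunk in ([1, 2, 3], [4, 5, 6], [7, 8, 9]):
                yield chunk

        async def initialize() -> AsyncIterator[List[int]]:
            print("executing query (lazily)")
            return fake_result()

        async def finalize() -> None:
            print("closing result stream")

        # row_limit plays the role of row_count_hard_limit in the patch
        async for chunk in lazy_limited_stream(initialize, finalize, row_limit=5):
            print(chunk)


    asyncio.run(main())

In the patch itself this responsibility simply moves from DatasetDbDataSelectorAsync into SourceDbExecAdapter, which is why DefaultDataProcessorFactory no longer builds a selector and why the selector-specific close_async handling disappears from the services registry.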