Removed selectors (#179)
altvod authored Dec 21, 2023
1 parent 880190b commit f11d9f3
Showing 11 changed files with 79 additions and 622 deletions.
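
In short: the `DataSelectorAsyncBase` abstraction is removed. `SourceDbOperationProcessor` loses its `selector` argument, `SourceDbExecAdapter` gains a `_get_data_stream_from_source` method that talks to the async connection executor directly, and the `get_query_type` helper moves inline from the now-deleted `dl_core.data_processing.selectors` package.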
@@ -13,7 +13,6 @@
 from dl_core.data_processing.processing.db_base.exec_adapter_base import ProcessorDbExecAdapterBase
 from dl_core.data_processing.processing.db_base.processor_base import ExecutorBasedOperationProcessor
 from dl_core.data_processing.processing.source_db.selector_exec_adapter import SourceDbExecAdapter
-from dl_core.data_processing.selectors.base import DataSelectorAsyncBase
 from dl_core.us_dataset import Dataset
 from dl_core.us_manager.local_cache import USEntryBuffer
 
@@ -22,7 +21,6 @@
 class SourceDbOperationProcessor(ExecutorBasedOperationProcessor):
     _role: DataSourceRole = attr.ib(kw_only=True)
     _dataset: Dataset = attr.ib(kw_only=True)
-    _selector: DataSelectorAsyncBase = attr.ib(kw_only=True)
     _row_count_hard_limit: Optional[int] = attr.ib(kw_only=True, default=None)
     _us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True)
     _is_bleeding_edge_user: bool = attr.ib(default=False)
@@ -41,7 +39,6 @@ def _make_db_ex_adapter(self) -> Optional[ProcessorDbExecAdapterBase]:
             reporting_enabled=self._reporting_enabled,
             role=self._role,
             dataset=self._dataset,
-            selector=self._selector,
             row_count_hard_limit=self._row_count_hard_limit,
             us_entry_buffer=self._us_entry_buffer,
             cache_options_builder=self._cache_options_builder,
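
For callers, the visible change in this file is the dropped `selector` keyword. A minimal construction sketch after this commit (argument values are hypothetical placeholders; kw-only fields defined outside the visible hunks, such as the cache options builder, are omitted):

    # Hypothetical usage sketch -- covers only the attr.ib fields visible in this diff.
    # attrs strips the leading underscore, so _role becomes the "role" kwarg.
    processor = SourceDbOperationProcessor(
        role=role,                        # DataSourceRole
        dataset=dataset,                  # Dataset
        row_count_hard_limit=100_000,     # Optional[int]; None disables the cap
        us_entry_buffer=us_entry_buffer,  # USEntryBuffer
        is_bleeding_edge_user=False,
        # selector=...                    <- no longer accepted
    )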
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import logging
 import time
 from typing import (
     TYPE_CHECKING,
@@ -17,34 +18,62 @@
     QueryExecutionEndReportingRecord,
     QueryExecutionStartReportingRecord,
 )
-from dl_constants.enums import UserDataType
+from dl_constants.enums import (
+    ReportingQueryType,
+    UserDataType,
+)
+from dl_core import utils
 from dl_core.base_models import WorkbookEntryLocation
+from dl_core.connection_executors import ConnExecutorQuery
 from dl_core.data_processing.prepared_components.default_manager import DefaultPreparedComponentManager
 from dl_core.data_processing.processing.context import OpExecutionContext
 from dl_core.data_processing.processing.db_base.exec_adapter_base import ProcessorDbExecAdapterBase
-from dl_core.data_processing.selectors.utils import get_query_type
-from dl_core.us_connection_base import ExecutorBasedMixin
+from dl_core.data_processing.streaming import (
+    AsyncChunked,
+    AsyncChunkedBase,
+    LazyAsyncChunked,
+)
+from dl_core.query.bi_query import QueryAndResultInfo
+from dl_core.us_connection_base import (
+    ClassicConnectionSQL,
+    ConnectionBase,
+    ExecutorBasedMixin,
+)
 
 
 if TYPE_CHECKING:
     from sqlalchemy.sql.selectable import Select
 
     from dl_constants.enums import DataSourceRole
+    from dl_constants.types import TBIDataValue
     from dl_core.base_models import ConnectionRef
+    from dl_core.connections_security.base import ConnectionSecurityManager
     from dl_core.data_processing.cache.primitives import LocalKeyRepresentation
     from dl_core.data_processing.prepared_components.manager_base import PreparedComponentManagerBase
     from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo
-    from dl_core.data_processing.selectors.base import DataSelectorAsyncBase
     from dl_core.data_processing.types import TValuesChunkStream
     from dl_core.us_dataset import Dataset
     from dl_core.us_manager.local_cache import USEntryBuffer
 
 
+LOGGER = logging.getLogger(__name__)
+
+
+def get_query_type(connection: ConnectionBase, conn_sec_mgr: ConnectionSecurityManager) -> ReportingQueryType:
+    if connection.is_always_internal_source:
+        return ReportingQueryType.internal
+
+    if isinstance(connection, ClassicConnectionSQL):
+        if conn_sec_mgr.is_internal_connection(connection.get_conn_dto()):
+            return ReportingQueryType.internal
+
+    return ReportingQueryType.external
+
+
 @attr.s
 class SourceDbExecAdapter(ProcessorDbExecAdapterBase):  # noqa
     _role: DataSourceRole = attr.ib(kw_only=True)
     _dataset: Dataset = attr.ib(kw_only=True)
-    _selector: DataSelectorAsyncBase = attr.ib(kw_only=True)
     _prep_component_manager: Optional[PreparedComponentManagerBase] = attr.ib(kw_only=True, default=None)
     _row_count_hard_limit: Optional[int] = attr.ib(kw_only=True, default=None)
     _us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True)
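
Note: `get_query_type` above previously lived in `dl_core.data_processing.selectors.utils` (the package is deleted in this commit) and is now defined directly in this module. Its decision order: a connection flagged `is_always_internal_source` reports as `ReportingQueryType.internal`; a `ClassicConnectionSQL` whose connection DTO the security manager recognizes as internal also reports `internal`; everything else falls through to `ReportingQueryType.external`.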
@@ -61,6 +90,49 @@ def get_prep_component_manager(self) -> PreparedComponentManagerBase:
         assert self._prep_component_manager is not None
         return self._prep_component_manager
 
+    async def _get_data_stream_from_source(
+        self,
+        *,
+        query_res_info: QueryAndResultInfo,
+        joint_dsrc_info: PreparedFromInfo,
+        row_count_hard_limit: Optional[int] = None,
+    ) -> TValuesChunkStream:
+        """Generate data stream from a data source"""
+
+        compiled_query = utils.compile_query_for_debug(query_res_info.query, joint_dsrc_info.query_compiler.dialect)
+        LOGGER.info(f"SQL query for dataset: {compiled_query}")
+
+        assert joint_dsrc_info.target_connection_ref is not None
+        target_connection = self._us_entry_buffer.get_entry(joint_dsrc_info.target_connection_ref)
+        assert isinstance(target_connection, ExecutorBasedMixin)
+
+        ce_factory = self._service_registry.get_conn_executor_factory()
+        ce = ce_factory.get_async_conn_executor(target_connection)
+
+        exec_result = await ce.execute(
+            ConnExecutorQuery(
+                query=query_res_info.query,
+                db_name=joint_dsrc_info.db_name,
+                user_types=query_res_info.user_types,
+                debug_compiled_query=compiled_query,
+                chunk_size=None,
+            )
+        )
+        wrapped_result_iter = AsyncChunked(chunked_data=exec_result.result)
+
+        async def initialize_data_stream() -> AsyncChunkedBase[list[TBIDataValue]]:
+            return wrapped_result_iter  # type: ignore  # TODO: fix
+
+        async def finalize_data_stream() -> None:
+            pass
+
+        result_iter = LazyAsyncChunked(initializer=initialize_data_stream, finalizer=finalize_data_stream)
+
+        if row_count_hard_limit is not None:
+            result_iter = result_iter.limit(max_count=row_count_hard_limit)
+
+        return result_iter
+
     async def _execute_and_fetch(
         self,
         *,
@@ -80,14 +152,12 @@ async def _execute_and_fetch
         await preparation_callback()
 
         query_res_info = self._make_query_res_info(query=query, user_types=user_types)
-        data_stream = await self._selector.get_data_stream(
-            query_id=query_id,
-            role=self._role,
+        data_stream_data = await self._get_data_stream_from_source(
             joint_dsrc_info=joint_dsrc_info,
             query_res_info=query_res_info,
             row_count_hard_limit=self._row_count_hard_limit,
         )
-        return data_stream.data
+        return data_stream_data
 
     def _save_start_exec_reporting_record(
         self,
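
The core of the new `_get_data_stream_from_source` is the lazy wrapper: the executor result is wrapped in `AsyncChunked`, handed to `LazyAsyncChunked` via an initializer coroutine, and optionally capped with `.limit(max_count=...)`, so no rows are pulled until a consumer iterates the stream. A self-contained sketch of that lazy-init-plus-limit pattern in plain asyncio (the real `dl_core.data_processing.streaming` classes are not used here; names and chunk shapes are illustrative):

    import asyncio
    from typing import AsyncIterator, Awaitable, Callable, List, Optional

    async def lazy_limited_stream(
        initializer: Callable[[], Awaitable[AsyncIterator[List[int]]]],
        row_limit: Optional[int] = None,
    ) -> AsyncIterator[List[int]]:
        # The initializer runs only when iteration starts -- the "lazy" part.
        stream = await initializer()
        seen = 0
        async for chunk in stream:
            if row_limit is not None:
                chunk = chunk[: row_limit - seen]  # trim the chunk that crosses the cap
            if chunk:
                seen += len(chunk)
                yield chunk
            if row_limit is not None and seen >= row_limit:
                break  # hard limit reached: stop pulling from the source

    async def _demo() -> None:
        async def make_source() -> AsyncIterator[List[int]]:
            # Stands in for the executor's result: a stream of row chunks.
            async def gen() -> AsyncIterator[List[int]]:
                for start in (0, 3, 6):
                    yield list(range(start, start + 3))
            return gen()

        async for chunk in lazy_limited_stream(make_source, row_limit=5):
            print(chunk)  # -> [0, 1, 2] then [3, 4]

    asyncio.run(_demo())

In the diff above the initializer simply returns the pre-wrapped stream and the finalizer is a no-op (flagged with a TODO), so the deferred start, not cleanup, is the point of the wrapper here.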
Empty file.
72 changes: 0 additions & 72 deletions lib/dl_core/dl_core/data_processing/selectors/base.py

This file was deleted.

