Skip to content

Commit

Permalink
Removed service registry usage from data processors (#343)
Browse files Browse the repository at this point in the history
  • Loading branch information
altvod authored Mar 1, 2024
1 parent 54ff668 commit 83c2bf4
Show file tree
Hide file tree
Showing 17 changed files with 76 additions and 71 deletions.
2 changes: 1 addition & 1 deletion lib/dl_api_lib/dl_api_lib/app/data_api/resources/ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class PingReadyView(BaseView):
async def is_pg_ready(self) -> bool:
compeng: CompEngPgService = CompEngPgService.get_app_instance(self.request.app)
processor = compeng.get_data_processor(
service_registry=self.dl_request.services_registry,
reporting_registry=self.dl_request.services_registry.get_reporting_registry(),
reporting_enabled=False,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

import attr

from dl_api_commons.reporting.registry import ReportingRegistry
from dl_compeng_pg.compeng_aiopg.pool_aiopg import AiopgPoolWrapper
from dl_compeng_pg.compeng_aiopg.processor_aiopg import AiopgOperationProcessor
from dl_compeng_pg.compeng_pg_base.data_processor_service_pg import CompEngPgService
from dl_compeng_pg.compeng_pg_base.pool_base import BasePgPoolWrapper
from dl_core.data_processing.processing.processor import OperationProcessorAsyncBase
from dl_core.services_registry.top_level import ServicesRegistry


@attr.s
Expand All @@ -19,11 +19,11 @@ def _get_pool_wrapper_cls(self) -> Type[BasePgPoolWrapper]:

def get_data_processor( # type: ignore # 2024-01-29 # TODO: Return type "OperationProcessorAsyncBase" of "get_data_processor" incompatible with return type "ExecutorBasedOperationProcessor" in supertype "DataProcessorService" [override]
self,
service_registry: ServicesRegistry,
reporting_registry: ReportingRegistry,
reporting_enabled: bool,
) -> OperationProcessorAsyncBase:
return AiopgOperationProcessor(
service_registry=service_registry,
reporting_registry=reporting_registry,
pg_pool=self.pool,
reporting_enabled=reporting_enabled,
)
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class AiopgOperationProcessor(PostgreSQLOperationProcessor[AiopgPoolWrapper, aio
async def start(self) -> None:
self._pg_conn = await self._pg_pool.pool.acquire()
self._db_ex_adapter = AiopgExecAdapter(
service_registry=self.service_registry,
reporting_registry=self._reporting_registry,
reporting_enabled=self._reporting_enabled,
conn=self._pg_conn, # type: ignore # 2024-01-29 # TODO: Argument "conn" to "AiopgExecAdapter" has incompatible type "SAConnection | None"; expected "SAConnection" [arg-type]
cache_options_builder=self._cache_options_builder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

import attr

from dl_api_commons.reporting.registry import ReportingRegistry
from dl_compeng_pg.compeng_asyncpg.pool_asyncpg import AsyncpgPoolWrapper
from dl_compeng_pg.compeng_asyncpg.processor_asyncpg import AsyncpgOperationProcessor
from dl_compeng_pg.compeng_pg_base.data_processor_service_pg import CompEngPgService
from dl_compeng_pg.compeng_pg_base.pool_base import BasePgPoolWrapper
from dl_core.data_processing.processing.processor import OperationProcessorAsyncBase
from dl_core.services_registry.top_level import ServicesRegistry


@attr.s
Expand All @@ -19,11 +19,11 @@ def _get_pool_wrapper_cls(self) -> Type[BasePgPoolWrapper]:

def get_data_processor( # type: ignore # 2024-01-29 # TODO: Return type "OperationProcessorAsyncBase" of "get_data_processor" incompatible with return type "ExecutorBasedOperationProcessor" in supertype "DataProcessorService" [override]
self,
service_registry: ServicesRegistry,
reporting_registry: ReportingRegistry,
reporting_enabled: bool,
) -> OperationProcessorAsyncBase:
return AsyncpgOperationProcessor(
service_registry=service_registry,
reporting_registry=reporting_registry,
pg_pool=self.pool,
reporting_enabled=reporting_enabled,
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async def start(self) -> None:
self._pg_conn = pg_conn
await cmstack.enter_async_context(pg_conn.transaction())
self._db_ex_adapter = AsyncpgExecAdapter(
service_registry=self.service_registry,
reporting_registry=self._reporting_registry,
reporting_enabled=self._reporting_enabled,
conn=pg_conn,
cache_options_builder=self._cache_options_builder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@


if TYPE_CHECKING:
from dl_core.services_registry.top_level import ServicesRegistry
from dl_api_commons.reporting.registry import ReportingRegistry


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -72,7 +72,7 @@ def get_app_instance(cls: Type[_DATA_PROC_SRV_TV], app: web.Application) -> _DAT
@abc.abstractmethod
def get_data_processor(
self,
service_registry: ServicesRegistry,
reporting_registry: ReportingRegistry,
reporting_enabled: bool,
) -> ExecutorBasedOperationProcessor:
raise NotImplementedError
Expand Down
10 changes: 1 addition & 9 deletions lib/dl_core/dl_core/data_processing/cache/processing_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
)
from dl_cache_engine.primitives import BIQueryCacheOptions
from dl_core.data_processing.types import TJSONExtChunkStream
from dl_core.services_registry import ServicesRegistry


LOGGER = logging.getLogger(__name__)
Expand All @@ -45,17 +44,10 @@ class CacheSituation(enum.IntEnum):

@attr.s
class CacheProcessingHelper:
entity_id: str = attr.ib(kw_only=True)
_service_registry: ServicesRegistry = attr.ib(kw_only=True)
_cache_engine: Optional[EntityCacheEngineAsync] = attr.ib(init=False, default=None)
_cache_engine: Optional[EntityCacheEngineAsync] = attr.ib(kw_only=True)

error_ttl_sec: ClassVar[float] = 1.5

def __attrs_post_init__(self) -> None:
cache_engine_factory = self._service_registry.get_cache_engine_factory()
if cache_engine_factory is not None:
self._cache_engine = cache_engine_factory.get_cache_engine(entity_id=self.entity_id)

async def get_cache_entry_manager(
self,
*,
Expand Down
8 changes: 4 additions & 4 deletions lib/dl_core/dl_core/data_processing/dashsql.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,10 @@ async def _generate_func() -> Optional[TJSONExtChunkStream]:
)
)

cache_helper = CacheProcessingHelper(
entity_id=conn_id,
service_registry=service_registry,
)
cache_engine_factory = service_registry.get_cache_engine_factory()
assert cache_engine_factory is not None
cache_engine = cache_engine_factory.get_cache_engine(entity_id=conn_id)
cache_helper = CacheProcessingHelper(cache_engine=cache_engine)
cache_options = self.make_cache_options()
if not cache_options.cache_enabled:
# cache not applicable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from dl_core.data_processing.processing.db_base.exec_adapter_base import ProcessorDbExecAdapterBase
from dl_core.data_processing.processing.db_base.processor_base import ExecutorBasedOperationProcessor
from dl_core.data_processing.streaming import AsyncChunkedBase
from dl_core.us_dataset import Dataset


if TYPE_CHECKING:
Expand All @@ -37,14 +36,16 @@
from dl_core.base_models import ConnectionRef
from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo
from dl_core.data_processing.types import TValuesChunkStream
from dl_core.services_registry.cache_engine_factory import CacheEngineFactory


@attr.s
class CacheExecAdapter(ProcessorDbExecAdapterBase): # noqa
_dataset: Dataset = attr.ib(kw_only=True)
_dataset_id: Optional[str] = attr.ib(kw_only=True)
_main_processor: ExecutorBasedOperationProcessor = attr.ib(kw_only=True)
_use_cache: bool = attr.ib(kw_only=True)
_use_locked_cache: bool = attr.ib(kw_only=True)
_cache_engine_factory: CacheEngineFactory = attr.ib(kw_only=True)

def _save_data_proc_cache_info_reporting_record(self, ctx: OpExecutionContext, cache_full_hit: bool) -> None:
data_proc_cache_record = DataProcessingCacheInfoReportingRecord(
Expand Down Expand Up @@ -74,11 +75,8 @@ async def _execute_and_fetch(
data_key=data_key,
)

ds_id = self._dataset.uuid
cache_helper = CacheProcessingHelper(
entity_id=ds_id, # type: ignore # TODO: fix
service_registry=self._service_registry,
)
cache_engine = self._cache_engine_factory.get_cache_engine(entity_id=self._dataset_id)
cache_helper = CacheProcessingHelper(cache_engine=cache_engine)

original_ctx = ctx.clone()

Expand Down
21 changes: 14 additions & 7 deletions lib/dl_core/dl_core/data_processing/processing/cache/processor.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,43 @@
from __future__ import annotations

from typing import Optional
from typing import (
TYPE_CHECKING,
Optional,
)

import attr

from dl_core.data_processing.cache.utils import CacheOptionsBuilderBase
from dl_core.data_processing.processing.cache.exec_adapter import CacheExecAdapter
from dl_core.data_processing.processing.db_base.exec_adapter_base import ProcessorDbExecAdapterBase
from dl_core.data_processing.processing.db_base.processor_base import ExecutorBasedOperationProcessor
from dl_core.data_processing.processing.processor import OperationProcessorAsyncBase
from dl_core.us_dataset import Dataset


if TYPE_CHECKING:
from dl_core.services_registry.cache_engine_factory import CacheEngineFactory


@attr.s
class CacheOperationProcessor(ExecutorBasedOperationProcessor, OperationProcessorAsyncBase):
_dataset: Dataset = attr.ib(kw_only=True)
class CacheOperationProcessor(ExecutorBasedOperationProcessor):
_dataset_id: Optional[str] = attr.ib(kw_only=True)
_main_processor: ExecutorBasedOperationProcessor = attr.ib(kw_only=True)
_use_cache: bool = attr.ib(kw_only=True, default=True)
_use_locked_cache: bool = attr.ib(kw_only=True, default=True)
_cache_engine_factory: CacheEngineFactory = attr.ib(kw_only=True)

def _make_cache_options_builder(self) -> CacheOptionsBuilderBase: # type: ignore # 2024-01-24 # TODO: Return type "CacheOptionsBuilderBase" of "_make_cache_options_builder" incompatible with return type "DatasetOptionsBuilder" in supertype "OperationProcessorAsyncBase" [override]
return self._main_processor._cache_options_builder

def _make_db_ex_adapter(self) -> Optional[ProcessorDbExecAdapterBase]:
return CacheExecAdapter(
service_registry=self._service_registry,
reporting_registry=self._reporting_registry,
reporting_enabled=self._reporting_enabled,
cache_options_builder=self._cache_options_builder,
dataset=self._dataset,
dataset_id=self._dataset_id,
main_processor=self._main_processor,
use_cache=self._use_cache,
use_locked_cache=self._use_locked_cache,
cache_engine_factory=self._cache_engine_factory,
)

async def ping(self) -> Optional[int]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from dl_constants.enums import UserDataType
from dl_core.base_models import ConnectionRef
from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo
from dl_core.services_registry.top_level import ServicesRegistry


LOGGER = logging.getLogger(__name__)
Expand All @@ -47,12 +46,8 @@ class ProcessorDbExecAdapterBase(abc.ABC):
_default_chunk_size: ClassVar[int] = 1000
_log: ClassVar[logging.Logger] = LOGGER.getChild("ProcessorDbExecAdapterBase")
_cache_options_builder: DatasetOptionsBuilder = attr.ib(kw_only=True)
_service_registry: ServicesRegistry = attr.ib(kw_only=True)
_reporting_enabled: bool = attr.ib(kw_only=True, default=True)

@property
def _reporting_registry(self) -> ReportingRegistry:
return self._service_registry.get_reporting_registry()
_reporting_registry: ReportingRegistry = attr.ib(kw_only=True)

def add_reporting_record(self, record: ReportingRecord) -> None:
if self._reporting_enabled:
Expand Down
12 changes: 1 addition & 11 deletions lib/dl_core/dl_core/data_processing/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@

if TYPE_CHECKING:
from dl_api_commons.reporting.registry import ReportingRegistry
from dl_core.services_registry import ServicesRegistry


LOGGER = logging.getLogger(__name__)
Expand All @@ -46,7 +45,7 @@

@attr.s
class OperationProcessorAsyncBase(abc.ABC):
_service_registry: ServicesRegistry = attr.ib(kw_only=True) # Service registry override
_reporting_registry: ReportingRegistry = attr.ib(kw_only=True)
_reporting_enabled: bool = attr.ib(kw_only=True, default=True)
_cache_options_builder: DatasetOptionsBuilder = attr.ib(init=False)
_db_ex_adapter: Optional[ProcessorDbExecAdapterBase] = attr.ib(init=False, default=None)
Expand Down Expand Up @@ -187,15 +186,6 @@ async def run(

return result

@property
def service_registry(self) -> ServicesRegistry:
assert self._service_registry is not None
return self._service_registry

@property
def _reporting_registry(self) -> ReportingRegistry:
return self.service_registry.get_reporting_registry()

def _save_start_exec_reporting_record(self, ctx: OpExecutionContext) -> None:
report = DataProcessingStartReportingRecord(
timestamp=time.time(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations

from typing import Optional
from typing import (
TYPE_CHECKING,
Optional,
)

import attr

Expand All @@ -17,6 +20,11 @@
from dl_core.us_manager.local_cache import USEntryBuffer


if TYPE_CHECKING:
from dl_api_commons.base_models import RequestContextInfo
from dl_core.services_registry.conn_executor_factory_base import ConnExecutorFactory


@attr.s
class SourceDbOperationProcessor(ExecutorBasedOperationProcessor):
_role: DataSourceRole = attr.ib(kw_only=True)
Expand All @@ -25,6 +33,8 @@ class SourceDbOperationProcessor(ExecutorBasedOperationProcessor):
_us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True)
_is_bleeding_edge_user: bool = attr.ib(default=False)
_default_cache_ttl_config: CacheTTLConfig = attr.ib(default=None)
_ce_factory: ConnExecutorFactory = attr.ib(kw_only=True)
_rci: RequestContextInfo = attr.ib(kw_only=True)

def _make_cache_options_builder(self) -> DatasetOptionsBuilder:
return SelectorCacheOptionsBuilder(
Expand All @@ -35,11 +45,13 @@ def _make_cache_options_builder(self) -> DatasetOptionsBuilder:

def _make_db_ex_adapter(self) -> Optional[ProcessorDbExecAdapterBase]:
return SourceDbExecAdapter(
service_registry=self.service_registry,
reporting_registry=self._reporting_registry,
reporting_enabled=self._reporting_enabled,
role=self._role,
dataset=self._dataset,
row_count_hard_limit=self._row_count_hard_limit,
us_entry_buffer=self._us_entry_buffer,
cache_options_builder=self._cache_options_builder,
ce_factory=self._ce_factory,
rci=self._rci,
)
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
if TYPE_CHECKING:
from sqlalchemy.sql.selectable import Select

from dl_api_commons.base_models import RequestContextInfo
from dl_cache_engine.primitives import LocalKeyRepresentation
from dl_constants.enums import DataSourceRole
from dl_constants.types import TBIDataValue
Expand All @@ -53,6 +54,7 @@
from dl_core.data_processing.prepared_components.manager_base import PreparedComponentManagerBase
from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo
from dl_core.data_processing.types import TValuesChunkStream
from dl_core.services_registry.conn_executor_factory_base import ConnExecutorFactory
from dl_core.us_dataset import Dataset
from dl_core.us_manager.local_cache import USEntryBuffer

Expand All @@ -78,6 +80,8 @@ class SourceDbExecAdapter(ProcessorDbExecAdapterBase): # noqa
_prep_component_manager: Optional[PreparedComponentManagerBase] = attr.ib(kw_only=True, default=None)
_row_count_hard_limit: Optional[int] = attr.ib(kw_only=True, default=None)
_us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True)
_ce_factory: ConnExecutorFactory = attr.ib(kw_only=True)
_rci: RequestContextInfo = attr.ib(kw_only=True)

def __attrs_post_init__(self) -> None:
if self._prep_component_manager is None:
Expand Down Expand Up @@ -107,8 +111,7 @@ async def _get_data_stream_from_source(
target_connection = self._us_entry_buffer.get_entry(joint_dsrc_info.target_connection_ref)
assert isinstance(target_connection, ExecutorBasedMixin)

ce_factory = self._service_registry.get_conn_executor_factory()
ce = ce_factory.get_async_conn_executor(target_connection)
ce = self._ce_factory.get_async_conn_executor(target_connection)

exec_result = await ce.execute(
ConnExecutorQuery(
Expand Down Expand Up @@ -175,7 +178,7 @@ def _save_start_exec_reporting_record(
target_connection = self._us_entry_buffer.get_entry(entry_id=target_connection_ref)
assert isinstance(target_connection, ExecutorBasedMixin)

workbook_id = self._service_registry.rci.workbook_id or (
workbook_id = self._rci.workbook_id or (
target_connection.entry_key.workbook_id
if isinstance(target_connection.entry_key, WorkbookEntryLocation)
else None
Expand All @@ -187,7 +190,7 @@ def _save_start_exec_reporting_record(
dataset_id=dataset_id,
query_type=get_query_type(
connection=target_connection,
conn_sec_mgr=self._service_registry.get_conn_executor_factory().conn_security_manager,
conn_sec_mgr=self._ce_factory.conn_security_manager,
),
connection_type=target_connection.conn_type,
conn_reporting_data=target_connection.get_conn_dto().conn_reporting_data(),
Expand Down
Loading

0 comments on commit 83c2bf4

Please sign in to comment.