From c88dee56c238bbeed37700b600ce1d786c36d3c1 Mon Sep 17 00:00:00 2001 From: Grigory Statsenko Date: Tue, 31 Oct 2023 17:06:09 +0100 Subject: [PATCH] Removed legacy selector code --- lib/dl_api_lib/dl_api_lib/dataset/view.py | 3 - lib/dl_constants/dl_constants/enums.py | 5 - .../dl_core/data_processing/cache/utils.py | 19 ---- .../selectors/dataset_cache_base.py | 83 -------------- .../selectors/dataset_cached.py | 106 ------------------ .../selectors/dataset_cached_lazy.py | 78 ------------- .../dl_core/data_processing/selectors/db.py | 32 +++++- .../data_processor_factory.py | 4 - .../services_registry/selector_factory.py | 22 +--- lib/dl_core_testing/dl_core_testing/data.py | 15 --- .../dl_query_processing/execution/executor.py | 3 - 11 files changed, 33 insertions(+), 337 deletions(-) delete mode 100644 lib/dl_core/dl_core/data_processing/selectors/dataset_cache_base.py delete mode 100644 lib/dl_core/dl_core/data_processing/selectors/dataset_cached.py delete mode 100644 lib/dl_core/dl_core/data_processing/selectors/dataset_cached_lazy.py diff --git a/lib/dl_api_lib/dl_api_lib/dataset/view.py b/lib/dl_api_lib/dl_api_lib/dataset/view.py index 947b7e728..c01ddb6a3 100644 --- a/lib/dl_api_lib/dl_api_lib/dataset/view.py +++ b/lib/dl_api_lib/dl_api_lib/dataset/view.py @@ -20,7 +20,6 @@ CalcMode, DataSourceRole, ProcessorType, - SelectorType, ) from dl_core.fields import BIField from dl_core.us_connection_base import ClassicConnectionSQL @@ -130,7 +129,6 @@ class DatasetView(DatasetBaseWrapper): _SOURCE_DB_PROCESSOR_TYPE = ProcessorType.SOURCE_DB _COMPENG_PROCESSOR_TYPE = ProcessorType.ASYNCPG - _SELECTOR_TYPE = SelectorType.CACHED_LAZY _verbose_logging = True @@ -180,7 +178,6 @@ def make_query_executor(self, allow_cache_usage: bool) -> QueryExecutorBase: avatar_alias_mapper=self._avatar_alias_mapper, compeng_processor_type=self._COMPENG_PROCESSOR_TYPE, source_db_processor_type=self._SOURCE_DB_PROCESSOR_TYPE, - selector_type=self._SELECTOR_TYPE, allow_cache_usage=allow_cache_usage, us_manager=self._us_manager, compeng_semaphore=self._compeng_semaphore, diff --git a/lib/dl_constants/dl_constants/enums.py b/lib/dl_constants/dl_constants/enums.py index 9995b82f1..4558fe5cd 100644 --- a/lib/dl_constants/dl_constants/enums.py +++ b/lib/dl_constants/dl_constants/enums.py @@ -326,11 +326,6 @@ class RangeType(Enum): max = "max" -class SelectorType(Enum): - CACHED = auto() - CACHED_LAZY = auto() - - class ProcessorType(Enum): SOURCE_DB = auto() ASYNCPG = auto() diff --git a/lib/dl_core/dl_core/data_processing/cache/utils.py b/lib/dl_core/dl_core/data_processing/cache/utils.py index c0a199b22..3fbec2ddc 100644 --- a/lib/dl_core/dl_core/data_processing/cache/utils.py +++ b/lib/dl_core/dl_core/data_processing/cache/utils.py @@ -34,7 +34,6 @@ from dl_constants.enums import UserDataType from dl_constants.types import TJSONExt from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo - from dl_core.data_processing.stream_base import DataStreamBase from dl_core.data_source.base import DataSource from dl_core.us_dataset import Dataset from dl_core.us_manager.local_cache import USEntryBuffer @@ -157,24 +156,6 @@ def get_cache_options( refresh_ttl_on_read=ttl_info.refresh_ttl_on_read, ) - def get_cache_options_for_stream( - self, - stream: DataStreamBase, - dataset: Optional[Dataset] = None, - ) -> BIQueryCacheOptions: - ttl_info = self.get_cache_ttl_info( - data_source_list=stream.meta.data_source_list, - dataset=dataset, - ) - key = stream.data_key - cache_enabled = key is not None - return BIQueryCacheOptions( - cache_enabled=cache_enabled, - key=key, - ttl_sec=ttl_info.ttl_sec, - refresh_ttl_on_read=ttl_info.refresh_ttl_on_read, - ) - @attr.s class SelectorCacheOptionsBuilder(DatasetOptionsBuilder): diff --git a/lib/dl_core/dl_core/data_processing/selectors/dataset_cache_base.py b/lib/dl_core/dl_core/data_processing/selectors/dataset_cache_base.py deleted file mode 100644 index 98794ef05..000000000 --- a/lib/dl_core/dl_core/data_processing/selectors/dataset_cache_base.py +++ /dev/null @@ -1,83 +0,0 @@ -from __future__ import annotations - -import abc -import logging -from typing import Optional - -import attr - -from dl_constants.enums import DataSourceRole -from dl_core.data_processing.cache.exc import CachePreparationFailed -from dl_core.data_processing.cache.primitives import LocalKeyRepresentation -from dl_core.data_processing.cache.utils import SelectorCacheOptionsBuilder -from dl_core.data_processing.prepared_components.primitives import PreparedFromInfo -from dl_core.data_processing.selectors.base import BIQueryExecutionContext -from dl_core.data_processing.selectors.dataset_base import DatasetDataSelectorAsyncBase -from dl_core.query.bi_query import QueryAndResultInfo - - -LOGGER = logging.getLogger(__name__) - - -@attr.s -class DatasetCacheCommonDataSelectorAsyncBase(DatasetDataSelectorAsyncBase, metaclass=abc.ABCMeta): - """ - Abstract class that implement base cache-related operations: - - Cache key calculation - - TTL determination - - If cache usage is allowed in class, it will build cache options during building query execution context. - """ - - _allow_cache_usage: bool = attr.ib(default=True, kw_only=True) - _is_bleeding_edge_user: bool = attr.ib(default=False) - _cache_options_builder: SelectorCacheOptionsBuilder = attr.ib(kw_only=True) - - @_cache_options_builder.default # noqa - def _cache_options_builder_default(self) -> SelectorCacheOptionsBuilder: - return SelectorCacheOptionsBuilder( - default_ttl_config=self._service_registry.default_cache_ttl_config, - is_bleeding_edge_user=self._is_bleeding_edge_user, - us_entry_buffer=self._us_entry_buffer, - ) - - def get_data_key(self, query_execution_ctx: BIQueryExecutionContext) -> Optional[LocalKeyRepresentation]: - cache_options = query_execution_ctx.cache_options - if cache_options is not None: - return cache_options.key - return super().get_data_key(query_execution_ctx=query_execution_ctx) - - def build_query_execution_ctx( # type: ignore # TODO: fix - self, - *, - query_id: str, - query_res_info: QueryAndResultInfo, - role: DataSourceRole, - joint_dsrc_info: PreparedFromInfo, - ) -> BIQueryExecutionContext: - q_exec_ctx: BIQueryExecutionContext = super().build_query_execution_ctx( - query_id=query_id, - query_res_info=query_res_info, - role=role, - joint_dsrc_info=joint_dsrc_info, - ) - if self._allow_cache_usage: - try: - cache_options = self._cache_options_builder.get_cache_options( - joint_dsrc_info=joint_dsrc_info, - role=role, - query=q_exec_ctx.query, - user_types=q_exec_ctx.requested_bi_types, - dataset=self.dataset, - data_key=LocalKeyRepresentation(), - ) - - return attr.evolve( - q_exec_ctx, - cache_options=cache_options, - ) - except CachePreparationFailed: - LOGGER.exception("Cache preparation failed") - # Do not fail the request though (very likely it will still fail) - - return q_exec_ctx diff --git a/lib/dl_core/dl_core/data_processing/selectors/dataset_cached.py b/lib/dl_core/dl_core/data_processing/selectors/dataset_cached.py deleted file mode 100644 index 81a6e2e72..000000000 --- a/lib/dl_core/dl_core/data_processing/selectors/dataset_cached.py +++ /dev/null @@ -1,106 +0,0 @@ -from __future__ import annotations - -import logging -import time -from typing import ( - TYPE_CHECKING, - Optional, -) - -import attr - -from dl_api_commons.reporting.models import QueryExecutionCacheInfoReportingRecord -from dl_constants.enums import DataSourceRole -from dl_core.data_processing.cache.processing_helper import ( - CacheProcessingHelper, - CacheSituation, -) -from dl_core.data_processing.selectors.base import BIQueryExecutionContext -from dl_core.data_processing.selectors.dataset_cache_base import DatasetCacheCommonDataSelectorAsyncBase -from dl_core.data_processing.selectors.db import DatasetDbDataSelectorAsync - - -if TYPE_CHECKING: - from dl_core.data_processing.types import TValuesChunkStream - from dl_core.us_connection_base import ExecutorBasedMixin - -LOGGER = logging.getLogger(__name__) - - -@attr.s -class CachedDatasetDataSelectorAsync(DatasetCacheCommonDataSelectorAsyncBase): - """ - Asynchronous cached dataset data selector - - :param allow_cache_usage: if ``True``, cache **might** be used. Otherwise, - neither read nor write will be executed. - """ - - _allow_cache_usage: bool = attr.ib(default=True, kw_only=True) - # cache-wrapped selector - _db_selector: DatasetDbDataSelectorAsync = attr.ib(init=False) - _cache_helper: Optional[CacheProcessingHelper] = attr.ib(init=False, default=None) - - def __attrs_post_init__(self) -> None: - self._db_selector = DatasetDbDataSelectorAsync( - dataset=self.dataset, - service_registry=self._service_registry, - us_entry_buffer=self._us_entry_buffer, - ) - ds_id = self.dataset.uuid - if ds_id: - self._cache_helper = CacheProcessingHelper( - entity_id=ds_id, - service_registry=self.service_registry, - ) - - async def execute_query_context( - self, - role: DataSourceRole, - query_execution_ctx: BIQueryExecutionContext, - row_count_hard_limit: Optional[int] = None, - ) -> Optional[TValuesChunkStream]: - async def _request_db() -> Optional[TValuesChunkStream]: - result_iter = await self._db_selector.execute_query_context( - role=role, query_execution_ctx=query_execution_ctx, row_count_hard_limit=row_count_hard_limit - ) - - if result_iter is None: - return None - - # Is this still necessary, after passing `row_count_hard_limit` to the `db_selector`? - if row_count_hard_limit is not None: - result_iter = result_iter.limit(max_count=row_count_hard_limit) - - return result_iter - - cache_helper = self._cache_helper - cache_options = query_execution_ctx.cache_options - if cache_helper is None or cache_options is None: - # cache not applicable, call db selector directly - return await _request_db() - - target_connection: ExecutorBasedMixin = query_execution_ctx.target_connection - # TODO: make this env-configurable through settings. - use_locked_cache = target_connection.use_locked_cache - - cache_full_hit = None - try: - sit, result_iter = await cache_helper.run_with_cache( - generate_func=_request_db, # type: ignore # TODO: fix - cache_options=cache_options, - use_locked_cache=use_locked_cache, - ) - if sit == CacheSituation.full_hit: - cache_full_hit = True - elif sit == CacheSituation.generated: - cache_full_hit = False - finally: - query_cache_rr = QueryExecutionCacheInfoReportingRecord( - query_id=query_execution_ctx.query_id, - cache_full_hit=cache_full_hit, - timestamp=time.time(), - ) - self.reporting_registry.save_reporting_record(query_cache_rr) - - return result_iter # type: ignore # TODO: fix diff --git a/lib/dl_core/dl_core/data_processing/selectors/dataset_cached_lazy.py b/lib/dl_core/dl_core/data_processing/selectors/dataset_cached_lazy.py deleted file mode 100644 index 57a7f4a1f..000000000 --- a/lib/dl_core/dl_core/data_processing/selectors/dataset_cached_lazy.py +++ /dev/null @@ -1,78 +0,0 @@ -from __future__ import annotations - -import logging -from typing import ( - TYPE_CHECKING, - List, - Optional, - Sequence, -) - -import attr - -from dl_constants.enums import DataSourceRole -from dl_core.data_processing.selectors.base import BIQueryExecutionContext -from dl_core.data_processing.selectors.db import DatasetDbDataSelectorAsync -from dl_core.data_processing.streaming import ( - AsyncChunked, - AsyncChunkedBase, - LazyAsyncChunked, -) - - -if TYPE_CHECKING: - from dl_constants.types import TBIDataValue - - -LOGGER = logging.getLogger(__name__) - - -@attr.s -class LazyCachedDatasetDataSelectorAsync(DatasetDbDataSelectorAsync): - """ - Lazy asynchronous cached dataset data selector - """ - - _active_queries: List[BIQueryExecutionContext] = attr.ib(init=False, factory=list) - - # pre_exec is not redefined so that a reporting record is created - # even if the selector doesn't fetch any data in the end - - def post_exec( - self, - query_execution_ctx: BIQueryExecutionContext, - exec_exception: Optional[Exception], - ) -> None: - """Lazy behaviour: since the query has not even been executed at this point, do nothing.""" - - async def _close_and_remove_active_query(self, query_execution_ctx: BIQueryExecutionContext): # type: ignore # TODO: fix - if query_execution_ctx in self._active_queries: - self._active_queries.remove(query_execution_ctx) - super().post_exec(query_execution_ctx=query_execution_ctx, exec_exception=None) - - async def execute_query_context( - self, - role: DataSourceRole, - query_execution_ctx: BIQueryExecutionContext, - row_count_hard_limit: Optional[int] = None, - ) -> Optional[AsyncChunkedBase[Sequence[TBIDataValue]]]: - result_iter_awaitable = super().execute_query_context( - role=role, query_execution_ctx=query_execution_ctx, row_count_hard_limit=row_count_hard_limit - ) - - async def initialize_data_stream() -> AsyncChunkedBase[List[TBIDataValue]]: - wrapped_result_iter = await result_iter_awaitable - if wrapped_result_iter is None: - wrapped_result_iter = AsyncChunked.from_chunked_iterable([[]]) - self._active_queries.append(query_execution_ctx) - return wrapped_result_iter # type: ignore # TODO: fix - - async def finalize_data_stream() -> None: - await self._close_and_remove_active_query(query_execution_ctx=query_execution_ctx) - - return LazyAsyncChunked(initializer=initialize_data_stream, finalizer=finalize_data_stream) # type: ignore # TODO: fix - - async def close(self) -> None: - """Close all active queries""" - while self._active_queries: - await self._close_and_remove_active_query(query_execution_ctx=self._active_queries[-1]) diff --git a/lib/dl_core/dl_core/data_processing/selectors/db.py b/lib/dl_core/dl_core/data_processing/selectors/db.py index 72d0212a6..56753cb36 100644 --- a/lib/dl_core/dl_core/data_processing/selectors/db.py +++ b/lib/dl_core/dl_core/data_processing/selectors/db.py @@ -15,6 +15,7 @@ from dl_core.data_processing.streaming import ( AsyncChunked, AsyncChunkedBase, + LazyAsyncChunked, ) import dl_core.exc as exc from dl_core.us_connection_base import ExecutorBasedMixin @@ -28,6 +29,26 @@ @attr.s class DatasetDbDataSelectorAsync(DatasetDataSelectorAsyncBase): """Async selector that fetches data from the database""" + # TODO: Merge all selector logic into data processors + + _active_queries: list[BIQueryExecutionContext] = attr.ib(init=False, factory=list) + + def post_exec( + self, + query_execution_ctx: BIQueryExecutionContext, + exec_exception: Optional[Exception], + ) -> None: + """Lazy behaviour: since the query has not even been executed at this point, do nothing.""" + + async def close(self) -> None: + """Close all active queries""" + while self._active_queries: + await self._close_and_remove_active_query(query_execution_ctx=self._active_queries[-1]) + + async def _close_and_remove_active_query(self, query_execution_ctx: BIQueryExecutionContext): # type: ignore # TODO: fix + if query_execution_ctx in self._active_queries: + self._active_queries.remove(query_execution_ctx) + super().post_exec(query_execution_ctx=query_execution_ctx, exec_exception=None) async def execute_query_context( self, @@ -55,4 +76,13 @@ async def execute_query_context( chunk_size=None, ) ) - return AsyncChunked(chunked_data=exec_result.result) + wrapped_result_iter = AsyncChunked(chunked_data=exec_result.result) + + async def initialize_data_stream() -> AsyncChunkedBase[list[TBIDataValue]]: + self._active_queries.append(query_execution_ctx) + return wrapped_result_iter # type: ignore # TODO: fix + + async def finalize_data_stream() -> None: + await self._close_and_remove_active_query(query_execution_ctx=query_execution_ctx) + + return LazyAsyncChunked(initializer=initialize_data_stream, finalizer=finalize_data_stream) diff --git a/lib/dl_core/dl_core/services_registry/data_processor_factory.py b/lib/dl_core/dl_core/services_registry/data_processor_factory.py index 77898e64a..5da50f310 100644 --- a/lib/dl_core/dl_core/services_registry/data_processor_factory.py +++ b/lib/dl_core/dl_core/services_registry/data_processor_factory.py @@ -11,7 +11,6 @@ from dl_constants.enums import ( DataSourceRole, ProcessorType, - SelectorType, ) from dl_core.data_processing.processing.cache.processor import CacheOperationProcessor from dl_core.data_processing.processing.db_base.processor_base import ExecutorBasedOperationProcessor @@ -37,16 +36,13 @@ def _create_data_processor( allow_cache_usage: bool = True, reporting_enabled: bool = True, # SOURCE_DB-specific - selector_type: Optional[SelectorType] = None, role: Optional[DataSourceRole] = None, **kwargs: Any, ) -> ExecutorBasedOperationProcessor: - assert selector_type is not None assert role is not None selector_factory = self.services_registry.get_selector_factory() selector = selector_factory.get_dataset_selector( dataset=dataset, - selector_type=selector_type, allow_cache_usage=False, # Use data processor-level cache us_entry_buffer=us_entry_buffer, ) diff --git a/lib/dl_core/dl_core/services_registry/selector_factory.py b/lib/dl_core/dl_core/services_registry/selector_factory.py index 784e00bdb..db3c8493e 100644 --- a/lib/dl_core/dl_core/services_registry/selector_factory.py +++ b/lib/dl_core/dl_core/services_registry/selector_factory.py @@ -11,10 +11,8 @@ import attr from dl_api_commons.base_models import RequestContextInfo -from dl_constants.enums import SelectorType from dl_core.data_processing.selectors.dataset_base import DatasetDataSelectorAsyncBase -from dl_core.data_processing.selectors.dataset_cached import CachedDatasetDataSelectorAsync -from dl_core.data_processing.selectors.dataset_cached_lazy import LazyCachedDatasetDataSelectorAsync +from dl_core.data_processing.selectors.db import DatasetDbDataSelectorAsync from dl_core.us_dataset import Dataset from dl_core.us_manager.local_cache import USEntryBuffer from dl_core.utils import FutureRef @@ -32,7 +30,6 @@ class SelectorFactory(metaclass=abc.ABCMeta): def get_dataset_selector( self, dataset: Dataset, - selector_type: SelectorType, *, us_entry_buffer: USEntryBuffer, allow_cache_usage: bool = True, # TODO: Remove cache from selectors @@ -60,14 +57,12 @@ def rci(self) -> RequestContextInfo: def get_dataset_selector( self, dataset: Dataset, - selector_type: SelectorType, *, us_entry_buffer: USEntryBuffer, allow_cache_usage: bool = True, ) -> DatasetDataSelectorAsyncBase: selector = self._create_dataset_selector( dataset=dataset, - selector_type=selector_type, us_entry_buffer=us_entry_buffer, allow_cache_usage=allow_cache_usage, ) @@ -79,7 +74,6 @@ def get_dataset_selector( def _create_dataset_selector( self, dataset: Dataset, - selector_type: SelectorType, *, us_entry_buffer: USEntryBuffer, allow_cache_usage: bool = True, @@ -104,26 +98,14 @@ class DefaultSelectorFactory(BaseClosableSelectorFactory): def _create_dataset_selector( self, dataset: Dataset, - selector_type: SelectorType, *, us_entry_buffer: USEntryBuffer, allow_cache_usage: bool = True, ) -> DatasetDataSelectorAsyncBase: - if selector_type == SelectorType.CACHED: - return CachedDatasetDataSelectorAsync( # type: ignore # TODO: fix - dataset=dataset, - service_registry=self.services_registry, - allow_cache_usage=allow_cache_usage, - is_bleeding_edge_user=self._is_bleeding_edge_user, - us_entry_buffer=us_entry_buffer, - ) - elif selector_type == SelectorType.CACHED_LAZY: - return LazyCachedDatasetDataSelectorAsync( # type: ignore # TODO: fix + return DatasetDbDataSelectorAsync( dataset=dataset, service_registry=self.services_registry, # allow_cache_usage=allow_cache_usage, # is_bleeding_edge_user=self._is_bleeding_edge_user, us_entry_buffer=us_entry_buffer, ) - else: - raise NotImplementedError(f"Creation of selector with type {selector_type} is not supported") diff --git a/lib/dl_core_testing/dl_core_testing/data.py b/lib/dl_core_testing/dl_core_testing/data.py index 829328540..807048fcb 100644 --- a/lib/dl_core_testing/dl_core_testing/data.py +++ b/lib/dl_core_testing/dl_core_testing/data.py @@ -14,7 +14,6 @@ from dl_constants.enums import ( DataSourceRole, ProcessorType, - SelectorType, ) from dl_core.components.accessor import DatasetComponentAccessor from dl_core.components.ids import AvatarId @@ -25,7 +24,6 @@ DownloadOp, JoinOp, ) -from dl_core.data_processing.selectors.base import DataSelectorAsyncBase from dl_core.data_processing.stream_base import ( DataRequestMetaInfo, DataSourceVS, @@ -47,9 +45,7 @@ @attr.s class DataFetcher: _dataset: Dataset = attr.ib(kw_only=True) - _selector: Optional[DataSelectorAsyncBase] = attr.ib(kw_only=True, default=None) _service_registry: Optional[ServicesRegistry] = attr.ib(kw_only=True, default=None) - _selector_type: SelectorType = attr.ib(kw_only=True, default=SelectorType.CACHED) _us_manager: Optional[USManagerBase] = attr.ib(kw_only=True, default=None) # FIXME: Legacy; remove _us_entry_buffer: USEntryBuffer = attr.ib(kw_only=True) _ds_accessor: DatasetComponentAccessor = attr.ib(init=False) @@ -64,16 +60,6 @@ def _make_us_entry_buffer(self) -> USEntryBuffer: def _make_ds_accessor(self) -> DatasetComponentAccessor: return DatasetComponentAccessor(dataset=self._dataset) - def __attrs_post_init__(self) -> None: - if self._selector is None: - assert self._service_registry is not None - sel_factory = self._service_registry.get_selector_factory() - self._selector = sel_factory.get_dataset_selector( - dataset=self._dataset, - selector_type=self._selector_type, - us_entry_buffer=self._us_entry_buffer, - ) - def _get_avatar_virtual_data_stream( self, avatar_id: AvatarId, @@ -126,7 +112,6 @@ async def get_data_stream_async( data_processor = await dp_factory.get_data_processor( dataset=self._dataset, processor_type=ProcessorType.SOURCE_DB, - selector_type=self._selector_type, role=role, us_entry_buffer=self._us_entry_buffer, allow_cache_usage=allow_cache_usage, diff --git a/lib/dl_query_processing/dl_query_processing/execution/executor.py b/lib/dl_query_processing/dl_query_processing/execution/executor.py index b7e09c048..c00541b50 100644 --- a/lib/dl_query_processing/dl_query_processing/execution/executor.py +++ b/lib/dl_query_processing/dl_query_processing/execution/executor.py @@ -17,7 +17,6 @@ from dl_constants.enums import ( DataSourceRole, ProcessorType, - SelectorType, ) from dl_constants.types import TBIDataRow from dl_core.components.ids import AvatarId @@ -68,7 +67,6 @@ class SourceDbBaseSourceInfo: @attr.s class QueryExecutor(QueryExecutorBase): _dataset: Dataset = attr.ib(kw_only=True) - _selector_type: SelectorType = attr.ib(kw_only=True) _compeng_processor_type: ProcessorType = attr.ib(kw_only=True) _source_db_processor_type: ProcessorType = attr.ib(kw_only=True) _allow_cache_usage: bool = attr.ib(kw_only=True) @@ -97,7 +95,6 @@ async def _get_source_db_data_processor(self, role: DataSourceRole) -> Operation processor_type=self._source_db_processor_type, allow_cache_usage=self._allow_cache_usage, role=role, - selector_type=self._selector_type, ) async def _get_data_processor(