From c580a379695e419752d478438131475d68e81d29 Mon Sep 17 00:00:00 2001 From: Grigory Statsenko Date: Thu, 16 Nov 2023 13:30:39 +0100 Subject: [PATCH] Refactored service registry creation in core tests --- .../db/base/core/base.py | 4 +- .../dl_core/services_registry/sr_factories.py | 22 ++++--- .../dl_core_testing/testcases/service_base.py | 63 ++++++++++--------- 3 files changed, 52 insertions(+), 37 deletions(-) diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/core/base.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/core/base.py index db5506f3c..53a09ef4d 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/core/base.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/core/base.py @@ -110,7 +110,7 @@ def conn_sync_service_registry( conn_bi_context: RequestContextInfo, task_processor_factory: TaskProcessorFactory, ) -> ServicesRegistry: - return self.service_registry_factory( + return self.make_service_registry( conn_exec_factory_async_env=False, conn_bi_context=conn_bi_context, task_processor_factory=task_processor_factory, @@ -122,7 +122,7 @@ def conn_async_service_registry( conn_bi_context: RequestContextInfo, task_processor_factory: TaskProcessorFactory, ) -> ServicesRegistry: - return self.service_registry_factory( + return self.make_service_registry( conn_exec_factory_async_env=True, conn_bi_context=conn_bi_context, task_processor_factory=task_processor_factory, diff --git a/lib/dl_core/dl_core/services_registry/sr_factories.py b/lib/dl_core/dl_core/services_registry/sr_factories.py index bf4652746..55d68fb5f 100644 --- a/lib/dl_core/dl_core/services_registry/sr_factories.py +++ b/lib/dl_core/dl_core/services_registry/sr_factories.py @@ -37,7 +37,10 @@ ) from dl_core.us_manager.mutation_cache.usentry_mutation_cache_factory import USEntryMutationCacheFactory from dl_core.utils import FutureRef -from dl_task_processor.processor import ARQTaskProcessorFactory +from dl_task_processor.processor import ( + ARQTaskProcessorFactory, + TaskProcessorFactory, +) if TYPE_CHECKING: @@ -54,6 +57,7 @@ from dl_core.services_registry.inst_specific_sr import InstallationSpecificServiceRegistryFactory from dl_core.services_registry.typing import ConnectOptionsFactory from dl_core.us_connection_base import ExecutorBasedMixin + from dl_utils.aio import ContextVarExecutor LOGGER = logging.getLogger(__name__) @@ -100,9 +104,17 @@ class DefaultSRFactory(SRFactory[SERVICE_REGISTRY_TV]): # type: ignore # TODO: rqe_caches_settings: Optional[RQECachesSetting] = attr.ib(default=None) required_services: set[RequiredService] = attr.ib(factory=set) inst_specific_sr_factory: Optional[InstallationSpecificServiceRegistryFactory] = attr.ib(default=None) + task_processor_factory: Optional[TaskProcessorFactory] = attr.ib() + tpe: Optional[ContextVarExecutor] = attr.ib(default=None) service_registry_cls: ClassVar[Type[SERVICE_REGISTRY_TV]] = DefaultServicesRegistry # type: ignore # TODO: fix + @task_processor_factory.default + def _make_task_processor_factory(self) -> Optional[TaskProcessorFactory]: + if self.redis_pool_settings: + return ARQTaskProcessorFactory(redis_pool_settings=self.redis_pool_settings) + return None + def is_bleeding_edge_user(self, request_context_info: RequestContextInfo) -> bool: return request_context_info.user_name in self.bleeding_edge_users @@ -116,7 +128,7 @@ def make_conn_executor_factory( LOGGER.info("ATTENTION! It's bleeding edge user") return DefaultConnExecutorFactory( async_env=self.async_env, - tpe=None, + tpe=self.tpe, conn_sec_mgr=self.env_manager_factory.make_security_manager(), rqe_config=self.rqe_config, services_registry_ref=sr_ref, # type: ignore # TODO: fix @@ -172,11 +184,7 @@ def make_service_registry( ) if self.file_uploader_settings else None, - task_processor_factory=ARQTaskProcessorFactory( - redis_pool_settings=self.redis_pool_settings, - ) - if self.redis_pool_settings - else None, + task_processor_factory=self.task_processor_factory, rqe_caches_settings=self.rqe_caches_settings, required_services=self.required_services, inst_specific_sr=( diff --git a/lib/dl_core_testing/dl_core_testing/testcases/service_base.py b/lib/dl_core_testing/dl_core_testing/testcases/service_base.py index 229c2b5ba..326d1c737 100644 --- a/lib/dl_core_testing/dl_core_testing/testcases/service_base.py +++ b/lib/dl_core_testing/dl_core_testing/testcases/service_base.py @@ -16,18 +16,17 @@ from dl_configs.crypto_keys import CryptoKeysConfig from dl_configs.rqe import RQEConfig from dl_constants.enums import ConnectionType -from dl_core.connections_security.base import InsecureConnectionSecurityManager -from dl_core.services_registry.conn_executor_factory import DefaultConnExecutorFactory +from dl_core.services_registry.env_manager_factory import InsecureEnvManagerFactory from dl_core.services_registry.inst_specific_sr import InstallationSpecificServiceRegistryFactory -from dl_core.services_registry.top_level import ( - DefaultServicesRegistry, - ServicesRegistry, +from dl_core.services_registry.sr_factories import ( + DefaultSRFactory, + SRFactory, ) +from dl_core.services_registry.top_level import ServicesRegistry from dl_core.united_storage_client import USAuthContextMaster from dl_core.us_manager.mutation_cache.usentry_mutation_cache_factory import DefaultUSEntryMutationCacheFactory from dl_core.us_manager.us_manager_async import AsyncUSManager from dl_core.us_manager.us_manager_sync import SyncUSManager -from dl_core.utils import FutureRef from dl_core_testing.configuration import CoreTestEnvironmentConfigurationBase from dl_core_testing.database import ( CoreDbConfig, @@ -37,6 +36,10 @@ from dl_core_testing.fixtures.dispenser import DbCsvTableDispenser from dl_db_testing.database.engine_wrapper import DbEngineConfig from dl_utils.aio import ContextVarExecutor +from dl_task_processor.processor import ( + DummyTaskProcessorFactory, + TaskProcessorFactory, +) class USConfig(NamedTuple): @@ -69,44 +72,48 @@ def conn_bi_context(self) -> RequestContextInfo: def conn_exec_factory_async_env(self) -> bool: return False - def service_registry_factory( + def make_service_registry_factory( + self, + async_env: bool, + task_processor_factory: TaskProcessorFactory = DummyTaskProcessorFactory(), + ) -> SRFactory: + return DefaultSRFactory( + async_env=async_env, + rqe_config=RQEConfig.get_default(), # Not used because RQE is disabled + connectors_settings={self.conn_type: self.connection_settings} if self.connection_settings else {}, + inst_specific_sr_factory=self.inst_specific_sr_factory, + env_manager_factory=InsecureEnvManagerFactory(), + force_non_rqe_mode=True, + tpe=ContextVarExecutor(), + task_processor_factory=task_processor_factory, + ) + + def make_service_registry( self, conn_exec_factory_async_env: bool, conn_bi_context: RequestContextInfo, + task_processor_factory: TaskProcessorFactory = DummyTaskProcessorFactory(), **kwargs: Any, ) -> ServicesRegistry: - sr_future_ref: FutureRef[ServicesRegistry] = FutureRef() - service_registry = DefaultServicesRegistry( - rci=conn_bi_context, + sr_factory = self.make_service_registry_factory( + async_env=conn_exec_factory_async_env, + task_processor_factory=task_processor_factory, + ) + return sr_factory.make_service_registry( + request_context_info=conn_bi_context, mutations_cache_factory=DefaultUSEntryMutationCacheFactory(), reporting_registry=DefaultReportingRegistry( rci=conn_bi_context, ), - conn_exec_factory=DefaultConnExecutorFactory( - async_env=conn_exec_factory_async_env, - force_non_rqe_mode=True, - rqe_config=RQEConfig.get_default(), # Not used because RQE is disabled - services_registry_ref=sr_future_ref, - conn_sec_mgr=InsecureConnectionSecurityManager(), - tpe=ContextVarExecutor(), - ), - connectors_settings={self.conn_type: self.connection_settings} if self.connection_settings else {}, - inst_specific_sr=( - self.inst_specific_sr_factory.get_inst_specific_sr(sr_future_ref) - if self.inst_specific_sr_factory is not None - else None - ), **kwargs, ) - sr_future_ref.fulfill(service_registry) - return service_registry @pytest.fixture(scope="session") def conn_sync_service_registry( self, conn_bi_context: RequestContextInfo, ) -> ServicesRegistry: - return self.service_registry_factory( + return self.make_service_registry( conn_exec_factory_async_env=False, conn_bi_context=conn_bi_context, ) @@ -116,7 +123,7 @@ def conn_async_service_registry( self, conn_bi_context: RequestContextInfo, ) -> ServicesRegistry: - return self.service_registry_factory( + return self.make_service_registry( conn_exec_factory_async_env=True, conn_bi_context=conn_bi_context, )