diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/core/base.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/core/base.py index db5506f3c..53a09ef4d 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/core/base.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/core/base.py @@ -110,7 +110,7 @@ def conn_sync_service_registry( conn_bi_context: RequestContextInfo, task_processor_factory: TaskProcessorFactory, ) -> ServicesRegistry: - return self.service_registry_factory( + return self.make_service_registry( conn_exec_factory_async_env=False, conn_bi_context=conn_bi_context, task_processor_factory=task_processor_factory, @@ -122,7 +122,7 @@ def conn_async_service_registry( conn_bi_context: RequestContextInfo, task_processor_factory: TaskProcessorFactory, ) -> ServicesRegistry: - return self.service_registry_factory( + return self.make_service_registry( conn_exec_factory_async_env=True, conn_bi_context=conn_bi_context, task_processor_factory=task_processor_factory, diff --git a/lib/dl_core/dl_core/services_registry/sr_factories.py b/lib/dl_core/dl_core/services_registry/sr_factories.py index bf4652746..5aeb4bf38 100644 --- a/lib/dl_core/dl_core/services_registry/sr_factories.py +++ b/lib/dl_core/dl_core/services_registry/sr_factories.py @@ -37,7 +37,10 @@ ) from dl_core.us_manager.mutation_cache.usentry_mutation_cache_factory import USEntryMutationCacheFactory from dl_core.utils import FutureRef -from dl_task_processor.processor import ARQTaskProcessorFactory +from dl_task_processor.processor import ( + ARQTaskProcessorFactory, + TaskProcessorFactory, +) if TYPE_CHECKING: @@ -100,9 +103,16 @@ class DefaultSRFactory(SRFactory[SERVICE_REGISTRY_TV]): # type: ignore # TODO: rqe_caches_settings: Optional[RQECachesSetting] = attr.ib(default=None) required_services: set[RequiredService] = attr.ib(factory=set) inst_specific_sr_factory: Optional[InstallationSpecificServiceRegistryFactory] = attr.ib(default=None) + task_processor_factory: Optional[TaskProcessorFactory] = attr.ib() service_registry_cls: ClassVar[Type[SERVICE_REGISTRY_TV]] = DefaultServicesRegistry # type: ignore # TODO: fix + @task_processor_factory.default + def _make_task_processor_factory(self) -> Optional[TaskProcessorFactory]: + if self.redis_pool_settings: + return ARQTaskProcessorFactory(redis_pool_settings=self.redis_pool_settings) + return None + def is_bleeding_edge_user(self, request_context_info: RequestContextInfo) -> bool: return request_context_info.user_name in self.bleeding_edge_users @@ -172,11 +182,7 @@ def make_service_registry( ) if self.file_uploader_settings else None, - task_processor_factory=ARQTaskProcessorFactory( - redis_pool_settings=self.redis_pool_settings, - ) - if self.redis_pool_settings - else None, + task_processor_factory=self.task_processor_factory, rqe_caches_settings=self.rqe_caches_settings, required_services=self.required_services, inst_specific_sr=( diff --git a/lib/dl_core_testing/dl_core_testing/testcases/service_base.py b/lib/dl_core_testing/dl_core_testing/testcases/service_base.py index 229c2b5ba..695813f0c 100644 --- a/lib/dl_core_testing/dl_core_testing/testcases/service_base.py +++ b/lib/dl_core_testing/dl_core_testing/testcases/service_base.py @@ -18,7 +18,12 @@ from dl_constants.enums import ConnectionType from dl_core.connections_security.base import InsecureConnectionSecurityManager from dl_core.services_registry.conn_executor_factory import DefaultConnExecutorFactory +from dl_core.services_registry.env_manager_factory import InsecureEnvManagerFactory from dl_core.services_registry.inst_specific_sr import InstallationSpecificServiceRegistryFactory +from dl_core.services_registry.sr_factories import ( + DefaultSRFactory, + SRFactory, +) from dl_core.services_registry.top_level import ( DefaultServicesRegistry, ServicesRegistry, @@ -69,7 +74,33 @@ def conn_bi_context(self) -> RequestContextInfo: def conn_exec_factory_async_env(self) -> bool: return False - def service_registry_factory( + def make_service_registry_factory(self, async_env: bool) -> SRFactory: + return DefaultSRFactory( + async_env=async_env, + rqe_config=RQEConfig.get_default(), # Not used because RQE is disabled + connectors_settings={self.conn_type: self.connection_settings} if self.connection_settings else {}, + inst_specific_sr_factory=self.inst_specific_sr_factory, + env_manager_factory=InsecureEnvManagerFactory(), + force_non_rqe_mode=True, + ) + + def make_service_registry( + self, + conn_exec_factory_async_env: bool, + conn_bi_context: RequestContextInfo, + **kwargs: Any, + ) -> ServicesRegistry: + sr_factory = self.make_service_registry_factory(async_env=conn_exec_factory_async_env) + return sr_factory.make_service_registry( + request_context_info=conn_bi_context, + mutations_cache_factory=DefaultUSEntryMutationCacheFactory(), + reporting_registry=DefaultReportingRegistry( + rci=conn_bi_context, + ), + **kwargs, + ) + + def make_service_registry_legacy( self, conn_exec_factory_async_env: bool, conn_bi_context: RequestContextInfo, @@ -106,7 +137,7 @@ def conn_sync_service_registry( self, conn_bi_context: RequestContextInfo, ) -> ServicesRegistry: - return self.service_registry_factory( + return self.make_service_registry( conn_exec_factory_async_env=False, conn_bi_context=conn_bi_context, ) @@ -116,7 +147,7 @@ def conn_async_service_registry( self, conn_bi_context: RequestContextInfo, ) -> ServicesRegistry: - return self.service_registry_factory( + return self.make_service_registry( conn_exec_factory_async_env=True, conn_bi_context=conn_bi_context, )