diff --git a/lib/dl_api_lib/dl_api_lib_tests/db/base.py b/lib/dl_api_lib/dl_api_lib_tests/db/base.py index 1233b7e04..ec59880bb 100644 --- a/lib/dl_api_lib/dl_api_lib_tests/db/base.py +++ b/lib/dl_api_lib/dl_api_lib_tests/db/base.py @@ -24,7 +24,7 @@ class DefaultApiTestBase(DataApiTestBase, DatasetTestBase, ConnectionTestBase): """The knowledge that this is a ClickHouse connector should not go beyond this class""" - bi_compeng_pg_on = True + compeng_enabled = True conn_type = CONNECTION_TYPE_CLICKHOUSE raw_sql_level: ClassVar[RawSQLLevel] = RawSQLLevel.off diff --git a/lib/dl_api_lib/dl_api_lib_tests/db/config.py b/lib/dl_api_lib/dl_api_lib_tests/db/config.py index c77c1f7b0..2dd5670e3 100644 --- a/lib/dl_api_lib/dl_api_lib_tests/db/config.py +++ b/lib/dl_api_lib/dl_api_lib_tests/db/config.py @@ -14,6 +14,10 @@ port_us_pg_5432=get_test_container_hostport("db-postgres", fallback_port=52509).port, us_master_token="AC1ofiek8coB", core_connector_ep_names=["clickhouse", "postgresql"], + compeng_url=( + f"postgresql://us:us@" + f'{get_test_container_hostport("db-postgres", fallback_port=52509).as_pair()}/us-db-ci_purgeable' + ), ) @@ -35,8 +39,4 @@ class CoreConnectionSettings: api_connector_ep_names=["clickhouse", "postgresql"], core_test_config=CORE_TEST_CONFIG, ext_query_executer_secret_key="_some_test_secret_key_", - bi_compeng_pg_url=( - f"postgresql://us:us@" - f'{get_test_container_hostport("db-postgres", fallback_port=52509).as_pair()}/us-db-ci_purgeable' - ), ) diff --git a/lib/dl_api_lib_testing/dl_api_lib_testing/base.py b/lib/dl_api_lib_testing/dl_api_lib_testing/base.py index 068b2db29..bbb23b8ed 100644 --- a/lib/dl_api_lib_testing/dl_api_lib_testing/base.py +++ b/lib/dl_api_lib_testing/dl_api_lib_testing/base.py @@ -52,7 +52,6 @@ class ApiTestBase(abc.ABC): Base class defining the basic fixtures of bi-api tests """ - bi_compeng_pg_on: ClassVar[bool] = True query_processing_mode: ClassVar[QueryProcessingMode] = QueryProcessingMode.basic @pytest.fixture(scope="function", autouse=True) @@ -115,8 +114,8 @@ def create_control_api_settings( BI_API_CONNECTOR_WHITELIST=bi_test_config.get_api_library_config().api_connector_ep_names, CORE_CONNECTOR_WHITELIST=core_test_config.get_core_library_config().core_connector_ep_names, RQE_CONFIG=rqe_config_subprocess, - BI_COMPENG_PG_ON=cls.bi_compeng_pg_on, - BI_COMPENG_PG_URL=bi_test_config.bi_compeng_pg_url, + BI_COMPENG_PG_ON=cls.compeng_enabled, + BI_COMPENG_PG_URL=core_test_config.get_compeng_url(), DO_DSRC_IDX_FETCH=True, FIELD_ID_GENERATOR_TYPE=FieldIdGeneratorType.suffix, REDIS_ARQ=redis_setting_maker.get_redis_settings_arq(), diff --git a/lib/dl_api_lib_testing/dl_api_lib_testing/configuration.py b/lib/dl_api_lib_testing/dl_api_lib_testing/configuration.py index d61dee412..f49c09f7b 100644 --- a/lib/dl_api_lib_testing/dl_api_lib_testing/configuration.py +++ b/lib/dl_api_lib_testing/dl_api_lib_testing/configuration.py @@ -20,8 +20,6 @@ class ApiTestEnvironmentConfiguration: mutation_cache_enabled: bool = attr.ib(default=True) - bi_compeng_pg_url: str = attr.ib(default="") - file_uploader_api_host: str = attr.ib(default="http://127.0.0.1") file_uploader_api_port: int = attr.ib(default=9999) diff --git a/lib/dl_api_lib_testing/dl_api_lib_testing/data_api_base.py b/lib/dl_api_lib_testing/dl_api_lib_testing/data_api_base.py index b10309931..f104cb4fc 100644 --- a/lib/dl_api_lib_testing/dl_api_lib_testing/data_api_base.py +++ b/lib/dl_api_lib_testing/dl_api_lib_testing/data_api_base.py @@ -79,8 +79,8 @@ def create_data_api_settings( CORE_CONNECTOR_WHITELIST=core_test_config.get_core_library_config().core_connector_ep_names, MUTATIONS_CACHES_ON=cls.mutation_caches_on, CACHES_REDIS=redis_setting_maker.get_redis_settings_cache(), - BI_COMPENG_PG_ON=cls.bi_compeng_pg_on, - BI_COMPENG_PG_URL=bi_test_config.bi_compeng_pg_url, + BI_COMPENG_PG_ON=cls.compeng_enabled, + BI_COMPENG_PG_URL=core_test_config.get_compeng_url(), FIELD_ID_GENERATOR_TYPE=FieldIdGeneratorType.suffix, FILE_UPLOADER_BASE_URL=f"{bi_test_config.file_uploader_api_host}:{bi_test_config.file_uploader_api_port}", FILE_UPLOADER_MASTER_TOKEN="qwerty", diff --git a/lib/dl_connector_bigquery/dl_connector_bigquery_tests/ext/api/base.py b/lib/dl_connector_bigquery/dl_connector_bigquery_tests/ext/api/base.py index 485180b7a..838744ee4 100644 --- a/lib/dl_connector_bigquery/dl_connector_bigquery_tests/ext/api/base.py +++ b/lib/dl_connector_bigquery/dl_connector_bigquery_tests/ext/api/base.py @@ -15,7 +15,7 @@ class BigQueryConnectionTestBase(BaseBigQueryTestClass, ConnectionTestBase): conn_type = CONNECTION_TYPE_BIGQUERY - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: diff --git a/lib/dl_connector_bitrix_gds/dl_connector_bitrix_gds_tests/ext/config.py b/lib/dl_connector_bitrix_gds/dl_connector_bitrix_gds_tests/ext/config.py index 8525583f2..ee58458e0 100644 --- a/lib/dl_connector_bitrix_gds/dl_connector_bitrix_gds_tests/ext/config.py +++ b/lib/dl_connector_bitrix_gds/dl_connector_bitrix_gds_tests/ext/config.py @@ -3,6 +3,8 @@ from dl_testing.containers import get_test_container_hostport +COMPENG_URL = f'postgresql://datalens:qwerty@{get_test_container_hostport("db-postgres-13", fallback_port=52301).as_pair()}/test_data' + # Infra settings CORE_TEST_CONFIG = DefaultCoreTestConfiguration( host_us_http=get_test_container_hostport("us", fallback_port=51911).host, @@ -11,14 +13,13 @@ port_us_pg_5432=get_test_container_hostport("pg-us", fallback_port=51910).port, us_master_token="AC1ofiek8coB", core_connector_ep_names=["bitrix_gds", "postgresql"], + compeng_url=COMPENG_URL, ) -COMPENG_URL = f'postgresql://datalens:qwerty@{get_test_container_hostport("db-postgres-13", fallback_port=52301).as_pair()}/test_data' API_TEST_CONFIG = ApiTestEnvironmentConfiguration( api_connector_ep_names=["bitrix_gds", "postgresql"], core_test_config=CORE_TEST_CONFIG, ext_query_executer_secret_key="_some_test_secret_key_", - bi_compeng_pg_url=COMPENG_URL, ) BITRIX_PORTALS = dict( diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/api/base.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/api/base.py index 9b43bc39f..3af3579b4 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/api/base.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/api/base.py @@ -66,7 +66,7 @@ async def get_internal_params(self, src: FileSourceDesc) -> SourceInternalParams class CHS3ConnectionApiTestBase(BaseCHS3TestClass[FILE_CONN_TV], ConnectionTestBase, metaclass=abc.ABCMeta): - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: diff --git a/lib/dl_connector_clickhouse/dl_connector_clickhouse_tests/db/api/base.py b/lib/dl_connector_clickhouse/dl_connector_clickhouse_tests/db/api/base.py index db7cda468..49325561c 100644 --- a/lib/dl_connector_clickhouse/dl_connector_clickhouse_tests/db/api/base.py +++ b/lib/dl_connector_clickhouse/dl_connector_clickhouse_tests/db/api/base.py @@ -17,7 +17,7 @@ class ClickHouseConnectionTestBase(BaseClickHouseTestClass, ConnectionTestBase): conn_type = CONNECTION_TYPE_CLICKHOUSE - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: diff --git a/lib/dl_connector_greenplum/dl_connector_greenplum_tests/db/api/base.py b/lib/dl_connector_greenplum/dl_connector_greenplum_tests/db/api/base.py index c6b314831..838f8c590 100644 --- a/lib/dl_connector_greenplum/dl_connector_greenplum_tests/db/api/base.py +++ b/lib/dl_connector_greenplum/dl_connector_greenplum_tests/db/api/base.py @@ -23,7 +23,7 @@ class GreenplumConnectionTestBase(ConnectionTestBase, ServiceFixtureTextClass): conn_type = CONNECTION_TYPE_GREENPLUM core_test_config = CORE_TEST_CONFIG - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def db_url(self) -> str: diff --git a/lib/dl_connector_metrica/dl_connector_metrica_tests/ext/api/base.py b/lib/dl_connector_metrica/dl_connector_metrica_tests/ext/api/base.py index f6554747f..4baad5ce7 100644 --- a/lib/dl_connector_metrica/dl_connector_metrica_tests/ext/api/base.py +++ b/lib/dl_connector_metrica/dl_connector_metrica_tests/ext/api/base.py @@ -29,7 +29,7 @@ class MetricaConnectionTestBase(BaseMetricaTestClass, ConnectionTestBase): conn_type = CONNECTION_TYPE_METRICA_API - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: @@ -71,7 +71,7 @@ def data_api_test_params(self) -> DataApiTestParams: class AppMetricaConnectionTestBase(BaseAppMetricaTestClass, ConnectionTestBase): conn_type = CONNECTION_TYPE_APPMETRICA_API - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: diff --git a/lib/dl_connector_mssql/dl_connector_mssql_tests/db/api/base.py b/lib/dl_connector_mssql/dl_connector_mssql_tests/db/api/base.py index e4c76262a..9c5cc8ff1 100644 --- a/lib/dl_connector_mssql/dl_connector_mssql_tests/db/api/base.py +++ b/lib/dl_connector_mssql/dl_connector_mssql_tests/db/api/base.py @@ -19,7 +19,7 @@ class MSSQLConnectionTestBase(BaseMSSQLTestClass, ConnectionTestBase): conn_type = CONNECTION_TYPE_MSSQL - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: diff --git a/lib/dl_connector_mysql/dl_connector_mysql_tests/db/api/base.py b/lib/dl_connector_mysql/dl_connector_mysql_tests/db/api/base.py index 608b67608..c41209aca 100644 --- a/lib/dl_connector_mysql/dl_connector_mysql_tests/db/api/base.py +++ b/lib/dl_connector_mysql/dl_connector_mysql_tests/db/api/base.py @@ -19,7 +19,7 @@ class MySQLConnectionTestBase(BaseMySQLTestClass, ConnectionTestBase): conn_type = CONNECTION_TYPE_MYSQL - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: diff --git a/lib/dl_connector_oracle/dl_connector_oracle_tests/db/api/base.py b/lib/dl_connector_oracle/dl_connector_oracle_tests/db/api/base.py index debfe442e..f89d77fc4 100644 --- a/lib/dl_connector_oracle/dl_connector_oracle_tests/db/api/base.py +++ b/lib/dl_connector_oracle/dl_connector_oracle_tests/db/api/base.py @@ -20,7 +20,7 @@ class OracleConnectionTestBase(BaseOracleTestClass, ConnectionTestBase): conn_type = CONNECTION_TYPE_ORACLE - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: diff --git a/lib/dl_connector_postgresql/dl_connector_postgresql_tests/db/api/base.py b/lib/dl_connector_postgresql/dl_connector_postgresql_tests/db/api/base.py index 77c5de051..dd78f253f 100644 --- a/lib/dl_connector_postgresql/dl_connector_postgresql_tests/db/api/base.py +++ b/lib/dl_connector_postgresql/dl_connector_postgresql_tests/db/api/base.py @@ -19,7 +19,7 @@ class PostgreSQLConnectionTestBase(BasePostgreSQLTestClass, ConnectionTestBase): conn_type = CONNECTION_TYPE_POSTGRES - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: diff --git a/lib/dl_connector_promql/dl_connector_promql_tests/db/api/base.py b/lib/dl_connector_promql/dl_connector_promql_tests/db/api/base.py index 5fd335729..733fb07ac 100644 --- a/lib/dl_connector_promql/dl_connector_promql_tests/db/api/base.py +++ b/lib/dl_connector_promql/dl_connector_promql_tests/db/api/base.py @@ -20,7 +20,7 @@ class PromQLConnectionTestBase(ConnectionTestBase): - bi_compeng_pg_on = False + compeng_enabled = False conn_type = CONNECTION_TYPE_PROMQL @pytest.fixture(scope="class") diff --git a/lib/dl_connector_snowflake/dl_connector_snowflake_tests/ext/api/base.py b/lib/dl_connector_snowflake/dl_connector_snowflake_tests/ext/api/base.py index 1cbe363fd..166d9dbc8 100644 --- a/lib/dl_connector_snowflake/dl_connector_snowflake_tests/ext/api/base.py +++ b/lib/dl_connector_snowflake/dl_connector_snowflake_tests/ext/api/base.py @@ -63,7 +63,7 @@ def dataset_params(self, sf_secrets) -> dict: class SnowFlakeDataApiTestBase(SnowFlakeDatasetTestBase, StandardizedDataApiTestBase): - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def data_api_test_params(self) -> DataApiTestParams: diff --git a/lib/dl_connector_ydb/dl_connector_ydb_tests/db/api/base.py b/lib/dl_connector_ydb/dl_connector_ydb_tests/db/api/base.py index 0a5debfbc..ebf8bc25a 100644 --- a/lib/dl_connector_ydb/dl_connector_ydb_tests/db/api/base.py +++ b/lib/dl_connector_ydb/dl_connector_ydb_tests/db/api/base.py @@ -30,7 +30,7 @@ class YDBConnectionTestBase(ConnectionTestBase): - bi_compeng_pg_on = False + compeng_enabled = False conn_type = CONNECTION_TYPE_YDB @pytest.fixture(scope="class") diff --git a/lib/dl_core/dl_core_tests/db/compeng/__init__.py b/lib/dl_core/dl_core_tests/db/compeng/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_core/dl_core_tests/db/compeng/test_compeng_cache.py b/lib/dl_core/dl_core_tests/db/compeng/test_compeng_cache.py new file mode 100644 index 000000000..28ed6ce70 --- /dev/null +++ b/lib/dl_core/dl_core_tests/db/compeng/test_compeng_cache.py @@ -0,0 +1,229 @@ +from __future__ import annotations + +import logging +import time +from typing import List +import uuid + +import pytest +import sqlalchemy as sa + +from bi_legacy_test_bundle_tests.core.utils import get_dump_request_profile_records +from dl_api_commons.reporting.models import QueryExecutionStartReportingRecord +from dl_api_commons.reporting.profiler import ( + PROFILING_LOG_NAME, + DefaultReportingProfiler, +) +from dl_constants.enums import ( + DataSourceRole, + ProcessorType, + ReportingQueryType, + UserDataType, +) +from dl_core.base_models import WorkbookEntryLocation +from dl_core.data_processing.cache.primitives import ( + DataKeyPart, + LocalKeyRepresentation, +) +from dl_core.data_processing.processing.operation import ( + BaseOp, + CalcOp, + DownloadOp, + UploadOp, +) +from dl_core.data_processing.stream_base import ( + DataRequestMetaInfo, + DataStreamAsync, +) +from dl_core.data_processing.streaming import AsyncChunked +from dl_core.query.bi_query import BIQuery +from dl_core.query.expression import ExpressionCtx +from dl_core.services_registry import ServicesRegistry +from dl_core_testing.dataset_wrappers import DatasetTestWrapper +from dl_core_tests.db.base import DefaultCoreTestClass + +from dl_connector_clickhouse.core.clickhouse_base.constants import CONNECTION_TYPE_CLICKHOUSE +from dl_connector_clickhouse.core.clickhouse_base.dto import ClickHouseConnDTO + + +class TestCompengCache(DefaultCoreTestClass): + caches_enabled = True + compeng_enabled = True + + @pytest.mark.asyncio + async def test_compeng_cache( + self, + saved_dataset, + conn_bi_context, + caplog, + sync_us_manager, + caches_redis_client_factory, + data_processor_service_factory, + ): + dataset = saved_dataset + us_manager = sync_us_manager + ds_wrapper = DatasetTestWrapper(dataset=dataset, us_manager=us_manager) + + names = ["int_value", "str_value"] + user_types = [UserDataType.integer, UserDataType.string] + + def get_operations(coeff: int) -> List[BaseOp]: + """ + Instructions for compeng. + ``int_value`` is multiplied by ``coeff`` + """ + return [ + UploadOp( + result_id="1", + source_stream_id="1", + dest_stream_id="2", + alias=str(uuid.uuid4()), + ), + CalcOp( + result_id="2", + source_stream_id="2", + dest_stream_id="3", + alias=str(uuid.uuid4()), + bi_query=BIQuery( + select_expressions=[ + ExpressionCtx( + expression=sa.literal_column(names[0]) * sa.literal(coeff), + alias="value_1", + user_type=UserDataType.integer, + ), + ExpressionCtx( + expression=sa.literal_column(names[1]), + alias="value_2", + user_type=UserDataType.string, + ), + ], + ), + data_key_data="__whatever", # just a random hashable + ), + DownloadOp( + source_stream_id="3", + dest_stream_id="4", + ), + ] + + def get_expected_data(length: int, coeff: int): + """ + What we expect to be returned from compeng. + ``length`` entries are generated + ``int_value`` is multiplied by ``coeff`` + """ + return [[i * coeff, f"str_{i}"] for i in range(length)] + + async def get_data_from_processor(input_data, data_key, operations): + query_id = str(uuid.uuid4()) + sr: ServicesRegistry = self.service_registry_factory( + conn_exec_factory_async_env=True, + conn_bi_context=conn_bi_context, + caches_redis_client_factory=caches_redis_client_factory, + data_processor_service_factory=data_processor_service_factory, + ) + dto = ClickHouseConnDTO( + conn_id="123", + host="localhost", + cluster_name="cluster_name", + db_name="", + endpoint="", + multihosts=[""], + password="", + port=0, + protocol="", + username="", + ) + reporting = sr.get_reporting_registry() + workbook_id = ( + dataset.entry_key.workbook_id if isinstance(dataset.entry_key, WorkbookEntryLocation) else None + ) + reporting.save_reporting_record( + QueryExecutionStartReportingRecord( + timestamp=time.time(), + query_id=query_id, + query_type=ReportingQueryType.internal, + query="1", # doesn't matter... + connection_type=CONNECTION_TYPE_CLICKHOUSE, + dataset_id=dataset.uuid, + conn_reporting_data=dto.conn_reporting_data(), + workbook_id=workbook_id, + ) + ) + input_stream = DataStreamAsync( + id="1", + data=AsyncChunked.from_chunked_iterable([input_data]), + names=names, + user_types=user_types, + meta=DataRequestMetaInfo( + data_source_list=ds_wrapper.get_data_source_list(role=DataSourceRole.origin), + query_id=query_id, + ), + data_key=data_key, + ) + processor_factory = sr.get_data_processor_factory() + processor = await processor_factory.get_data_processor( + dataset=dataset, + us_entry_buffer=us_manager.get_entry_buffer(), + allow_cache_usage=True, + processor_type=ProcessorType.ASYNCPG, + ) + try: + output_streams = await processor.run( + streams=[input_stream], + operations=operations, + output_stream_ids=[operations[-1].dest_stream_id], + ) + assert len(output_streams) == 1 + output_stream = output_streams[0] + assert output_stream.id == "4" + return await output_stream.data.all() + finally: + await sr.close_async() + reporting_profiler = DefaultReportingProfiler(reporting_registry=sr.get_reporting_registry()) + reporting_profiler.on_request_end() + + caplog.set_level(logging.INFO, logger=PROFILING_LOG_NAME) + + # Validate the first run + caplog.clear() + input_data_l10 = [[i, f"str_{i}"] for i in range(10)] + operations_c10 = get_operations(coeff=10) + data_key_l10 = LocalKeyRepresentation(key_parts=(DataKeyPart(part_type="part", part_content="value_l10"),)) + output_data = await get_data_from_processor( + input_data=input_data_l10, data_key=data_key_l10, operations=operations_c10 + ) + expected_data_l10_c10 = get_expected_data(length=10, coeff=10) + assert output_data == expected_data_l10_c10 + # Check cache flags in reporting + req_profiling_log_rec = get_dump_request_profile_records(caplog, single=True) + assert req_profiling_log_rec.cache_used is True + assert req_profiling_log_rec.cache_full_hit is False + + # Now use updated input data, but with the old cache key and same operations + # -> same old data from cache + caplog.clear() + input_data_l5 = [[i, f"str_{i}"] for i in range(5)] # up to 5 instead of 10 + output_data = await get_data_from_processor( + input_data=input_data_l5, data_key=data_key_l10, operations=operations_c10 + ) + assert output_data == expected_data_l10_c10 + # Check cache flags in reporting + req_profiling_log_rec = get_dump_request_profile_records(caplog, single=True) + assert req_profiling_log_rec.cache_used is True + assert req_profiling_log_rec.cache_full_hit is True + + # Now use updated input data with updated operations -> should result in a new full key + # -> fresh data, not from cache + caplog.clear() + data_key_l5 = LocalKeyRepresentation(key_parts=(DataKeyPart(part_type="part", part_content="value_l5"),)) + operations_c5 = get_operations(coeff=5) + output_data = await get_data_from_processor( + input_data=input_data_l5, data_key=data_key_l5, operations=operations_c5 + ) + expected_data_l5_c5 = get_expected_data(length=5, coeff=5) + assert output_data == expected_data_l5_c5 + # Check cache flags in reporting + req_profiling_log_rec = get_dump_request_profile_records(caplog, single=True) + assert req_profiling_log_rec.cache_used is True + assert req_profiling_log_rec.cache_full_hit is False diff --git a/lib/dl_core/dl_core_tests/db/config.py b/lib/dl_core/dl_core_tests/db/config.py index 52f17631a..5748b2607 100644 --- a/lib/dl_core/dl_core_tests/db/config.py +++ b/lib/dl_core/dl_core_tests/db/config.py @@ -11,7 +11,13 @@ host_us_pg=get_test_container_hostport("pg-us", fallback_port=50309).host, port_us_pg_5432=get_test_container_hostport("pg-us", fallback_port=50309).port, us_master_token="AC1ofiek8coB", - core_connector_ep_names=["clickhouse"], + core_connector_ep_names=["clickhouse", "postgresql"], + compeng_url=( + f"postgresql://us:us@" + f'{get_test_container_hostport("pg-us", fallback_port=50309).as_pair()}/us-db-ci_purgeable' + ), + redis_host=get_test_container_hostport("redis-caches").host, + redis_port=get_test_container_hostport("redis-caches", fallback_port=50305).port, ) diff --git a/lib/dl_core/docker-compose.yml b/lib/dl_core/docker-compose.yml index 2bd89fe26..8eb1c15ba 100644 --- a/lib/dl_core/docker-compose.yml +++ b/lib/dl_core/docker-compose.yml @@ -12,6 +12,14 @@ services: context: docker-compose dockerfile: Dockerfile.db-clickhouse + redis-caches: + # image: "bitnami/redis:5.0.8" + image: "bitnami/redis:5.0.8@sha256:3127620da977815556439a9dc347fff89432a79b6bb6e93a16f20ac4a34ce337" + environment: + ALLOW_EMPTY_PASSWORD: "yes" + ports: + - "50305:6379" + # INFRA pg-us: build: diff --git a/lib/dl_core_testing/dl_core_testing/configuration.py b/lib/dl_core_testing/dl_core_testing/configuration.py index 0851f1b7c..a414d5792 100644 --- a/lib/dl_core_testing/dl_core_testing/configuration.py +++ b/lib/dl_core_testing/dl_core_testing/configuration.py @@ -80,6 +80,10 @@ def get_core_library_config(self) -> CoreLibraryConfig: def get_redis_setting_maker(self) -> RedisSettingMaker: raise NotImplementedError + @abc.abstractmethod + def get_compeng_url(self) -> str: + raise NotImplementedError + # These are used only for creation of local environments in tests, not actual external ones DEFAULT_FERNET_KEY = "h1ZpilcYLYRdWp7Nk8X1M1kBPiUi8rdjz9oBfHyUKIk=" @@ -104,6 +108,8 @@ class DefaultCoreTestConfiguration(CoreTestEnvironmentConfigurationBase): redis_db_mutation: int = attr.ib(default=2) redis_db_arq: int = attr.ib(default=11) + compeng_url: str = attr.ib(default="") + def get_us_config(self) -> UnitedStorageConfiguration: return UnitedStorageConfiguration( us_master_token=self.us_master_token, @@ -129,3 +135,6 @@ def get_core_library_config(self) -> CoreLibraryConfig: return CoreLibraryConfig( core_connector_ep_names=self.core_connector_ep_names, ) + + def get_compeng_url(self) -> str: + return self.compeng_url diff --git a/lib/dl_core_testing/dl_core_testing/testcases/service_base.py b/lib/dl_core_testing/dl_core_testing/testcases/service_base.py index 229c2b5ba..08ed378d1 100644 --- a/lib/dl_core_testing/dl_core_testing/testcases/service_base.py +++ b/lib/dl_core_testing/dl_core_testing/testcases/service_base.py @@ -1,21 +1,32 @@ from __future__ import annotations import abc +import contextlib from typing import ( Any, + AsyncGenerator, + Callable, ClassVar, NamedTuple, Optional, ) import pytest +from redis.asyncio import Redis from dl_api_commons.base_models import RequestContextInfo from dl_api_commons.reporting.registry import DefaultReportingRegistry +from dl_compeng_pg.compeng_pg_base.data_processor_service_pg import CompEngPgConfig from dl_configs.connectors_settings import ConnectorSettingsBase from dl_configs.crypto_keys import CryptoKeysConfig from dl_configs.rqe import RQEConfig -from dl_constants.enums import ConnectionType +from dl_configs.settings_submodels import RedisSettings +from dl_constants.enums import ( + ConnectionType, + ProcessorType, +) +from dl_core.aio.web_app_services.data_processing.data_processor import DataProcessorService +from dl_core.aio.web_app_services.data_processing.factory import make_compeng_service from dl_core.connections_security.base import InsecureConnectionSecurityManager from dl_core.services_registry.conn_executor_factory import DefaultConnExecutorFactory from dl_core.services_registry.inst_specific_sr import InstallationSpecificServiceRegistryFactory @@ -50,6 +61,8 @@ class ServiceFixtureTextClass(metaclass=abc.ABCMeta): core_test_config: ClassVar[CoreTestEnvironmentConfigurationBase] connection_settings: ClassVar[Optional[ConnectorSettingsBase]] = None inst_specific_sr_factory: ClassVar[Optional[InstallationSpecificServiceRegistryFactory]] = None + caches_enabled: ClassVar[bool] = False + compeng_enabled: ClassVar[bool] = False @pytest.fixture(scope="session") def conn_us_config(self) -> USConfig: @@ -69,10 +82,66 @@ def conn_bi_context(self) -> RequestContextInfo: def conn_exec_factory_async_env(self) -> bool: return False + @contextlib.asynccontextmanager + async def _make_redis(self, redis_settings: RedisSettings) -> AsyncGenerator[Redis, None]: + redis_client = Redis( + host=redis_settings.HOSTS[0], + port=redis_settings.PORT, + db=redis_settings.DB, + password=redis_settings.PASSWORD, + ) + try: + yield redis_client + finally: + await redis_client.close() + await redis_client.connection_pool.disconnect() + + @pytest.fixture(scope="function") + async def caches_redis_client_factory(self) -> Optional[Callable[[bool], Redis]]: + if not self.caches_enabled: + yield None + + else: + redis_settings = self.core_test_config.get_redis_setting_maker().get_redis_settings_cache() + async with self._make_redis(redis_settings=redis_settings) as redis_client: + try: + yield lambda *args: redis_client + finally: + await redis_client.close() + await redis_client.connection_pool.disconnect() + + @pytest.fixture(scope="function") + async def data_processor_service_factory( + self, + ) -> AsyncGenerator[Optional[Callable[[ProcessorType], DataProcessorService]], None]: + """ + PG CompEng pool fixture. + + Can only work properly in `db` tests, but should probably be okay to + instantiate without it. + """ + + if not self.compeng_enabled: + yield None + + else: + compeng_type = ProcessorType.ASYNCPG + processor = make_compeng_service( + processor_type=compeng_type, + config=CompEngPgConfig(url=self.core_test_config.get_compeng_url()), + ) + await processor.initialize() + try: + yield (lambda *args: processor) + finally: + await processor.finalize() + def service_registry_factory( self, conn_exec_factory_async_env: bool, conn_bi_context: RequestContextInfo, + caches_redis_client_factory: Optional[Callable[[bool], Optional[Redis]]] = None, + data_processor_service_factory: Optional[Callable[[bool], DataProcessorService]] = None, **kwargs: Any, ) -> ServicesRegistry: sr_future_ref: FutureRef[ServicesRegistry] = FutureRef() @@ -96,6 +165,8 @@ def service_registry_factory( if self.inst_specific_sr_factory is not None else None ), + caches_redis_client_factory=caches_redis_client_factory, + data_processor_service_factory=data_processor_service_factory, **kwargs, ) sr_future_ref.fulfill(service_registry) @@ -109,12 +180,14 @@ def conn_sync_service_registry( return self.service_registry_factory( conn_exec_factory_async_env=False, conn_bi_context=conn_bi_context, + caches_redis_client_factory=None, ) @pytest.fixture(scope="session") def conn_async_service_registry( self, conn_bi_context: RequestContextInfo, + # caches_redis_client_factory: Optional[Callable[[bool], Redis]], # FIXME: switch to function scope to use ) -> ServicesRegistry: return self.service_registry_factory( conn_exec_factory_async_env=True,