From 33d14c8deced4020d260cdf24a93a9508abc15ab Mon Sep 17 00:00:00 2001 From: Grigory Statsenko Date: Thu, 7 Dec 2023 16:57:51 +0100 Subject: [PATCH] Added a compeng cache test to dl_core and an api cache test to dl_api_lib (#151) --- lib/dl_api_lib/dl_api_lib_tests/db/base.py | 2 +- lib/dl_api_lib/dl_api_lib_tests/db/config.py | 10 +- .../db/data_api/caches/test_caches.py | 44 ++++ lib/dl_api_lib/docker-compose.yml | 8 + .../dl_api_lib_testing/base.py | 6 +- .../dl_api_lib_testing/configuration.py | 2 - .../dl_api_lib_testing/data_api_base.py | 12 +- .../ext/api/base.py | 4 +- .../ext/api/base.py | 4 +- .../ext/config.py | 5 +- .../db/base/api/base.py | 2 +- .../db/base/api/data.py | 2 +- .../db/api/base.py | 4 +- .../db/api/base.py | 4 +- .../ext/api/base.py | 6 +- .../dl_connector_mssql_tests/db/api/base.py | 4 +- .../dl_connector_mysql_tests/db/api/base.py | 4 +- .../dl_connector_oracle_tests/db/api/base.py | 4 +- .../db/api/base.py | 4 +- .../dl_connector_promql_tests/db/api/base.py | 2 +- .../ext/api/base.py | 2 +- .../dl_connector_ydb_tests/db/api/base.py | 4 +- .../dl_core_tests/db/compeng/__init__.py | 0 .../db/compeng/test_compeng_cache.py | 229 ++++++++++++++++++ lib/dl_core/dl_core_tests/db/config.py | 8 +- lib/dl_core/docker-compose.yml | 8 + .../dl_core_testing/configuration.py | 9 + .../dl_core_testing/testcases/service_base.py | 75 +++++- lib/dl_testing/dl_testing/utils.py | 16 ++ 29 files changed, 439 insertions(+), 45 deletions(-) create mode 100644 lib/dl_api_lib/dl_api_lib_tests/db/data_api/caches/test_caches.py create mode 100644 lib/dl_core/dl_core_tests/db/compeng/__init__.py create mode 100644 lib/dl_core/dl_core_tests/db/compeng/test_compeng_cache.py diff --git a/lib/dl_api_lib/dl_api_lib_tests/db/base.py b/lib/dl_api_lib/dl_api_lib_tests/db/base.py index 1233b7e04..ec59880bb 100644 --- a/lib/dl_api_lib/dl_api_lib_tests/db/base.py +++ b/lib/dl_api_lib/dl_api_lib_tests/db/base.py @@ -24,7 +24,7 @@ class DefaultApiTestBase(DataApiTestBase, DatasetTestBase, ConnectionTestBase): """The knowledge that this is a ClickHouse connector should not go beyond this class""" - bi_compeng_pg_on = True + compeng_enabled = True conn_type = CONNECTION_TYPE_CLICKHOUSE raw_sql_level: ClassVar[RawSQLLevel] = RawSQLLevel.off diff --git a/lib/dl_api_lib/dl_api_lib_tests/db/config.py b/lib/dl_api_lib/dl_api_lib_tests/db/config.py index c77c1f7b0..c8983927a 100644 --- a/lib/dl_api_lib/dl_api_lib_tests/db/config.py +++ b/lib/dl_api_lib/dl_api_lib_tests/db/config.py @@ -14,6 +14,12 @@ port_us_pg_5432=get_test_container_hostport("db-postgres", fallback_port=52509).port, us_master_token="AC1ofiek8coB", core_connector_ep_names=["clickhouse", "postgresql"], + compeng_url=( + f"postgresql://us:us@" + f'{get_test_container_hostport("db-postgres", fallback_port=52509).as_pair()}/us-db-ci_purgeable' + ), + redis_host=get_test_container_hostport("redis-caches").host, + redis_port=get_test_container_hostport("redis-caches", fallback_port=52505).port, ) @@ -35,8 +41,4 @@ class CoreConnectionSettings: api_connector_ep_names=["clickhouse", "postgresql"], core_test_config=CORE_TEST_CONFIG, ext_query_executer_secret_key="_some_test_secret_key_", - bi_compeng_pg_url=( - f"postgresql://us:us@" - f'{get_test_container_hostport("db-postgres", fallback_port=52509).as_pair()}/us-db-ci_purgeable' - ), ) diff --git a/lib/dl_api_lib/dl_api_lib_tests/db/data_api/caches/test_caches.py b/lib/dl_api_lib/dl_api_lib_tests/db/data_api/caches/test_caches.py new file mode 100644 index 000000000..721cdb134 --- /dev/null +++ b/lib/dl_api_lib/dl_api_lib_tests/db/data_api/caches/test_caches.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from dl_api_client.dsmaker.primitives import Dataset +from dl_api_client.dsmaker.shortcuts.result_data import get_data_rows +from dl_api_lib_testing.helpers.data_source import data_source_settings_from_table +from dl_api_lib_tests.db.base import DefaultApiTestBase +from dl_core_testing.database import make_table + + +class TestDataCaches(DefaultApiTestBase): + data_caches_enabled = True + + def test_cache_by_deleting_table(self, db, control_api, data_api, saved_connection_id): + db_table = make_table(db) + ds = Dataset() + ds.sources["source_1"] = ds.source( + connection_id=saved_connection_id, **data_source_settings_from_table(db_table) + ) + ds.source_avatars["avatar_1"] = ds.sources["source_1"].avatar() + ds.result_schema["measure"] = ds.field(formula="SUM([int_value])") + ds = control_api.apply_updates(dataset=ds).dataset + ds = control_api.save_dataset(ds).dataset + + def get_data(): + result_resp = data_api.get_result( + dataset=ds, + fields=[ + ds.find_field(title="int_value"), + ds.find_field(title="measure"), + ], + ) + assert result_resp.status_code == 200, result_resp.response_errors + data = get_data_rows(response=result_resp) + return data + + data_rows = get_data() + + # Now delete the table. + # This will make real DB queries impossible, + # however the cache should still return the same data + db_table.db.drop_table(db_table.table) + + data_rows_after_deletion = get_data() + assert data_rows_after_deletion == data_rows diff --git a/lib/dl_api_lib/docker-compose.yml b/lib/dl_api_lib/docker-compose.yml index ef2d3427f..42119d82f 100644 --- a/lib/dl_api_lib/docker-compose.yml +++ b/lib/dl_api_lib/docker-compose.yml @@ -4,6 +4,14 @@ x-constants: US_MASTER_TOKEN: &c-us-master-token "AC1ofiek8coB" services: + redis-caches: + # image: "bitnami/redis:5.0.8" + image: "bitnami/redis:5.0.8@sha256:3127620da977815556439a9dc347fff89432a79b6bb6e93a16f20ac4a34ce337" + environment: + ALLOW_EMPTY_PASSWORD: "yes" + ports: + - "52505:6379" + db-clickhouse: ports: - "52510:8123" diff --git a/lib/dl_api_lib_testing/dl_api_lib_testing/base.py b/lib/dl_api_lib_testing/dl_api_lib_testing/base.py index 068b2db29..5d474e471 100644 --- a/lib/dl_api_lib_testing/dl_api_lib_testing/base.py +++ b/lib/dl_api_lib_testing/dl_api_lib_testing/base.py @@ -52,7 +52,7 @@ class ApiTestBase(abc.ABC): Base class defining the basic fixtures of bi-api tests """ - bi_compeng_pg_on: ClassVar[bool] = True + compeng_enabled: ClassVar[bool] = True query_processing_mode: ClassVar[QueryProcessingMode] = QueryProcessingMode.basic @pytest.fixture(scope="function", autouse=True) @@ -115,8 +115,8 @@ def create_control_api_settings( BI_API_CONNECTOR_WHITELIST=bi_test_config.get_api_library_config().api_connector_ep_names, CORE_CONNECTOR_WHITELIST=core_test_config.get_core_library_config().core_connector_ep_names, RQE_CONFIG=rqe_config_subprocess, - BI_COMPENG_PG_ON=cls.bi_compeng_pg_on, - BI_COMPENG_PG_URL=bi_test_config.bi_compeng_pg_url, + BI_COMPENG_PG_ON=cls.compeng_enabled, + BI_COMPENG_PG_URL=core_test_config.get_compeng_url(), DO_DSRC_IDX_FETCH=True, FIELD_ID_GENERATOR_TYPE=FieldIdGeneratorType.suffix, REDIS_ARQ=redis_setting_maker.get_redis_settings_arq(), diff --git a/lib/dl_api_lib_testing/dl_api_lib_testing/configuration.py b/lib/dl_api_lib_testing/dl_api_lib_testing/configuration.py index d61dee412..f49c09f7b 100644 --- a/lib/dl_api_lib_testing/dl_api_lib_testing/configuration.py +++ b/lib/dl_api_lib_testing/dl_api_lib_testing/configuration.py @@ -20,8 +20,6 @@ class ApiTestEnvironmentConfiguration: mutation_cache_enabled: bool = attr.ib(default=True) - bi_compeng_pg_url: str = attr.ib(default="") - file_uploader_api_host: str = attr.ib(default="http://127.0.0.1") file_uploader_api_port: int = attr.ib(default=9999) diff --git a/lib/dl_api_lib_testing/dl_api_lib_testing/data_api_base.py b/lib/dl_api_lib_testing/dl_api_lib_testing/data_api_base.py index b10309931..ca6128c40 100644 --- a/lib/dl_api_lib_testing/dl_api_lib_testing/data_api_base.py +++ b/lib/dl_api_lib_testing/dl_api_lib_testing/data_api_base.py @@ -45,8 +45,8 @@ class DataApiTestParams(NamedTuple): class DataApiTestBase(ApiTestBase, metaclass=abc.ABCMeta): - mutation_caches_on: ClassVar[bool] = True - data_caches_on: ClassVar[bool] = True + mutation_caches_enabled: ClassVar[bool] = True + data_caches_enabled: ClassVar[bool] = True @pytest.fixture def loop(self, event_loop: asyncio.AbstractEventLoop) -> Generator[asyncio.AbstractEventLoop, None, None]: @@ -72,15 +72,15 @@ def create_data_api_settings( US_MASTER_TOKEN=us_config.us_master_token, CRYPTO_KEYS_CONFIG=core_test_config.get_crypto_keys_config(), # TODO FIX: Configure caches - CACHES_ON=cls.data_caches_on, + CACHES_ON=cls.data_caches_enabled, SAMPLES_CH_HOSTS=(), RQE_CONFIG=rqe_config_subprocess, BI_API_CONNECTOR_WHITELIST=bi_test_config.get_api_library_config().api_connector_ep_names, CORE_CONNECTOR_WHITELIST=core_test_config.get_core_library_config().core_connector_ep_names, - MUTATIONS_CACHES_ON=cls.mutation_caches_on, + MUTATIONS_CACHES_ON=cls.mutation_caches_enabled, CACHES_REDIS=redis_setting_maker.get_redis_settings_cache(), - BI_COMPENG_PG_ON=cls.bi_compeng_pg_on, - BI_COMPENG_PG_URL=bi_test_config.bi_compeng_pg_url, + BI_COMPENG_PG_ON=cls.compeng_enabled, + BI_COMPENG_PG_URL=core_test_config.get_compeng_url(), FIELD_ID_GENERATOR_TYPE=FieldIdGeneratorType.suffix, FILE_UPLOADER_BASE_URL=f"{bi_test_config.file_uploader_api_host}:{bi_test_config.file_uploader_api_port}", FILE_UPLOADER_MASTER_TOKEN="qwerty", diff --git a/lib/dl_connector_bigquery/dl_connector_bigquery_tests/ext/api/base.py b/lib/dl_connector_bigquery/dl_connector_bigquery_tests/ext/api/base.py index 485180b7a..7a25d8d1e 100644 --- a/lib/dl_connector_bigquery/dl_connector_bigquery_tests/ext/api/base.py +++ b/lib/dl_connector_bigquery/dl_connector_bigquery_tests/ext/api/base.py @@ -15,7 +15,7 @@ class BigQueryConnectionTestBase(BaseBigQueryTestClass, ConnectionTestBase): conn_type = CONNECTION_TYPE_BIGQUERY - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: @@ -42,4 +42,4 @@ def dataset_params(self, sample_table) -> dict: class BigQueryDataApiTestBase(BigQueryDatasetTestBase, StandardizedDataApiTestBase): - mutation_caches_on = False + mutation_caches_enabled = False diff --git a/lib/dl_connector_bitrix_gds/dl_connector_bitrix_gds_tests/ext/api/base.py b/lib/dl_connector_bitrix_gds/dl_connector_bitrix_gds_tests/ext/api/base.py index 399f84924..0bc46df8e 100644 --- a/lib/dl_connector_bitrix_gds/dl_connector_bitrix_gds_tests/ext/api/base.py +++ b/lib/dl_connector_bitrix_gds/dl_connector_bitrix_gds_tests/ext/api/base.py @@ -95,7 +95,7 @@ def dataset_params(self) -> dict: class BitrixDataApiTestBase(BitrixDatasetTestBase, StandardizedDataApiTestBase): - mutation_caches_on = False + mutation_caches_enabled = False @pytest.fixture(scope="class") def data_api_test_params(self) -> DataApiTestParams: @@ -109,4 +109,4 @@ def data_api_test_params(self) -> DataApiTestParams: class BitrixSmartTablesDataApiTestBase(BitrixSmartTablesDatasetTestBase, StandardizedDataApiTestBase): - mutation_caches_on = False + mutation_caches_enabled = False diff --git a/lib/dl_connector_bitrix_gds/dl_connector_bitrix_gds_tests/ext/config.py b/lib/dl_connector_bitrix_gds/dl_connector_bitrix_gds_tests/ext/config.py index 8525583f2..ee58458e0 100644 --- a/lib/dl_connector_bitrix_gds/dl_connector_bitrix_gds_tests/ext/config.py +++ b/lib/dl_connector_bitrix_gds/dl_connector_bitrix_gds_tests/ext/config.py @@ -3,6 +3,8 @@ from dl_testing.containers import get_test_container_hostport +COMPENG_URL = f'postgresql://datalens:qwerty@{get_test_container_hostport("db-postgres-13", fallback_port=52301).as_pair()}/test_data' + # Infra settings CORE_TEST_CONFIG = DefaultCoreTestConfiguration( host_us_http=get_test_container_hostport("us", fallback_port=51911).host, @@ -11,14 +13,13 @@ port_us_pg_5432=get_test_container_hostport("pg-us", fallback_port=51910).port, us_master_token="AC1ofiek8coB", core_connector_ep_names=["bitrix_gds", "postgresql"], + compeng_url=COMPENG_URL, ) -COMPENG_URL = f'postgresql://datalens:qwerty@{get_test_container_hostport("db-postgres-13", fallback_port=52301).as_pair()}/test_data' API_TEST_CONFIG = ApiTestEnvironmentConfiguration( api_connector_ep_names=["bitrix_gds", "postgresql"], core_test_config=CORE_TEST_CONFIG, ext_query_executer_secret_key="_some_test_secret_key_", - bi_compeng_pg_url=COMPENG_URL, ) BITRIX_PORTALS = dict( diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/api/base.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/api/base.py index 9b43bc39f..3af3579b4 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/api/base.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/api/base.py @@ -66,7 +66,7 @@ async def get_internal_params(self, src: FileSourceDesc) -> SourceInternalParams class CHS3ConnectionApiTestBase(BaseCHS3TestClass[FILE_CONN_TV], ConnectionTestBase, metaclass=abc.ABCMeta): - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: diff --git a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/api/data.py b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/api/data.py index a7fd3ac24..c26fc11c2 100644 --- a/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/api/data.py +++ b/lib/dl_connector_bundle_chs3/dl_connector_bundle_chs3_tests/db/base/api/data.py @@ -24,7 +24,7 @@ class CHS3DataApiTestBase(CHS3DatasetTestBase[FILE_CONN_TV], StandardizedDataApiTestBase, metaclass=abc.ABCMeta): - mutation_caches_on = False + mutation_caches_enabled = False class CHS3DataResultTestSuite( diff --git a/lib/dl_connector_clickhouse/dl_connector_clickhouse_tests/db/api/base.py b/lib/dl_connector_clickhouse/dl_connector_clickhouse_tests/db/api/base.py index db7cda468..f73ed12c9 100644 --- a/lib/dl_connector_clickhouse/dl_connector_clickhouse_tests/db/api/base.py +++ b/lib/dl_connector_clickhouse/dl_connector_clickhouse_tests/db/api/base.py @@ -17,7 +17,7 @@ class ClickHouseConnectionTestBase(BaseClickHouseTestClass, ConnectionTestBase): conn_type = CONNECTION_TYPE_CLICKHOUSE - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: @@ -52,4 +52,4 @@ def dataset_params(self, sample_table) -> dict: class ClickHouseDataApiTestBase(ClickHouseDatasetTestBase, StandardizedDataApiTestBase): - mutation_caches_on = False + mutation_caches_enabled = False diff --git a/lib/dl_connector_greenplum/dl_connector_greenplum_tests/db/api/base.py b/lib/dl_connector_greenplum/dl_connector_greenplum_tests/db/api/base.py index c6b314831..782c00c9e 100644 --- a/lib/dl_connector_greenplum/dl_connector_greenplum_tests/db/api/base.py +++ b/lib/dl_connector_greenplum/dl_connector_greenplum_tests/db/api/base.py @@ -23,7 +23,7 @@ class GreenplumConnectionTestBase(ConnectionTestBase, ServiceFixtureTextClass): conn_type = CONNECTION_TYPE_GREENPLUM core_test_config = CORE_TEST_CONFIG - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def db_url(self) -> str: @@ -57,4 +57,4 @@ def dataset_params(self, sample_table: DbTable) -> dict: class GreenplumDataApiTestBase(GreenplumDatasetTestBase, StandardizedDataApiTestBase): - mutation_caches_on = False + mutation_caches_enabled = False diff --git a/lib/dl_connector_metrica/dl_connector_metrica_tests/ext/api/base.py b/lib/dl_connector_metrica/dl_connector_metrica_tests/ext/api/base.py index f6554747f..7eaa26cb3 100644 --- a/lib/dl_connector_metrica/dl_connector_metrica_tests/ext/api/base.py +++ b/lib/dl_connector_metrica/dl_connector_metrica_tests/ext/api/base.py @@ -29,7 +29,7 @@ class MetricaConnectionTestBase(BaseMetricaTestClass, ConnectionTestBase): conn_type = CONNECTION_TYPE_METRICA_API - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: @@ -56,7 +56,7 @@ def dataset_params(self) -> dict: class MetricaDataApiTestBase(MetricaDatasetTestBase, StandardizedDataApiTestBase): - mutation_caches_on = False + mutation_caches_enabled = False @pytest.fixture(scope="class") def data_api_test_params(self) -> DataApiTestParams: @@ -71,7 +71,7 @@ def data_api_test_params(self) -> DataApiTestParams: class AppMetricaConnectionTestBase(BaseAppMetricaTestClass, ConnectionTestBase): conn_type = CONNECTION_TYPE_APPMETRICA_API - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: diff --git a/lib/dl_connector_mssql/dl_connector_mssql_tests/db/api/base.py b/lib/dl_connector_mssql/dl_connector_mssql_tests/db/api/base.py index e4c76262a..6dce3ff6b 100644 --- a/lib/dl_connector_mssql/dl_connector_mssql_tests/db/api/base.py +++ b/lib/dl_connector_mssql/dl_connector_mssql_tests/db/api/base.py @@ -19,7 +19,7 @@ class MSSQLConnectionTestBase(BaseMSSQLTestClass, ConnectionTestBase): conn_type = CONNECTION_TYPE_MSSQL - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: @@ -55,4 +55,4 @@ def dataset_params(self, sample_table) -> dict: class MSSQLDataApiTestBase(MSSQLDatasetTestBase, StandardizedDataApiTestBase): - mutation_caches_on = False + mutation_caches_enabled = False diff --git a/lib/dl_connector_mysql/dl_connector_mysql_tests/db/api/base.py b/lib/dl_connector_mysql/dl_connector_mysql_tests/db/api/base.py index 608b67608..3ebd5ff53 100644 --- a/lib/dl_connector_mysql/dl_connector_mysql_tests/db/api/base.py +++ b/lib/dl_connector_mysql/dl_connector_mysql_tests/db/api/base.py @@ -19,7 +19,7 @@ class MySQLConnectionTestBase(BaseMySQLTestClass, ConnectionTestBase): conn_type = CONNECTION_TYPE_MYSQL - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: @@ -54,4 +54,4 @@ def dataset_params(self, sample_table) -> dict: class MySQLDataApiTestBase(MySQLDatasetTestBase, StandardizedDataApiTestBase): - mutation_caches_on = False + mutation_caches_enabled = False diff --git a/lib/dl_connector_oracle/dl_connector_oracle_tests/db/api/base.py b/lib/dl_connector_oracle/dl_connector_oracle_tests/db/api/base.py index debfe442e..7e3a180d4 100644 --- a/lib/dl_connector_oracle/dl_connector_oracle_tests/db/api/base.py +++ b/lib/dl_connector_oracle/dl_connector_oracle_tests/db/api/base.py @@ -20,7 +20,7 @@ class OracleConnectionTestBase(BaseOracleTestClass, ConnectionTestBase): conn_type = CONNECTION_TYPE_ORACLE - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: @@ -57,4 +57,4 @@ def dataset_params(self, sample_table) -> dict: class OracleDataApiTestBase(OracleDatasetTestBase, StandardizedDataApiTestBase): - mutation_caches_on = False + mutation_caches_enabled = False diff --git a/lib/dl_connector_postgresql/dl_connector_postgresql_tests/db/api/base.py b/lib/dl_connector_postgresql/dl_connector_postgresql_tests/db/api/base.py index 77c5de051..c6e33104d 100644 --- a/lib/dl_connector_postgresql/dl_connector_postgresql_tests/db/api/base.py +++ b/lib/dl_connector_postgresql/dl_connector_postgresql_tests/db/api/base.py @@ -19,7 +19,7 @@ class PostgreSQLConnectionTestBase(BasePostgreSQLTestClass, ConnectionTestBase): conn_type = CONNECTION_TYPE_POSTGRES - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def bi_test_config(self) -> ApiTestEnvironmentConfiguration: @@ -54,4 +54,4 @@ def dataset_params(self, sample_table) -> dict: class PostgreSQLDataApiTestBase(PostgreSQLDatasetTestBase, StandardizedDataApiTestBase): - mutation_caches_on = False + mutation_caches_enabled = False diff --git a/lib/dl_connector_promql/dl_connector_promql_tests/db/api/base.py b/lib/dl_connector_promql/dl_connector_promql_tests/db/api/base.py index 5fd335729..733fb07ac 100644 --- a/lib/dl_connector_promql/dl_connector_promql_tests/db/api/base.py +++ b/lib/dl_connector_promql/dl_connector_promql_tests/db/api/base.py @@ -20,7 +20,7 @@ class PromQLConnectionTestBase(ConnectionTestBase): - bi_compeng_pg_on = False + compeng_enabled = False conn_type = CONNECTION_TYPE_PROMQL @pytest.fixture(scope="class") diff --git a/lib/dl_connector_snowflake/dl_connector_snowflake_tests/ext/api/base.py b/lib/dl_connector_snowflake/dl_connector_snowflake_tests/ext/api/base.py index 1cbe363fd..166d9dbc8 100644 --- a/lib/dl_connector_snowflake/dl_connector_snowflake_tests/ext/api/base.py +++ b/lib/dl_connector_snowflake/dl_connector_snowflake_tests/ext/api/base.py @@ -63,7 +63,7 @@ def dataset_params(self, sf_secrets) -> dict: class SnowFlakeDataApiTestBase(SnowFlakeDatasetTestBase, StandardizedDataApiTestBase): - bi_compeng_pg_on = False + compeng_enabled = False @pytest.fixture(scope="class") def data_api_test_params(self) -> DataApiTestParams: diff --git a/lib/dl_connector_ydb/dl_connector_ydb_tests/db/api/base.py b/lib/dl_connector_ydb/dl_connector_ydb_tests/db/api/base.py index 0a5debfbc..dbb51cc9a 100644 --- a/lib/dl_connector_ydb/dl_connector_ydb_tests/db/api/base.py +++ b/lib/dl_connector_ydb/dl_connector_ydb_tests/db/api/base.py @@ -30,7 +30,7 @@ class YDBConnectionTestBase(ConnectionTestBase): - bi_compeng_pg_on = False + compeng_enabled = False conn_type = CONNECTION_TYPE_YDB @pytest.fixture(scope="class") @@ -77,7 +77,7 @@ def dataset_params(self, sample_table: DbTable) -> dict: class YDBDataApiTestBase(YDBDatasetTestBase, StandardizedDataApiTestBase): - mutation_caches_on = False + mutation_caches_enabled = False @pytest.fixture(scope="class") def data_api_test_params(self) -> DataApiTestParams: diff --git a/lib/dl_core/dl_core_tests/db/compeng/__init__.py b/lib/dl_core/dl_core_tests/db/compeng/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/dl_core/dl_core_tests/db/compeng/test_compeng_cache.py b/lib/dl_core/dl_core_tests/db/compeng/test_compeng_cache.py new file mode 100644 index 000000000..feb5dd9ba --- /dev/null +++ b/lib/dl_core/dl_core_tests/db/compeng/test_compeng_cache.py @@ -0,0 +1,229 @@ +from __future__ import annotations + +import logging +import time +from typing import List +import uuid + +import pytest +import sqlalchemy as sa + +from dl_api_commons.reporting.models import QueryExecutionStartReportingRecord +from dl_api_commons.reporting.profiler import ( + PROFILING_LOG_NAME, + DefaultReportingProfiler, +) +from dl_constants.enums import ( + DataSourceRole, + ProcessorType, + ReportingQueryType, + UserDataType, +) +from dl_core.base_models import WorkbookEntryLocation +from dl_core.data_processing.cache.primitives import ( + DataKeyPart, + LocalKeyRepresentation, +) +from dl_core.data_processing.processing.operation import ( + BaseOp, + CalcOp, + DownloadOp, + UploadOp, +) +from dl_core.data_processing.stream_base import ( + DataRequestMetaInfo, + DataStreamAsync, +) +from dl_core.data_processing.streaming import AsyncChunked +from dl_core.query.bi_query import BIQuery +from dl_core.query.expression import ExpressionCtx +from dl_core.services_registry import ServicesRegistry +from dl_core_testing.dataset_wrappers import DatasetTestWrapper +from dl_core_tests.db.base import DefaultCoreTestClass +from dl_testing.utils import get_dump_request_profile_records + +from dl_connector_clickhouse.core.clickhouse_base.constants import CONNECTION_TYPE_CLICKHOUSE +from dl_connector_clickhouse.core.clickhouse_base.dto import ClickHouseConnDTO + + +class TestCompengCache(DefaultCoreTestClass): + data_caches_enabled = True + compeng_enabled = True + + @pytest.mark.asyncio + async def test_compeng_cache( + self, + saved_dataset, + conn_bi_context, + caplog, + sync_us_manager, + caches_redis_client_factory, + data_processor_service_factory, + ): + dataset = saved_dataset + us_manager = sync_us_manager + ds_wrapper = DatasetTestWrapper(dataset=dataset, us_manager=us_manager) + + names = ["int_value", "str_value"] + user_types = [UserDataType.integer, UserDataType.string] + + def get_operations(coeff: int) -> List[BaseOp]: + """ + Instructions for compeng. + ``int_value`` is multiplied by ``coeff`` + """ + return [ + UploadOp( + result_id="1", + source_stream_id="1", + dest_stream_id="2", + alias=str(uuid.uuid4()), + ), + CalcOp( + result_id="2", + source_stream_id="2", + dest_stream_id="3", + alias=str(uuid.uuid4()), + bi_query=BIQuery( + select_expressions=[ + ExpressionCtx( + expression=sa.literal_column(names[0]) * sa.literal(coeff), + alias="value_1", + user_type=UserDataType.integer, + ), + ExpressionCtx( + expression=sa.literal_column(names[1]), + alias="value_2", + user_type=UserDataType.string, + ), + ], + ), + data_key_data="__whatever", # just a random hashable + ), + DownloadOp( + source_stream_id="3", + dest_stream_id="4", + ), + ] + + def get_expected_data(length: int, coeff: int): + """ + What we expect to be returned from compeng. + ``length`` entries are generated + ``int_value`` is multiplied by ``coeff`` + """ + return [[i * coeff, f"str_{i}"] for i in range(length)] + + async def get_data_from_processor(input_data, data_key, operations): + query_id = str(uuid.uuid4()) + sr: ServicesRegistry = self.service_registry_factory( + conn_exec_factory_async_env=True, + conn_bi_context=conn_bi_context, + caches_redis_client_factory=caches_redis_client_factory, + data_processor_service_factory=data_processor_service_factory, + ) + dto = ClickHouseConnDTO( + conn_id="123", + host="localhost", + cluster_name="cluster_name", + db_name="", + endpoint="", + multihosts=[""], + password="", + port=0, + protocol="", + username="", + ) + reporting = sr.get_reporting_registry() + workbook_id = ( + dataset.entry_key.workbook_id if isinstance(dataset.entry_key, WorkbookEntryLocation) else None + ) + reporting.save_reporting_record( + QueryExecutionStartReportingRecord( + timestamp=time.time(), + query_id=query_id, + query_type=ReportingQueryType.internal, + query="1", # doesn't matter... + connection_type=CONNECTION_TYPE_CLICKHOUSE, + dataset_id=dataset.uuid, + conn_reporting_data=dto.conn_reporting_data(), + workbook_id=workbook_id, + ) + ) + input_stream = DataStreamAsync( + id="1", + data=AsyncChunked.from_chunked_iterable([input_data]), + names=names, + user_types=user_types, + meta=DataRequestMetaInfo( + data_source_list=ds_wrapper.get_data_source_list(role=DataSourceRole.origin), + query_id=query_id, + ), + data_key=data_key, + ) + processor_factory = sr.get_data_processor_factory() + processor = await processor_factory.get_data_processor( + dataset=dataset, + us_entry_buffer=us_manager.get_entry_buffer(), + allow_cache_usage=True, + processor_type=ProcessorType.ASYNCPG, + ) + try: + output_streams = await processor.run( + streams=[input_stream], + operations=operations, + output_stream_ids=[operations[-1].dest_stream_id], + ) + assert len(output_streams) == 1 + output_stream = output_streams[0] + assert output_stream.id == "4" + return await output_stream.data.all() + finally: + await sr.close_async() + reporting_profiler = DefaultReportingProfiler(reporting_registry=sr.get_reporting_registry()) + reporting_profiler.on_request_end() + + caplog.set_level(logging.INFO, logger=PROFILING_LOG_NAME) + + # Validate the first run + caplog.clear() + input_data_l10 = [[i, f"str_{i}"] for i in range(10)] + operations_c10 = get_operations(coeff=10) + data_key_l10 = LocalKeyRepresentation(key_parts=(DataKeyPart(part_type="part", part_content="value_l10"),)) + output_data = await get_data_from_processor( + input_data=input_data_l10, data_key=data_key_l10, operations=operations_c10 + ) + expected_data_l10_c10 = get_expected_data(length=10, coeff=10) + assert output_data == expected_data_l10_c10 + # Check cache flags in reporting + req_profiling_log_rec = get_dump_request_profile_records(caplog, single=True) + assert req_profiling_log_rec.cache_used is True + assert req_profiling_log_rec.cache_full_hit is False + + # Now use updated input data, but with the old cache key and same operations + # -> same old data from cache + caplog.clear() + input_data_l5 = [[i, f"str_{i}"] for i in range(5)] # up to 5 instead of 10 + output_data = await get_data_from_processor( + input_data=input_data_l5, data_key=data_key_l10, operations=operations_c10 + ) + assert output_data == expected_data_l10_c10 + # Check cache flags in reporting + req_profiling_log_rec = get_dump_request_profile_records(caplog, single=True) + assert req_profiling_log_rec.cache_used is True + assert req_profiling_log_rec.cache_full_hit is True + + # Now use updated input data with updated operations -> should result in a new full key + # -> fresh data, not from cache + caplog.clear() + data_key_l5 = LocalKeyRepresentation(key_parts=(DataKeyPart(part_type="part", part_content="value_l5"),)) + operations_c5 = get_operations(coeff=5) + output_data = await get_data_from_processor( + input_data=input_data_l5, data_key=data_key_l5, operations=operations_c5 + ) + expected_data_l5_c5 = get_expected_data(length=5, coeff=5) + assert output_data == expected_data_l5_c5 + # Check cache flags in reporting + req_profiling_log_rec = get_dump_request_profile_records(caplog, single=True) + assert req_profiling_log_rec.cache_used is True + assert req_profiling_log_rec.cache_full_hit is False diff --git a/lib/dl_core/dl_core_tests/db/config.py b/lib/dl_core/dl_core_tests/db/config.py index 52f17631a..5748b2607 100644 --- a/lib/dl_core/dl_core_tests/db/config.py +++ b/lib/dl_core/dl_core_tests/db/config.py @@ -11,7 +11,13 @@ host_us_pg=get_test_container_hostport("pg-us", fallback_port=50309).host, port_us_pg_5432=get_test_container_hostport("pg-us", fallback_port=50309).port, us_master_token="AC1ofiek8coB", - core_connector_ep_names=["clickhouse"], + core_connector_ep_names=["clickhouse", "postgresql"], + compeng_url=( + f"postgresql://us:us@" + f'{get_test_container_hostport("pg-us", fallback_port=50309).as_pair()}/us-db-ci_purgeable' + ), + redis_host=get_test_container_hostport("redis-caches").host, + redis_port=get_test_container_hostport("redis-caches", fallback_port=50305).port, ) diff --git a/lib/dl_core/docker-compose.yml b/lib/dl_core/docker-compose.yml index 2bd89fe26..8eb1c15ba 100644 --- a/lib/dl_core/docker-compose.yml +++ b/lib/dl_core/docker-compose.yml @@ -12,6 +12,14 @@ services: context: docker-compose dockerfile: Dockerfile.db-clickhouse + redis-caches: + # image: "bitnami/redis:5.0.8" + image: "bitnami/redis:5.0.8@sha256:3127620da977815556439a9dc347fff89432a79b6bb6e93a16f20ac4a34ce337" + environment: + ALLOW_EMPTY_PASSWORD: "yes" + ports: + - "50305:6379" + # INFRA pg-us: build: diff --git a/lib/dl_core_testing/dl_core_testing/configuration.py b/lib/dl_core_testing/dl_core_testing/configuration.py index 0851f1b7c..a414d5792 100644 --- a/lib/dl_core_testing/dl_core_testing/configuration.py +++ b/lib/dl_core_testing/dl_core_testing/configuration.py @@ -80,6 +80,10 @@ def get_core_library_config(self) -> CoreLibraryConfig: def get_redis_setting_maker(self) -> RedisSettingMaker: raise NotImplementedError + @abc.abstractmethod + def get_compeng_url(self) -> str: + raise NotImplementedError + # These are used only for creation of local environments in tests, not actual external ones DEFAULT_FERNET_KEY = "h1ZpilcYLYRdWp7Nk8X1M1kBPiUi8rdjz9oBfHyUKIk=" @@ -104,6 +108,8 @@ class DefaultCoreTestConfiguration(CoreTestEnvironmentConfigurationBase): redis_db_mutation: int = attr.ib(default=2) redis_db_arq: int = attr.ib(default=11) + compeng_url: str = attr.ib(default="") + def get_us_config(self) -> UnitedStorageConfiguration: return UnitedStorageConfiguration( us_master_token=self.us_master_token, @@ -129,3 +135,6 @@ def get_core_library_config(self) -> CoreLibraryConfig: return CoreLibraryConfig( core_connector_ep_names=self.core_connector_ep_names, ) + + def get_compeng_url(self) -> str: + return self.compeng_url diff --git a/lib/dl_core_testing/dl_core_testing/testcases/service_base.py b/lib/dl_core_testing/dl_core_testing/testcases/service_base.py index 229c2b5ba..12fd5ac6b 100644 --- a/lib/dl_core_testing/dl_core_testing/testcases/service_base.py +++ b/lib/dl_core_testing/dl_core_testing/testcases/service_base.py @@ -1,21 +1,32 @@ from __future__ import annotations import abc +import contextlib from typing import ( Any, + AsyncGenerator, + Callable, ClassVar, NamedTuple, Optional, ) import pytest +from redis.asyncio import Redis from dl_api_commons.base_models import RequestContextInfo from dl_api_commons.reporting.registry import DefaultReportingRegistry +from dl_compeng_pg.compeng_pg_base.data_processor_service_pg import CompEngPgConfig from dl_configs.connectors_settings import ConnectorSettingsBase from dl_configs.crypto_keys import CryptoKeysConfig from dl_configs.rqe import RQEConfig -from dl_constants.enums import ConnectionType +from dl_configs.settings_submodels import RedisSettings +from dl_constants.enums import ( + ConnectionType, + ProcessorType, +) +from dl_core.aio.web_app_services.data_processing.data_processor import DataProcessorService +from dl_core.aio.web_app_services.data_processing.factory import make_compeng_service from dl_core.connections_security.base import InsecureConnectionSecurityManager from dl_core.services_registry.conn_executor_factory import DefaultConnExecutorFactory from dl_core.services_registry.inst_specific_sr import InstallationSpecificServiceRegistryFactory @@ -50,6 +61,8 @@ class ServiceFixtureTextClass(metaclass=abc.ABCMeta): core_test_config: ClassVar[CoreTestEnvironmentConfigurationBase] connection_settings: ClassVar[Optional[ConnectorSettingsBase]] = None inst_specific_sr_factory: ClassVar[Optional[InstallationSpecificServiceRegistryFactory]] = None + data_caches_enabled: ClassVar[bool] = False + compeng_enabled: ClassVar[bool] = False @pytest.fixture(scope="session") def conn_us_config(self) -> USConfig: @@ -69,10 +82,66 @@ def conn_bi_context(self) -> RequestContextInfo: def conn_exec_factory_async_env(self) -> bool: return False + @contextlib.asynccontextmanager + async def _make_redis(self, redis_settings: RedisSettings) -> AsyncGenerator[Redis, None]: + redis_client = Redis( + host=redis_settings.HOSTS[0], + port=redis_settings.PORT, + db=redis_settings.DB, + password=redis_settings.PASSWORD, + ) + try: + yield redis_client + finally: + await redis_client.close() + await redis_client.connection_pool.disconnect() + + @pytest.fixture(scope="function") + async def caches_redis_client_factory(self) -> Optional[Callable[[bool], Redis]]: + if not self.data_caches_enabled: + yield None + + else: + redis_settings = self.core_test_config.get_redis_setting_maker().get_redis_settings_cache() + async with self._make_redis(redis_settings=redis_settings) as redis_client: + try: + yield lambda *args: redis_client + finally: + await redis_client.close() + await redis_client.connection_pool.disconnect() + + @pytest.fixture(scope="function") + async def data_processor_service_factory( + self, + ) -> AsyncGenerator[Optional[Callable[[ProcessorType], DataProcessorService]], None]: + """ + PG CompEng pool fixture. + + Can only work properly in `db` tests, but should probably be okay to + instantiate without it. + """ + + if not self.compeng_enabled: + yield None + + else: + compeng_type = ProcessorType.ASYNCPG + processor = make_compeng_service( + processor_type=compeng_type, + config=CompEngPgConfig(url=self.core_test_config.get_compeng_url()), + ) + await processor.initialize() + try: + yield (lambda *args: processor) + finally: + await processor.finalize() + def service_registry_factory( self, conn_exec_factory_async_env: bool, conn_bi_context: RequestContextInfo, + caches_redis_client_factory: Optional[Callable[[bool], Optional[Redis]]] = None, + data_processor_service_factory: Optional[Callable[[bool], DataProcessorService]] = None, **kwargs: Any, ) -> ServicesRegistry: sr_future_ref: FutureRef[ServicesRegistry] = FutureRef() @@ -96,6 +165,8 @@ def service_registry_factory( if self.inst_specific_sr_factory is not None else None ), + caches_redis_client_factory=caches_redis_client_factory, + data_processor_service_factory=data_processor_service_factory, **kwargs, ) sr_future_ref.fulfill(service_registry) @@ -109,12 +180,14 @@ def conn_sync_service_registry( return self.service_registry_factory( conn_exec_factory_async_env=False, conn_bi_context=conn_bi_context, + caches_redis_client_factory=None, ) @pytest.fixture(scope="session") def conn_async_service_registry( self, conn_bi_context: RequestContextInfo, + # caches_redis_client_factory: Optional[Callable[[bool], Redis]], # FIXME: switch to function scope to use ) -> ServicesRegistry: return self.service_registry_factory( conn_exec_factory_async_env=True, diff --git a/lib/dl_testing/dl_testing/utils.py b/lib/dl_testing/dl_testing/utils.py index af4b9c2dc..66b303fdb 100644 --- a/lib/dl_testing/dl_testing/utils.py +++ b/lib/dl_testing/dl_testing/utils.py @@ -15,6 +15,10 @@ import pytest +from dl_api_commons.reporting.profiler import ( + PROFILING_LOG_NAME, + QUERY_PROFILING_ENTRY, +) from dl_testing.containers import get_test_container_hostport from dl_testing.shared_testing_constants import RUN_DEVHOST_TESTS from dl_utils.wait import wait_for @@ -105,3 +109,15 @@ def override_env_cm(to_set: dict[str, str], purge: bool = False): for k, v in preserved.items(): os.environ[k] = v + + +def _is_profiling_record(rec) -> bool: + return rec.name == PROFILING_LOG_NAME and rec.msg == QUERY_PROFILING_ENTRY + + +def get_dump_request_profile_records(caplog, single: bool = False): + return get_log_record( + caplog, + predicate=_is_profiling_record, + single=single, + )