From fa60aef008cddbd783de0db62f6446b5375ad33c Mon Sep 17 00:00:00 2001 From: ThanKarab Date: Thu, 21 Sep 2023 13:59:28 +0300 Subject: [PATCH] NODE service get_table_data task is only allowed for published tables. https://team-1617704806227.atlassian.net/browse/MIP-825 The 'get_table_data' task uses the DB public user to fetch data, so it only works for published data. Added a 'use_public_user' parameter to the monetdb_facade methods, so that the public user can be used for the queries. Added a 'get_table_data_from_db' method in the standalone tests (conftest.py) and changed all tests to use this method instead of calling the NODE task. Refactored many standalone tests to use ONLY the task they are testing, instead of using multiple NODE tasks to ensure the proper result. Instead, direct DB queries are made. Added an on/off switch named 'protect_local_data'. When the switch is on, the 'get_table_data' task will use the public user; otherwise it will not. The GLOBALNODE should always have this variable switched off, since its data are not private. In the LOCALNODE this variable should be switched off ONLY in testing scenarios; in production it must remain on. 
--- README.md | 1 + exareme2/node/README.md | 7 +- exareme2/node/config.toml | 1 + .../node/monetdb_interface/common_actions.py | 23 +- .../node/monetdb_interface/monet_db_facade.py | 41 +- exareme2/node/tasks/common.py | 7 +- exareme2/node/tasks/smpc.py | 6 +- .../templates/mipengine-globalnode.yaml | 2 + kubernetes/templates/mipengine-localnode.yaml | 2 + tasks.py | 10 + .../five_node_deployment_template.toml | 1 + .../one_node_deployment_template.toml | 1 + tests/standalone_tests/conftest.py | 7 + .../test_linear_regression.py | 1 - .../test_logistic_regression.py | 1 - tests/standalone_tests/test_merge_tables.py | 40 +- .../standalone_tests/test_monet_db_facade.py | 21 +- tests/standalone_tests/test_remote_tables.py | 48 +- .../standalone_tests/test_smpc_node_tasks.py | 144 +++--- tests/standalone_tests/test_tables.py | 140 +++--- .../test_views_and_filters.py | 423 ++++-------------- .../test_external_smpc_globalnode.toml | 1 + .../test_external_smpc_localnode1.toml | 1 + .../test_external_smpc_localnode2.toml | 1 + .../testing_env_configs/test_globalnode.toml | 1 + .../testing_env_configs/test_localnode1.toml | 1 + .../testing_env_configs/test_localnode2.toml | 1 + .../test_localnodetmp.toml | 1 + .../test_smpc_globalnode.toml | 1 + .../test_smpc_localnode1.toml | 1 + .../test_smpc_localnode2.toml | 1 + 31 files changed, 363 insertions(+), 574 deletions(-) diff --git a/README.md b/README.md index 86fd2e2dd..056ddd53b 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,7 @@ [privacy] minimum_row_count = 10 + protect_local_data = false [cleanup] nodes_cleanup_interval=10 diff --git a/exareme2/node/README.md b/exareme2/node/README.md index ffcf80dc9..28c70a8d0 100644 --- a/exareme2/node/README.md +++ b/exareme2/node/README.md @@ -17,11 +17,16 @@ NODE_IDENTIFIER=globalnode NODE_ROLE=GLOBALNODE LOG_LEVEL=INFO FRAMEWORK_LOG_LEVEL=INFO +PROTECT_LOCAL_DATA=false RABBITMQ_IP=172.17.0.1 RABBITMQ_PORT=5670 MONETDB_IP=172.17.0.1 MONETDB_PORT=50000 
-MONETDB_PASSWORD=executor +MONETDB_LOCAL_USERNAME=executor +MONETDB_LOCAL_PASSWORD=executor +MONETDB_PUBLIC_USERNAME=guest +MONETDB_PUBLIC_PASSWORD=guest +SMPC_ENABLED=false ``` Then start the container with: diff --git a/exareme2/node/config.toml b/exareme2/node/config.toml index f671f7152..ffa332361 100644 --- a/exareme2/node/config.toml +++ b/exareme2/node/config.toml @@ -6,6 +6,7 @@ framework_log_level = "$FRAMEWORK_LOG_LEVEL" [privacy] minimum_row_count = 10 +protect_local_data = "$PROTECT_LOCAL_DATA" [celery] worker_concurrency = 16 diff --git a/exareme2/node/monetdb_interface/common_actions.py b/exareme2/node/monetdb_interface/common_actions.py index 40e2d161f..278d6288d 100644 --- a/exareme2/node/monetdb_interface/common_actions.py +++ b/exareme2/node/monetdb_interface/common_actions.py @@ -5,10 +5,12 @@ from exareme2 import DType from exareme2.exceptions import TablesNotFound +from exareme2.node import config as node_config from exareme2.node.monetdb_interface.guard import is_datamodel from exareme2.node.monetdb_interface.guard import sql_injection_guard from exareme2.node.monetdb_interface.monet_db_facade import db_execute_and_fetchall from exareme2.node.monetdb_interface.monet_db_facade import db_execute_query +from exareme2.node_info_DTOs import NodeRole from exareme2.node_tasks_DTOs import ColumnInfo from exareme2.node_tasks_DTOs import CommonDataElement from exareme2.node_tasks_DTOs import CommonDataElements @@ -170,8 +172,11 @@ def get_table_names(table_type: TableType, context_id: str) -> List[str]: return [table[0] for table in table_names] -@sql_injection_guard(table_name=str.isidentifier) -def get_table_data(table_name: str) -> List[ColumnData]: +@sql_injection_guard( + table_name=str.isidentifier, + use_public_user=None, +) +def get_table_data(table_name: str, use_public_user: bool = True) -> List[ColumnData]: """ Returns a list of columns data which will contain name, type and the data of the specific column. 
@@ -179,24 +184,20 @@ def get_table_data(table_name: str) -> List[ColumnData]: ---------- table_name : str The name of the table + use_public_user : bool + Will the public or local user be used to access the data? Returns ------ List[ColumnData] A list of column data """ + schema = get_table_schema(table_name) - # TODO: blocked by https://team-1617704806227.atlassian.net/browse/MIP-133 . - # Retrieving the data should be a simple select. - # row_stored_data = db_execute_and_fetchall(f"SELECT * FROM {table_name}") + local_username = node_config.monetdb.local_username row_stored_data = db_execute_and_fetchall( - f""" - SELECT {table_name}.* - FROM {table_name} - INNER JOIN tables ON tables.name = '{table_name}' - WHERE tables.system=false - """ + f"SELECT * FROM {local_username}.{table_name}", use_public_user=use_public_user ) column_stored_data = list(zip(*row_stored_data)) diff --git a/exareme2/node/monetdb_interface/monet_db_facade.py b/exareme2/node/monetdb_interface/monet_db_facade.py index 5ad1b3779..f144abde4 100644 --- a/exareme2/node/monetdb_interface/monet_db_facade.py +++ b/exareme2/node/monetdb_interface/monet_db_facade.py @@ -23,25 +23,34 @@ class _DBExecutionDTO(BaseModel): query: str parameters: Optional[List[Any]] + use_public_user: bool = False timeout: Optional[int] class Config: allow_mutation = False -def db_execute_and_fetchall(query: str, parameters=None) -> List: +def db_execute_and_fetchall( + query: str, parameters=None, use_public_user: bool = False +) -> List: query_execution_timeout = node_config.celery.tasks_timeout db_execution_dto = _DBExecutionDTO( - query=query, parameters=parameters, timeout=query_execution_timeout + query=query, + parameters=parameters, + use_public_user=use_public_user, + timeout=query_execution_timeout, ) return _execute_and_fetchall(db_execution_dto=db_execution_dto) -def db_execute_query(query: str, parameters=None): +def db_execute_query(query: str, parameters=None, use_public_user: bool = False): 
query_execution_timeout = node_config.celery.tasks_timeout query = convert_to_idempotent(query) db_execution_dto = _DBExecutionDTO( - query=query, parameters=parameters, timeout=query_execution_timeout + query=query, + parameters=parameters, + use_public_user=use_public_user, + timeout=query_execution_timeout, ) _execute(db_execution_dto=db_execution_dto, lock=query_execution_lock) @@ -62,12 +71,19 @@ def db_execute_udf(query: str, parameters=None): # Connection Pool disabled due to bugs in maintaining connections @contextmanager -def _connection(): +def _connection(use_public_user: bool): + if use_public_user: + username = node_config.monetdb.public_username + password = node_config.monetdb.public_password + else: + username = node_config.monetdb.local_username + password = node_config.monetdb.local_password + conn = pymonetdb.connect( hostname=node_config.monetdb.ip, port=node_config.monetdb.port, - username=node_config.monetdb.local_username, - password=node_config.monetdb.local_password, + username=username, + password=password, database=node_config.monetdb.database, ) yield conn @@ -75,8 +91,8 @@ def _connection(): @contextmanager -def _cursor(commit=False): - with _connection() as conn: +def _cursor(use_public_user: bool, commit: bool = False): + with _connection(use_public_user) as conn: cur = conn.cursor() yield cur cur.close() @@ -163,7 +179,7 @@ def _execute_and_fetchall(db_execution_dto) -> List: Used to execute only select queries that return a result. 'parameters' option to provide the functionality of bind-parameters. 
""" - with _cursor() as cur: + with _cursor(use_public_user=db_execution_dto.use_public_user) as cur: cur.execute(db_execution_dto.query, db_execution_dto.parameters) result = cur.fetchall() return result @@ -249,7 +265,10 @@ def _execute(db_execution_dto: _DBExecutionDTO, lock): try: with _lock(lock, db_execution_dto.timeout): - with _cursor(commit=True) as cur: + with _cursor( + use_public_user=db_execution_dto.use_public_user, + commit=True, + ) as cur: cur.execute(db_execution_dto.query, db_execution_dto.parameters) except TimeoutError: error_msg = f""" diff --git a/exareme2/node/tasks/common.py b/exareme2/node/tasks/common.py index c1e2d5079..f0c480888 100644 --- a/exareme2/node/tasks/common.py +++ b/exareme2/node/tasks/common.py @@ -112,7 +112,12 @@ def get_table_data(request_id: str, table_name: str) -> str: str(TableData) An object of TableData in a jsonified format """ - columns = common_actions.get_table_data(table_name) + # If the public user is used, its ensured that the table won't hold private data. + # Tables are published to the public DB user when they are meant for sending to other nodes. + # The "protect_local_data" config allows for turning this logic off in testing scenarios. + use_public_user = True if node_config.privacy.protect_local_data else False + + columns = common_actions.get_table_data(table_name, use_public_user) return TableData(name=table_name, columns=columns).json() diff --git a/exareme2/node/tasks/smpc.py b/exareme2/node/tasks/smpc.py index 2aeaf1a6c..14fd20940 100644 --- a/exareme2/node/tasks/smpc.py +++ b/exareme2/node/tasks/smpc.py @@ -43,7 +43,7 @@ def validate_smpc_templates_match( Nothing, only throws exception if they don't match. 
""" - templates = _get_smpc_values_from_table_data(get_table_data(table_name)) + templates = _get_smpc_values_from_table_data(get_table_data(table_name, False)) first_template, *_ = templates for template in templates[1:]: if template != first_template: @@ -73,7 +73,9 @@ def load_data_to_smpc_client(request_id: str, table_name: str, jobid: str) -> st "load_data_to_smpc_client is allowed only for a LOCALNODE." ) - smpc_values, *_ = _get_smpc_values_from_table_data(get_table_data(table_name)) + smpc_values, *_ = _get_smpc_values_from_table_data( + get_table_data(table_name, False) + ) smpc_cluster.load_data_to_smpc_client( node_config.smpc.client_address, jobid, smpc_values diff --git a/kubernetes/templates/mipengine-globalnode.yaml b/kubernetes/templates/mipengine-globalnode.yaml index 3d756f2e4..131648f19 100644 --- a/kubernetes/templates/mipengine-globalnode.yaml +++ b/kubernetes/templates/mipengine-globalnode.yaml @@ -90,6 +90,8 @@ spec: value: {{ .Values.log_level }} - name: FRAMEWORK_LOG_LEVEL value: {{ .Values.framework_log_level }} + - name: PROTECT_LOCAL_DATA + value: "false" # The GLOBALNODE does not need to secure its data, since they are not private. 
- name: CELERY_TASKS_TIMEOUT value: {{ quote .Values.controller.celery_tasks_timeout }} - name: RABBITMQ_IP diff --git a/kubernetes/templates/mipengine-localnode.yaml b/kubernetes/templates/mipengine-localnode.yaml index 9851847e4..a5b49bb9d 100644 --- a/kubernetes/templates/mipengine-localnode.yaml +++ b/kubernetes/templates/mipengine-localnode.yaml @@ -119,6 +119,8 @@ spec: value: {{ .Values.log_level }} - name: FRAMEWORK_LOG_LEVEL value: {{ .Values.framework_log_level }} + - name: PROTECT_LOCAL_DATA + value: "true" - name: CELERY_TASKS_TIMEOUT value: {{ quote .Values.controller.celery_tasks_timeout }} - name: RABBITMQ_IP diff --git a/tasks.py b/tasks.py index 6ba44a73f..e20c7426d 100644 --- a/tasks.py +++ b/tasks.py @@ -155,6 +155,12 @@ def create_configs(c): node_config["privacy"]["minimum_row_count"] = deployment_config["privacy"][ "minimum_row_count" ] + if node["role"] == "GLOBALNODE": + node_config["privacy"]["protect_local_data"] = False + else: + node_config["privacy"]["protect_local_data"] = deployment_config["privacy"][ + "protect_local_data" + ] node_config["smpc"]["enabled"] = deployment_config["smpc"]["enabled"] if node_config["smpc"]["enabled"]: @@ -163,11 +169,15 @@ def create_configs(c): node_config["smpc"][ "coordinator_address" ] = f"http://{deployment_config['ip']}:{SMPC_COORDINATOR_PORT}" + node_config["privacy"]["protect_local_data"] = False else: node_config["smpc"]["client_id"] = node["id"] node_config["smpc"][ "client_address" ] = f"http://{deployment_config['ip']}:{node['smpc_client_port']}" + node_config["privacy"]["protect_local_data"] = deployment_config[ + "privacy" + ]["protect_local_data"] node_config_file = NODES_CONFIG_DIR / f"{node['id']}.toml" with open(node_config_file, "w+") as fp: diff --git a/tests/algorithm_validation_tests/five_node_deployment_template.toml b/tests/algorithm_validation_tests/five_node_deployment_template.toml index 07cb9c38c..82162307d 100644 --- 
a/tests/algorithm_validation_tests/five_node_deployment_template.toml +++ b/tests/algorithm_validation_tests/five_node_deployment_template.toml @@ -16,6 +16,7 @@ celery_run_udf_task_timeout = 300 [privacy] minimum_row_count = 1 +protect_local_data = false [cleanup] nodes_cleanup_interval=30 diff --git a/tests/algorithm_validation_tests/one_node_deployment_template.toml b/tests/algorithm_validation_tests/one_node_deployment_template.toml index fa06860d1..c0bb22159 100644 --- a/tests/algorithm_validation_tests/one_node_deployment_template.toml +++ b/tests/algorithm_validation_tests/one_node_deployment_template.toml @@ -16,6 +16,7 @@ celery_run_udf_task_timeout = 120 [privacy] minimum_row_count = 1 +protect_local_data = false [cleanup] nodes_cleanup_interval=30 diff --git a/tests/standalone_tests/conftest.py b/tests/standalone_tests/conftest.py index 4386360fe..ba7404c3b 100644 --- a/tests/standalone_tests/conftest.py +++ b/tests/standalone_tests/conftest.py @@ -591,6 +591,13 @@ def insert_data_to_db( db_cursor.execute(sql_clause, list(chain(*table_values))) +def get_table_data_from_db( + db_cursor, + table_name: str, +): + return db_cursor.execute(f"SELECT * FROM {table_name};").fetchall() + + def _clean_db(cursor): class TableType(enum.Enum): NORMAL = 0 diff --git a/tests/standalone_tests/test_linear_regression.py b/tests/standalone_tests/test_linear_regression.py index 23a6ec3c5..191b79dce 100644 --- a/tests/standalone_tests/test_linear_regression.py +++ b/tests/standalone_tests/test_linear_regression.py @@ -15,7 +15,6 @@ def run_udf_on_local_nodes(self, func, keyword_args, *args, **kwargs): run_udf_on_global_node = run_udf_on_local_nodes -@pytest.mark.slow class TestLinearRegression: @pytest.mark.parametrize("nrows", range(10, 100, 10)) @pytest.mark.parametrize("ncols", range(1, 20)) diff --git a/tests/standalone_tests/test_logistic_regression.py b/tests/standalone_tests/test_logistic_regression.py index b8df812a9..bab03a8c0 100644 --- 
a/tests/standalone_tests/test_logistic_regression.py +++ b/tests/standalone_tests/test_logistic_regression.py @@ -9,7 +9,6 @@ np.random.seed(1) -@pytest.mark.slow class TestLogisticRegression: @pytest.mark.parametrize("nrows", range(10, 100, 10)) @pytest.mark.parametrize("ncols", range(1, 20)) diff --git a/tests/standalone_tests/test_merge_tables.py b/tests/standalone_tests/test_merge_tables.py index 9b870eb18..0ddc6e720 100644 --- a/tests/standalone_tests/test_merge_tables.py +++ b/tests/standalone_tests/test_merge_tables.py @@ -6,7 +6,6 @@ from exareme2.exceptions import IncompatibleSchemasMergeException from exareme2.exceptions import TablesNotFound from exareme2.node_tasks_DTOs import ColumnInfo -from exareme2.node_tasks_DTOs import TableData from exareme2.node_tasks_DTOs import TableInfo from exareme2.node_tasks_DTOs import TableSchema from exareme2.node_tasks_DTOs import TableType @@ -15,6 +14,7 @@ from tests.standalone_tests.conftest import MONETDB_LOCALNODE2_PORT from tests.standalone_tests.conftest import TASKS_TIMEOUT from tests.standalone_tests.conftest import create_table_in_db +from tests.standalone_tests.conftest import get_table_data_from_db from tests.standalone_tests.conftest import insert_data_to_db from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature from tests.standalone_tests.std_output_logger import StdOutputLogger @@ -22,7 +22,6 @@ create_remote_task_signature = get_celery_task_signature("create_remote_table") create_merge_table_task_signature = get_celery_task_signature("create_merge_table") get_merge_tables_task_signature = get_celery_task_signature("get_merge_tables") -get_table_data_task_signature = get_celery_task_signature("get_table_data") @pytest.fixture(autouse=True) @@ -210,6 +209,7 @@ def test_create_merge_table_on_top_of_remote_tables( use_localnode2_database, globalnode_node_service, globalnode_celery_app, + globalnode_db_cursor, use_globalnode_database, ): """ @@ -224,7 +224,7 @@ def 
test_create_merge_table_on_top_of_remote_tables( ColumnInfo(name="col3", dtype=DType.STR), ] ) - table_values = [[1, 0.1, "test1"], [2, 0.2, "test2"], [3, 0.3, "test3"]] + initial_table_values = [[1, 0.1, "test1"], [2, 0.2, "test2"], [3, 0.3, "test3"]] localnode1_tableinfo = TableInfo( name=f"normal_testlocalnode1_{context_id}", schema_=table_schema, @@ -247,8 +247,12 @@ def test_create_merge_table_on_top_of_remote_tables( localnode2_tableinfo.schema_, True, ) - insert_data_to_db(localnode1_tableinfo.name, table_values, localnode1_db_cursor) - insert_data_to_db(localnode2_tableinfo.name, table_values, localnode2_db_cursor) + insert_data_to_db( + localnode1_tableinfo.name, initial_table_values, localnode1_db_cursor + ) + insert_data_to_db( + localnode2_tableinfo.name, initial_table_values, localnode2_db_cursor + ) # Create remote tables local_node_1_monetdb_sock_address = f"{str(COMMON_IP)}:{MONETDB_LOCALNODE1_PORT}" @@ -305,21 +309,15 @@ def test_create_merge_table_on_top_of_remote_tables( ) ) - # Validate merge table row count - async_result = globalnode_celery_app.queue_task( - task_signature=get_table_data_task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=merge_table_info.name, + # Validate merge tables contains both remote tables' values + merge_table_values = get_table_data_from_db( + globalnode_db_cursor, merge_table_info.name ) - table_data_json = globalnode_celery_app.get_result( - async_result=async_result, - logger=StdOutputLogger(), - timeout=TASKS_TIMEOUT, - ) - table_data = TableData.parse_raw(table_data_json) - column_count = len(table_data.columns) - assert column_count == len(table_values) - row_count = len(table_data.columns[0].data) - assert row_count == len(table_values[0] * 2) + column_count = len(initial_table_values[0]) + assert column_count == len(merge_table_values[0]) + + row_count = len(initial_table_values) + assert row_count * 2 == len( + merge_table_values + ) # The rows are doubled since we have 2 
localnodes with N rows each. diff --git a/tests/standalone_tests/test_monet_db_facade.py b/tests/standalone_tests/test_monet_db_facade.py index 7786d2eb3..e16ceb6bc 100644 --- a/tests/standalone_tests/test_monet_db_facade.py +++ b/tests/standalone_tests/test_monet_db_facade.py @@ -11,6 +11,8 @@ from exareme2.node.monetdb_interface.monet_db_facade import ( _validate_exception_could_be_recovered, ) +from exareme2.node.monetdb_interface.monet_db_facade import db_execute_and_fetchall +from exareme2.node.monetdb_interface.monet_db_facade import db_execute_query from exareme2.node.node_logger import init_logger from tests.standalone_tests.conftest import COMMON_IP from tests.standalone_tests.conftest import MONETDB_LOCALNODETMP_NAME @@ -54,9 +56,11 @@ def patch_node_config(): "database": "db", "local_username": "executor", "local_password": "executor", + "public_username": "guest", + "public_password": "guest", }, "celery": { - "tasks_timeout": 60, + "tasks_timeout": 5, "run_udf_task_timeout": 120, "worker_concurrency": 1, }, @@ -162,3 +166,18 @@ def get_exception_cases(): ) def test_validate_exception_is_recoverable(exception: Exception, expected): assert _validate_exception_could_be_recovered(exception) == expected + + +@pytest.mark.slow +@pytest.mark.very_slow +def test_db_execute_use_public_user_parameter( + monetdb_localnodetmp, +): + table_name = "local_user_table" + + db_execute_query(query=f"create table {table_name} (col1 int);") + + with pytest.raises(OperationalError, match=r"no such table"): + db_execute_and_fetchall( + query=f"select * from {table_name};", use_public_user=True + ) diff --git a/tests/standalone_tests/test_remote_tables.py b/tests/standalone_tests/test_remote_tables.py index 10ab2a0fb..d357fdd05 100644 --- a/tests/standalone_tests/test_remote_tables.py +++ b/tests/standalone_tests/test_remote_tables.py @@ -1,12 +1,10 @@ import uuid import pytest -from pymonetdb import OperationalError +from sqlalchemy.exc import OperationalError -from 
exareme2.controller.celery_app import CeleryTaskTimeoutException from exareme2.datatypes import DType from exareme2.node_tasks_DTOs import ColumnInfo -from exareme2.node_tasks_DTOs import TableData from exareme2.node_tasks_DTOs import TableInfo from exareme2.node_tasks_DTOs import TableSchema from exareme2.node_tasks_DTOs import TableType @@ -14,12 +12,12 @@ from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT from tests.standalone_tests.conftest import TASKS_TIMEOUT from tests.standalone_tests.conftest import create_table_in_db +from tests.standalone_tests.conftest import get_table_data_from_db from tests.standalone_tests.conftest import insert_data_to_db from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature from tests.standalone_tests.std_output_logger import StdOutputLogger create_remote_table_task_signature = get_celery_task_signature("create_remote_table") -get_table_data_task_signature = get_celery_task_signature("get_table_data") @pytest.fixture(autouse=True) @@ -41,6 +39,7 @@ def test_remote_table_properly_mirrors_data( localnode1_db_cursor, localnode1_node_service, localnode1_celery_app, + globalnode_db_cursor, globalnode_node_service, globalnode_celery_app, ): @@ -53,7 +52,7 @@ def test_remote_table_properly_mirrors_data( ColumnInfo(name="col3", dtype=DType.STR), ] ) - table_values = [[1, 0.1, "test1"], [2, 0.2, "test2"], [3, 0.3, "test3"]] + initial_table_values = [[1, 0.1, "test1"], [2, 0.2, "test2"], [3, 0.3, "test3"]] table_info = TableInfo( name=f"normal_testlocalnode1_{context_id}", schema_=table_schema, @@ -65,7 +64,7 @@ def test_remote_table_properly_mirrors_data( table_schema, True, ) - insert_data_to_db(table_info.name, table_values, localnode1_db_cursor) + insert_data_to_db(table_info.name, initial_table_values, localnode1_db_cursor) async_result = globalnode_celery_app.queue_task( task_signature=create_remote_table_task_signature, @@ -81,23 +80,11 @@ def test_remote_table_properly_mirrors_data( 
timeout=TASKS_TIMEOUT, ) - async_result = globalnode_celery_app.queue_task( - task_signature=get_table_data_task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=table_info.name, - ) - table_data = TableData.parse_raw( - globalnode_celery_app.get_result( - async_result=async_result, - logger=StdOutputLogger(), - timeout=TASKS_TIMEOUT, - ) - ) + table_values = get_table_data_from_db(globalnode_db_cursor, table_info.name) # Validate same size table result - assert len(table_data.columns) == len(table_values[0]) - assert len(table_data.columns[0].data) == len(table_values) + assert len(table_values[0]) == len(initial_table_values[0]) + assert len(table_values) == len(initial_table_values) @pytest.mark.slow @@ -107,6 +94,7 @@ def test_remote_table_error_on_non_published_table( localnode1_db_cursor, localnode1_node_service, localnode1_celery_app, + globalnode_db_cursor, globalnode_node_service, globalnode_celery_app, ): @@ -117,10 +105,6 @@ def test_remote_table_error_on_non_published_table( The error returned is an OperationalError with message "Exception occurred in the remote server, please check the log there". - - It takes a lot of time for the task to complete because since this is an OperationalError, with generic error - message, we don't know what the problem actually is. In the monetdb_facade when we get such generic errors we - retry in case the database returns proper response, which is not possible in this case, so we wait for timeout. 
""" local_node_monetdb_sock_address = f"{str(COMMON_IP)}:{MONETDB_LOCALNODE1_PORT}" @@ -157,17 +141,5 @@ def test_remote_table_error_on_non_published_table( timeout=TASKS_TIMEOUT, ) - async_result = globalnode_celery_app.queue_task( - task_signature=get_table_data_task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=table_info.name, - ) - - # The timeout should be larger than the monetdb retry time, in order to get the final error after retrying the query with pytest.raises(OperationalError): - globalnode_celery_app.get_result( - async_result=async_result, - logger=StdOutputLogger(), - timeout=TASKS_TIMEOUT, - ) + get_table_data_from_db(globalnode_db_cursor, table_info.name) diff --git a/tests/standalone_tests/test_smpc_node_tasks.py b/tests/standalone_tests/test_smpc_node_tasks.py index 94088c657..8ec174686 100644 --- a/tests/standalone_tests/test_smpc_node_tasks.py +++ b/tests/standalone_tests/test_smpc_node_tasks.py @@ -16,9 +16,9 @@ from exareme2.node_tasks_DTOs import NodeUDFPosArguments from exareme2.node_tasks_DTOs import NodeUDFResults from exareme2.node_tasks_DTOs import SMPCTablesInfo -from exareme2.node_tasks_DTOs import TableData from exareme2.node_tasks_DTOs import TableInfo from exareme2.node_tasks_DTOs import TableSchema +from exareme2.node_tasks_DTOs import TableType from exareme2.smpc_cluster_comm_helpers import ADD_DATASET_ENDPOINT from exareme2.smpc_cluster_comm_helpers import TRIGGER_COMPUTATION_ENDPOINT from exareme2.smpc_cluster_comm_helpers import get_smpc_result @@ -31,12 +31,11 @@ from tests.algorithms.orphan_udfs import smpc_local_step from tests.standalone_tests.conftest import LOCALNODE1_SMPC_CONFIG_FILE from tests.standalone_tests.conftest import LOCALNODE2_SMPC_CONFIG_FILE -from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT -from tests.standalone_tests.conftest import MONETDB_SMPC_LOCALNODE1_PORT -from tests.standalone_tests.conftest import MONETDB_SMPC_LOCALNODE2_PORT from 
tests.standalone_tests.conftest import SMPC_COORDINATOR_ADDRESS from tests.standalone_tests.conftest import TASKS_TIMEOUT +from tests.standalone_tests.conftest import create_table_in_db from tests.standalone_tests.conftest import get_node_config_by_id +from tests.standalone_tests.conftest import get_table_data_from_db from tests.standalone_tests.conftest import insert_data_to_db from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature @@ -47,26 +46,18 @@ SMPC_GET_DATASET_ENDPOINT = "/api/update-dataset/" -def create_table_with_one_column_and_ten_rows( - celery_app: Celery, db_cursor -) -> Tuple[TableInfo, int]: - create_table_task = get_celery_task_signature("create_table") - - table_schema = TableSchema( - columns=[ - ColumnInfo(name="col1", dtype=DType.INT), - ] - ) - table_info = TableInfo.parse_raw( - celery_app.signature(create_table_task) - .delay( - request_id=request_id, - context_id=context_id, - command_id=uuid.uuid4().hex, - schema_json=table_schema.json(), - ) - .get(timeout=TASKS_TIMEOUT) +def create_table_with_one_column_and_ten_rows(db_cursor) -> Tuple[TableInfo, int]: + table_name = f"table_one_column_ten_rows_{context_id}" + table_info = TableInfo( + name=table_name, + schema_=TableSchema( + columns=[ + ColumnInfo(name="col1", dtype=DType.INT), + ] + ), + type_=TableType.NORMAL, ) + create_table_in_db(db_cursor, table_info.name, table_info.schema_) values = [[1], [2], [3], [4], [5], [6], [7], [8], [9], [10]] @@ -75,34 +66,27 @@ def create_table_with_one_column_and_ten_rows( return table_info, 55 -def create_secure_transfer_table(celery_app: Celery) -> TableInfo: - task_signature = get_celery_task_signature("create_table") - - table_schema = TableSchema( - columns=[ - ColumnInfo(name="secure_transfer", dtype=DType.JSON), - ] - ) - table_info = TableInfo.parse_raw( - ( - celery_app.signature(task_signature) - .delay( - request_id=request_id, - context_id=context_id, - command_id=uuid.uuid4().hex, - 
schema_json=table_schema.json(), - ) - .get(timeout=TASKS_TIMEOUT) - ) +def create_secure_transfer_table(db_cursor) -> TableInfo: + table_name = f"table_secure_transfer_{context_id}" + table_info = TableInfo( + name=table_name, + schema_=TableSchema( + columns=[ + ColumnInfo(name="secure_transfer", dtype=DType.JSON), + ] + ), + type_=TableType.NORMAL, ) + create_table_in_db(db_cursor, table_info.name, table_info.schema_) + return table_info # TODO More dynamic so it can receive any secure_transfer values def create_table_with_secure_transfer_results_with_smpc_off( - celery_app: Celery, db_cursor + db_cursor, ) -> Tuple[TableInfo, int]: - table_info = create_secure_transfer_table(celery_app) + table_info = create_secure_transfer_table(db_cursor) secure_transfer_1_value = 100 secure_transfer_2_value = 11 @@ -124,9 +108,9 @@ def create_table_with_secure_transfer_results_with_smpc_off( def create_table_with_multiple_secure_transfer_templates( - celery_app: Celery, db_cursor, similar: bool + db_cursor, similar: bool ) -> TableInfo: - table_info = create_secure_transfer_table(celery_app) + table_info = create_secure_transfer_table(db_cursor) secure_transfer_template = { "sum": {"data": [0, 1, 2, 3], "operation": "sum", "type": "int"} @@ -151,10 +135,8 @@ def create_table_with_multiple_secure_transfer_templates( return table_info -def create_table_with_smpc_sum_op_values( - celery_app: Celery, db_cursor -) -> Tuple[TableInfo, str]: - table_info = create_secure_transfer_table(celery_app) +def create_table_with_smpc_sum_op_values(db_cursor) -> Tuple[TableInfo, str]: + table_info = create_secure_transfer_table(db_cursor) sum_op_values = [0, 1, 2, 3, 4, 5] values = [ @@ -167,21 +149,14 @@ def create_table_with_smpc_sum_op_values( def validate_table_data_match_expected( - celery_app: Celery, - get_table_data_task_signature: str, + db_cursor, table_name: str, expected_values: Any, ): assert table_name is not None - table_data_str = ( - 
celery_app.signature(get_table_data_task_signature) - .delay(request_id=request_id, table_name=table_name) - .get() - ) - table_data: TableData = TableData.parse_raw(table_data_str) - result_str, *_ = table_data.columns[0].data - result = json.loads(result_str) + table_data = get_table_data_from_db(db_cursor, table_name) + result = json.loads(table_data[0][0]) assert result == expected_values @@ -194,10 +169,9 @@ def test_secure_transfer_output_with_smpc_off( ): localnode1_celery_app = localnode1_celery_app._celery_app run_udf_task = get_celery_task_signature("run_udf") - get_table_data_task = get_celery_task_signature("get_table_data") input_table_info, input_table_name_sum = create_table_with_one_column_and_ten_rows( - localnode1_celery_app, localnode1_db_cursor + localnode1_db_cursor ) pos_args_str = NodeUDFPosArguments( @@ -226,8 +200,7 @@ def test_secure_transfer_output_with_smpc_off( "sum": {"data": input_table_name_sum, "operation": "sum", "type": "int"} } validate_table_data_match_expected( - celery_app=localnode1_celery_app, - get_table_data_task_signature=get_table_data_task, + db_cursor=localnode1_db_cursor, table_name=secure_transfer_result.value.name, expected_values=expected_result, ) @@ -242,14 +215,11 @@ def test_secure_transfer_input_with_smpc_off( ): localnode1_celery_app = localnode1_celery_app._celery_app run_udf_task = get_celery_task_signature("run_udf") - get_table_data_task = get_celery_task_signature("get_table_data") ( secure_transfer_results_tableinfo, secure_transfer_results_values_sum, - ) = create_table_with_secure_transfer_results_with_smpc_off( - localnode1_celery_app, localnode1_db_cursor - ) + ) = create_table_with_secure_transfer_results_with_smpc_off(localnode1_db_cursor) pos_args_str = NodeUDFPosArguments( args=[NodeTableDTO(value=secure_transfer_results_tableinfo)] @@ -276,8 +246,7 @@ def test_secure_transfer_input_with_smpc_off( expected_result = {"total_sum": secure_transfer_results_values_sum} 
validate_table_data_match_expected( - celery_app=localnode1_celery_app, - get_table_data_task_signature=get_table_data_task, + db_cursor=localnode1_db_cursor, table_name=transfer_result.value.name, expected_values=expected_result, ) @@ -298,7 +267,7 @@ def test_validate_smpc_templates_match( ) table_info = create_table_with_multiple_secure_transfer_templates( - smpc_localnode1_celery_app, localnode1_smpc_db_cursor, True + localnode1_smpc_db_cursor, True ) try: @@ -325,7 +294,7 @@ def test_validate_smpc_templates_dont_match( ) table_info = create_table_with_multiple_secure_transfer_templates( - smpc_localnode1_celery_app, localnode1_smpc_db_cursor, False + localnode1_smpc_db_cursor, False ) with pytest.raises(ValueError) as exc: @@ -347,11 +316,10 @@ def test_secure_transfer_run_udf_flow_with_smpc_on( ): smpc_localnode1_celery_app = smpc_localnode1_celery_app._celery_app run_udf_task = get_celery_task_signature("run_udf") - get_table_data_task = get_celery_task_signature("get_table_data") # ----------------------- SECURE TRANSFER OUTPUT ---------------------- input_table_name, input_table_name_sum = create_table_with_one_column_and_ten_rows( - smpc_localnode1_celery_app, localnode1_smpc_db_cursor + localnode1_smpc_db_cursor ) pos_args_str = NodeUDFPosArguments( @@ -381,8 +349,7 @@ def test_secure_transfer_run_udf_flow_with_smpc_on( assert smpc_result.value.template is not None expected_template = {"sum": {"data": 0, "operation": "sum", "type": "int"}} validate_table_data_match_expected( - celery_app=smpc_localnode1_celery_app, - get_table_data_task_signature=get_table_data_task, + db_cursor=localnode1_smpc_db_cursor, table_name=smpc_result.value.template.name, expected_values=expected_template, ) @@ -390,8 +357,7 @@ def test_secure_transfer_run_udf_flow_with_smpc_on( assert smpc_result.value.sum_op is not None expected_sum_op_values = [input_table_name_sum] validate_table_data_match_expected( - celery_app=smpc_localnode1_celery_app, - 
get_table_data_task_signature=get_table_data_task, + db_cursor=localnode1_smpc_db_cursor, table_name=smpc_result.value.sum_op.name, expected_values=expected_sum_op_values, ) @@ -421,8 +387,7 @@ def test_secure_transfer_run_udf_flow_with_smpc_on( expected_result = {"total_sum": input_table_name_sum} validate_table_data_match_expected( - celery_app=smpc_localnode1_celery_app, - get_table_data_task_signature=get_table_data_task, + db_cursor=localnode1_smpc_db_cursor, table_name=global_step_result.value.name, expected_values=expected_result, ) @@ -462,7 +427,7 @@ def test_load_data_to_smpc_client( ): smpc_localnode1_celery_app = smpc_localnode1_celery_app._celery_app table_info, sum_op_values_str = create_table_with_smpc_sum_op_values( - smpc_localnode1_celery_app, localnode1_smpc_db_cursor + localnode1_smpc_db_cursor ) load_data_to_smpc_client_task = get_celery_task_signature( "load_data_to_smpc_client" @@ -525,11 +490,11 @@ def test_get_smpc_result( smpc_globalnode_node_service, use_smpc_globalnode_database, smpc_globalnode_celery_app, + globalnode_smpc_db_cursor, smpc_cluster, ): smpc_globalnode_celery_app = smpc_globalnode_celery_app._celery_app get_smpc_result_task = get_celery_task_signature("get_smpc_result") - get_table_data_task = get_celery_task_signature("get_table_data") # --------------- LOAD Dataset to SMPC -------------------- node_config = get_node_config_by_id(LOCALNODE1_SMPC_CONFIG_FILE) @@ -587,8 +552,7 @@ def test_get_smpc_result( ) validate_table_data_match_expected( - celery_app=smpc_globalnode_celery_app, - get_table_data_task_signature=get_table_data_task, + db_cursor=globalnode_smpc_db_cursor, table_name=result_tableinfo.name, expected_values=smpc_computation_data, ) @@ -610,6 +574,7 @@ def test_orchestrate_SMPC_between_two_localnodes_and_the_globalnode( smpc_localnode2_celery_app, localnode1_smpc_db_cursor, localnode2_smpc_db_cursor, + globalnode_smpc_db_cursor, smpc_cluster, ): smpc_globalnode_celery_app = 
smpc_globalnode_celery_app._celery_app @@ -650,15 +615,11 @@ def test_orchestrate_SMPC_between_two_localnodes_and_the_globalnode( ( input_table_1_name, input_table_1_name_sum, - ) = create_table_with_one_column_and_ten_rows( - smpc_localnode1_celery_app, localnode1_smpc_db_cursor - ) + ) = create_table_with_one_column_and_ten_rows(localnode1_smpc_db_cursor) ( input_table_2_name, input_table_2_name_sum, - ) = create_table_with_one_column_and_ten_rows( - smpc_localnode2_celery_app, localnode2_smpc_db_cursor - ) + ) = create_table_with_one_column_and_ten_rows(localnode2_smpc_db_cursor) # ---------------- RUN LOCAL UDFS WITH SECURE TRANSFER OUTPUT ---------------------- pos_args_str_localnode1 = NodeUDFPosArguments( @@ -818,8 +779,7 @@ def test_orchestrate_SMPC_between_two_localnodes_and_the_globalnode( expected_result = {"total_sum": input_table_1_name_sum + input_table_2_name_sum} validate_table_data_match_expected( - celery_app=smpc_globalnode_celery_app, - get_table_data_task_signature=get_celery_task_signature("get_table_data"), + db_cursor=globalnode_smpc_db_cursor, table_name=global_step_result.value.name, expected_values=expected_result, ) diff --git a/tests/standalone_tests/test_tables.py b/tests/standalone_tests/test_tables.py index d6f92ecae..340d198e3 100644 --- a/tests/standalone_tests/test_tables.py +++ b/tests/standalone_tests/test_tables.py @@ -1,18 +1,15 @@ import uuid as uuid import pytest +from pymonetdb import OperationalError from exareme2.datatypes import DType from exareme2.node_tasks_DTOs import ColumnInfo -from exareme2.node_tasks_DTOs import TableData from exareme2.node_tasks_DTOs import TableInfo from exareme2.node_tasks_DTOs import TableSchema -from exareme2.table_data_DTOs import ColumnDataFloat -from exareme2.table_data_DTOs import ColumnDataInt -from exareme2.table_data_DTOs import ColumnDataStr -from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT from tests.standalone_tests.conftest import TASKS_TIMEOUT -from 
tests.standalone_tests.conftest import insert_data_to_db +from tests.standalone_tests.conftest import create_table_in_db +from tests.standalone_tests.conftest import get_table_data_from_db from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature from tests.standalone_tests.std_output_logger import StdOutputLogger @@ -34,7 +31,7 @@ def context_id(request_id): @pytest.mark.slow -def test_create_and_find_tables( +def test_create_table( request_id, context_id, localnode1_node_service, @@ -65,6 +62,28 @@ def test_create_and_find_tables( ) ) + table_values = get_table_data_from_db(localnode1_db_cursor, table_1_info.name) + assert len(table_values) == 0 + + +@pytest.mark.slow +def test_get_tables( + request_id, + context_id, + localnode1_node_service, + localnode1_celery_app, + localnode1_db_cursor, +): + table_name = f"normal_testlocalnode1_{context_id}" + table_schema = TableSchema( + columns=[ + ColumnInfo(name="col1", dtype=DType.INT), + ColumnInfo(name="col2", dtype=DType.FLOAT), + ColumnInfo(name="col3", dtype=DType.STR), + ] + ) + create_table_in_db(localnode1_db_cursor, table_name, table_schema) + async_result = localnode1_celery_app.queue_task( task_signature=get_tables_task_signature, logger=StdOutputLogger(), @@ -76,82 +95,75 @@ def test_create_and_find_tables( logger=StdOutputLogger(), timeout=TASKS_TIMEOUT, ) + assert table_name in tables - assert table_1_info.name in tables - - values = [[1, 0.1, "test1"], [2, 0.2, None], [3, 0.3, "test3"]] - insert_data_to_db(table_1_info.name, values, localnode1_db_cursor) - async_result = localnode1_celery_app.queue_task( - task_signature=get_table_data_task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=table_1_info.name, - ) - table_data_json = localnode1_celery_app.get_result( - async_result=async_result, - logger=StdOutputLogger(), - timeout=TASKS_TIMEOUT, +@pytest.mark.slow +def test_get_table_data_not_working_from_unpublished_table( + request_id, + 
context_id, + localnode1_node_service, + localnode1_celery_app, + localnode1_db_cursor, +): + table_name = f"normal_testlocalnode1_{context_id}" + table_schema = TableSchema( + columns=[ + ColumnInfo(name="col1", dtype=DType.INT), + ColumnInfo(name="col2", dtype=DType.FLOAT), + ColumnInfo(name="col3", dtype=DType.STR), + ] ) - - table_data = TableData.parse_raw(table_data_json) - expected_columns = [ - ColumnDataInt(name="col1", data=[1, 2, 3]), - ColumnDataFloat(name="col2", data=[0.1, 0.2, 0.3]), - ColumnDataStr(name="col3", data=["test1", None, "test3"]), - ] - assert table_data.name == table_1_info.name - assert table_data.columns == expected_columns + create_table_in_db(localnode1_db_cursor, table_name, table_schema) async_result = localnode1_celery_app.queue_task( - task_signature=create_table_task_signature, + task_signature=get_table_data_task_signature, logger=StdOutputLogger(), request_id=request_id, - context_id=context_id, - command_id=uuid.uuid4().hex, - schema_json=table_schema.json(), + table_name=table_name, ) - table_2_info = TableInfo.parse_raw( + with pytest.raises(OperationalError): localnode1_celery_app.get_result( async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT, ) - ) - async_result = localnode1_celery_app.queue_task( - task_signature=get_tables_task_signature, - logger=StdOutputLogger(), - request_id=request_id, - context_id=context_id, + +@pytest.mark.slow +def test_get_table_data_works_on_published_table( + request_id, + context_id, + localnode1_node_service, + localnode1_celery_app, + localnode1_db_cursor, +): + table_name = f"normal_testlocalnode1_{context_id}" + table_schema = TableSchema( + columns=[ + ColumnInfo(name="col1", dtype=DType.INT), + ColumnInfo(name="col2", dtype=DType.FLOAT), + ColumnInfo(name="col3", dtype=DType.STR), + ] ) - tables = localnode1_celery_app.get_result( - async_result=async_result, - logger=StdOutputLogger(), - timeout=TASKS_TIMEOUT, + create_table_in_db( + localnode1_db_cursor, 
table_name, table_schema, publish_table=True ) - assert table_2_info.name in tables - - values = [[1, 0.1, "test1"], [2, None, "None"], [3, 0.3, None]] - insert_data_to_db(table_2_info.name, values, localnode1_db_cursor) async_result = localnode1_celery_app.queue_task( task_signature=get_table_data_task_signature, logger=StdOutputLogger(), request_id=request_id, - table_name=table_2_info.name, - ) - table_data_json = localnode1_celery_app.get_result( - async_result=async_result, - logger=StdOutputLogger(), - timeout=TASKS_TIMEOUT, + table_name=table_name, ) - table_data = TableData.parse_raw(table_data_json) - expected_columns = [ - ColumnDataInt(name="col1", data=[1, 2, 3]), - ColumnDataFloat(name="col2", data=[0.1, None, 0.3]), - ColumnDataStr(name="col3", data=["test1", "None", None]), - ] - assert table_data.name == table_2_info.name - assert table_data.columns == expected_columns - assert table_schema == table_2_info.schema_ + + try: + localnode1_celery_app.get_result( + async_result=async_result, + logger=StdOutputLogger(), + timeout=TASKS_TIMEOUT, + ) + except OperationalError: + pytest.fail( + "The table data should be fetched without error since the table is published." 
+ ) diff --git a/tests/standalone_tests/test_views_and_filters.py b/tests/standalone_tests/test_views_and_filters.py index 0c7577c4f..68e82d140 100644 --- a/tests/standalone_tests/test_views_and_filters.py +++ b/tests/standalone_tests/test_views_and_filters.py @@ -9,16 +9,23 @@ from exareme2.exceptions import DatasetUnavailable from exareme2.exceptions import InsufficientDataError from exareme2.node_tasks_DTOs import ColumnInfo -from exareme2.node_tasks_DTOs import TableData from exareme2.node_tasks_DTOs import TableInfo from exareme2.node_tasks_DTOs import TableSchema +from exareme2.node_tasks_DTOs import TableType from tests.standalone_tests.conftest import ALGORITHMS_URL -from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT +from tests.standalone_tests.conftest import create_table_in_db +from tests.standalone_tests.conftest import get_table_data_from_db from tests.standalone_tests.conftest import insert_data_to_db from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature from tests.standalone_tests.std_output_logger import StdOutputLogger from tests.standalone_tests.test_smpc_node_tasks import TASKS_TIMEOUT +create_view_task_signature = get_celery_task_signature("create_view") +create_data_model_views_task_signature = get_celery_task_signature( + "create_data_model_views" +) +get_views_task_signature = get_celery_task_signature("get_views") + @pytest.fixture def request_id(): @@ -91,6 +98,7 @@ def test_view_without_filters( localnode1_celery_app, localnode1_db_cursor, ): + table_name = f"test_view_without_filters_{context_id}" table_schema = TableSchema( columns=[ ColumnInfo(name="col1", dtype=DType.INT), @@ -98,37 +106,20 @@ def test_view_without_filters( ColumnInfo(name="col3", dtype=DType.STR), ] ) + table_values = [[1, 0.1, "test1"], [2, 0.2, None], [3, 0.3, "test3"]] - task_signature = get_celery_task_signature("create_table") - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - 
logger=StdOutputLogger(), - request_id=request_id, - context_id=context_id, - command_id=uuid.uuid4().hex, - schema_json=table_schema.json(), - ) - table_info = TableInfo.parse_raw( - localnode1_celery_app.get_result( - async_result=async_result, - logger=StdOutputLogger(), - timeout=TASKS_TIMEOUT, - ) - ) - - values = [[1, 0.1, "test1"], [2, 0.2, None], [3, 0.3, "test3"]] - insert_data_to_db(table_info.name, values, localnode1_db_cursor) + create_table_in_db(localnode1_db_cursor, table_name, table_schema) + insert_data_to_db(table_name, table_values, localnode1_db_cursor) - columns = ["col1", "col3"] - task_signature = get_celery_task_signature("create_view") + view_columns = ["col1", "col3"] async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, + task_signature=create_view_task_signature, logger=StdOutputLogger(), request_id=request_id, context_id=context_id, command_id=uuid.uuid4().hex, - table_name=table_info.name, - columns=columns, + table_name=table_name, + columns=view_columns, filters=None, ) view_info = TableInfo.parse_raw( @@ -138,42 +129,11 @@ def test_view_without_filters( timeout=TASKS_TIMEOUT, ) ) + assert view_info.type_ == TableType.VIEW - task_signature = get_celery_task_signature("get_views") - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - logger=StdOutputLogger(), - request_id=request_id, - context_id=context_id, - ) - views = localnode1_celery_app.get_result( - async_result=async_result, - logger=StdOutputLogger(), - timeout=TASKS_TIMEOUT, - ) - assert view_info.name in views - - view_intended_schema = TableSchema( - columns=[ - ColumnInfo(name="col1", dtype=DType.INT), - ColumnInfo(name="col3", dtype=DType.STR), - ] - ) - assert view_intended_schema == view_info.schema_ - - task_signature = get_celery_task_signature("get_table_data") - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - logger=StdOutputLogger(), - request_id=request_id, - 
table_name=view_info.name, - ) - view_data_json = localnode1_celery_app.get_result( - async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT - ) - view_data = TableData.parse_raw(view_data_json) - assert len(view_data.columns) == len(view_intended_schema.columns) - assert view_data.name == view_info.name + view_data = get_table_data_from_db(localnode1_db_cursor, view_info.name) + assert len(view_data[0]) == len(view_columns) # Assert column count + assert len(view_data) == len(table_values) # Assert row count @pytest.mark.slow @@ -186,6 +146,7 @@ def test_view_with_filters( use_localnode1_database, localnode1_db_cursor, ): + table_name = f"test_view_with_filters_{context_id}" table_schema = TableSchema( columns=[ ColumnInfo(name="col1", dtype=DType.INT), @@ -193,26 +154,12 @@ def test_view_with_filters( ColumnInfo(name="col3", dtype=DType.STR), ] ) + table_values = [[1, 0.1, "test1"], [2, 0.2, None], [3, 0.3, "test3"]] - task_signature = get_celery_task_signature("create_table") - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - logger=StdOutputLogger(), - request_id=request_id, - context_id=context_id, - command_id=uuid.uuid4().hex, - schema_json=table_schema.json(), - ) - table_info = TableInfo.parse_raw( - localnode1_celery_app.get_result( - async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT - ) - ) - - values = [[1, 0.1, "test1"], [2, 0.2, None], [3, 0.3, "test3"]] - insert_data_to_db(table_info.name, values, localnode1_db_cursor) + create_table_in_db(localnode1_db_cursor, table_name, table_schema) + insert_data_to_db(table_name, table_values, localnode1_db_cursor) - columns = ["col1", "col3"] + view_columns = ["col1", "col3"] filters = { "condition": "AND", "rules": [ @@ -232,15 +179,14 @@ def test_view_with_filters( ], "valid": True, } - task_signature = get_celery_task_signature("create_view") async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, + 
task_signature=create_view_task_signature, logger=StdOutputLogger(), request_id=request_id, context_id=context_id, command_id=uuid.uuid4().hex, - table_name=table_info.name, - columns=columns, + table_name=table_name, + columns=view_columns, filters=filters, ) view_info = TableInfo.parse_raw( @@ -249,119 +195,19 @@ def test_view_with_filters( ) ) - task_signature = get_celery_task_signature("get_views") - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - logger=StdOutputLogger(), - request_id=request_id, - context_id=context_id, - ) - views = localnode1_celery_app.get_result( - async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT - ) - assert view_info.name in views - view_intended_schema = TableSchema( - columns=[ - ColumnInfo(name="col1", dtype=DType.INT), - ColumnInfo(name="col3", dtype=DType.STR), - ] - ) - assert view_intended_schema == view_info.schema_ - - task_signature = get_celery_task_signature("get_table_data") - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=view_info.name, - ) - view_data_json = localnode1_celery_app.get_result( - async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT - ) - view_data = TableData.parse_raw(view_data_json) - assert len(view_data.columns) == 2 - assert len(view_data.columns) == len(view_intended_schema.columns) - assert view_data.name == view_info.name - - -@pytest.mark.slow -def test_data_model_view_without_filters( - request_id, - context_id, - load_data_localnode1, - localnode1_node_service, - localnode1_celery_app, - use_localnode1_database, -): - columns = [ - "dataset", - "age_value", - "gcs_motor_response_scale", - "pupil_reactivity_right_eye_result", - ] - data_model = "tbi:0.1" - task_signature = get_celery_task_signature("create_data_model_views") - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - 
logger=StdOutputLogger(), - request_id=request_id, - context_id=context_id, - command_id=uuid.uuid4().hex, - data_model=data_model, - datasets=[], - columns_per_view=[columns], - ) - view_info, *_ = [ - TableInfo.parse_raw(table) - for table in localnode1_celery_app.get_result( - async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT - ) - ] - task_signature = get_celery_task_signature("get_views") - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - logger=StdOutputLogger(), - request_id=request_id, - context_id=context_id, - ) - views = localnode1_celery_app.get_result( - async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT - ) - assert view_info.name in views - - schema = TableSchema( - columns=[ - ColumnInfo(name="row_id", dtype=DType.INT), - ColumnInfo(name="dataset", dtype=DType.STR), - ColumnInfo(name="age_value", dtype=DType.INT), - ColumnInfo(name="gcs_motor_response_scale", dtype=DType.STR), - ColumnInfo(name="pupil_reactivity_right_eye_result", dtype=DType.STR), - ] - ) - assert schema == view_info.schema_ - - task_signature = get_celery_task_signature("get_table_data") - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=view_info.name, - ) - view_data_json = localnode1_celery_app.get_result( - async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT - ) - view_data = TableData.parse_raw(view_data_json) - assert len(view_data.columns) == len(schema.columns) - assert view_data.name == view_info.name + view_data = get_table_data_from_db(localnode1_db_cursor, view_info.name) + assert len(view_data[0]) == len(view_columns) # Assert column count + assert len(view_data) == 1 # All but one row have been filtered out @pytest.mark.slow -def test_data_model_view_with_filters( +def test_data_model_view( request_id, context_id, load_data_localnode1, localnode1_node_service, 
localnode1_celery_app, + localnode1_db_cursor, use_localnode1_database, ): columns = [ @@ -371,28 +217,8 @@ def test_data_model_view_with_filters( "pupil_reactivity_right_eye_result", ] data_model = "tbi:0.1" - filters = { - "condition": "AND", - "rules": [ - { - "condition": "OR", - "rules": [ - { - "id": "age_value", - "field": "age_value", - "type": "int", - "input": "number", - "operator": "greater", - "value": 30, - } - ], - } - ], - "valid": True, - } - task_signature = get_celery_task_signature("create_data_model_views") async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, + task_signature=create_data_model_views_task_signature, logger=StdOutputLogger(), request_id=request_id, context_id=context_id, @@ -400,8 +226,6 @@ def test_data_model_view_with_filters( data_model=data_model, datasets=[], columns_per_view=[columns], - filters=filters, - dropna=False, ) view_info, *_ = [ TableInfo.parse_raw(table) @@ -409,10 +233,8 @@ def test_data_model_view_with_filters( async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT ) ] - - task_signature = get_celery_task_signature("get_views") async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, + task_signature=get_views_task_signature, logger=StdOutputLogger(), request_id=request_id, context_id=context_id, @@ -433,19 +255,8 @@ def test_data_model_view_with_filters( ) assert schema == view_info.schema_ - task_signature = get_celery_task_signature("get_table_data") - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=view_info.name, - ) - view_data_json = localnode1_celery_app.get_result( - async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT - ) - view_data = TableData.parse_raw(view_data_json) - assert len(view_data.columns) == len(schema.columns) - assert view_data.name == view_info.name + view_data = 
get_table_data_from_db(localnode1_db_cursor, view_info.name) + assert len(view_data[0]) == len(schema.columns) @pytest.mark.slow @@ -455,15 +266,15 @@ def test_data_model_view_dataset_constraint( load_data_localnode1, localnode1_node_service, localnode1_celery_app, + localnode1_db_cursor, use_localnode1_database, ): columns = [ "dataset", ] data_model = "tbi:0.1" - task_signature = get_celery_task_signature("create_data_model_views") async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, + task_signature=create_data_model_views_task_signature, logger=StdOutputLogger(), request_id=request_id, context_id=context_id, @@ -480,19 +291,9 @@ def test_data_model_view_dataset_constraint( ) ] - task_signature = get_celery_task_signature("get_table_data") - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=view_info.name, - ) - view_data_json = localnode1_celery_app.get_result( - async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT - ) - - _, dataset_column = TableData.parse_raw(view_data_json).columns - assert set(dataset_column.data) == {"dummy_tbi1"} + view_data = get_table_data_from_db(localnode1_db_cursor, view_info.name) + for _, dataset in view_data: + assert dataset == "dummy_tbi1" @pytest.mark.slow @@ -502,6 +303,7 @@ def test_data_model_view_null_constraints( load_data_localnode1, localnode1_node_service, localnode1_celery_app, + localnode1_db_cursor, use_localnode1_database, ): columns = [ @@ -509,9 +311,8 @@ def test_data_model_view_null_constraints( ] data_model = "tbi:0.1" datasets = ["dummy_tbi1"] - task_signature = get_celery_task_signature("create_data_model_views") async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, + task_signature=create_data_model_views_task_signature, logger=StdOutputLogger(), request_id=request_id, context_id=context_id, @@ -528,22 +329,14 @@ def 
test_data_model_view_null_constraints( ) ] - task_signature = get_celery_task_signature("get_table_data") - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=view_info_without_nulls.name, - ) - view_data_json = localnode1_celery_app.get_result( - async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT + view_data = get_table_data_from_db( + localnode1_db_cursor, view_info_without_nulls.name ) - _, gose_score_column = TableData.parse_raw(view_data_json).columns - assert None not in gose_score_column.data + for row in view_data: + assert None not in row - task_signature = get_celery_task_signature("create_data_model_views") async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, + task_signature=create_data_model_views_task_signature, logger=StdOutputLogger(), request_id=request_id, context_id=context_id, @@ -561,23 +354,18 @@ def test_data_model_view_null_constraints( ) ] - task_signature = get_celery_task_signature("get_table_data") - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=view_info_with_nulls.name, - ) - view_data_json = localnode1_celery_app.get_result( - async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT - ) - _, gose_score_column = TableData.parse_raw(view_data_json).columns + view_data = get_table_data_from_db(localnode1_db_cursor, view_info_with_nulls.name) + none_found = False + for _, gose_score_column in view_data: + if gose_score_column is None: + none_found = True + break - assert None in gose_score_column.data + assert none_found is True @pytest.mark.slow -def test_insuficient_data_error_raised_when_data_model_view_generated( +def test_insufficient_data_error_raised_when_data_model_view_generated_with_zero_rows( request_id, context_id, load_data_localnode1, @@ -585,13 +373,10 @@ def 
test_insuficient_data_error_raised_when_data_model_view_generated( localnode1_celery_app, use_localnode1_database, zero_rows_data_model_view_generating_params, - five_rows_data_model_view_generating_params, ): - # check InsufficientDataError raised when empty data model view generated with pytest.raises(InsufficientDataError): - task_signature = get_celery_task_signature("create_data_model_views") async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, + task_signature=create_data_model_views_task_signature, logger=StdOutputLogger(), request_id=request_id, context_id=context_id, @@ -605,12 +390,22 @@ def test_insuficient_data_error_raised_when_data_model_view_generated( async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT ) + +@pytest.mark.slow +def test_insufficient_data_error_raised_when_data_model_view_generated_with_fewer_rows_than_limit( + request_id, + context_id, + load_data_localnode1, + localnode1_node_service, + localnode1_celery_app, + use_localnode1_database, + five_rows_data_model_view_generating_params, +): # check InsufficientDataError raised when data model view with less than # minimum_row_count (defined in testing_env_configs/test_localnode1.toml) with pytest.raises(InsufficientDataError): - task_signature = get_celery_task_signature("create_data_model_views") async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, + task_signature=create_data_model_views_task_signature, logger=StdOutputLogger(), request_id=request_id, context_id=context_id, @@ -639,9 +434,8 @@ def test_data_model_view_with_data_model_unavailable_exception( ] data_model = "non_existing" with pytest.raises(DataModelUnavailable) as exc: - task_signature = get_celery_task_signature("create_data_model_views") async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, + task_signature=create_data_model_views_task_signature, logger=StdOutputLogger(), request_id=request_id, 
context_id=context_id, @@ -674,9 +468,8 @@ def test_data_model_view_with_dataset_unavailable_exception( ] data_model = "tbi:0.1" with pytest.raises(DatasetUnavailable) as exc: - task_signature = get_celery_task_signature("create_data_model_views") async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, + task_signature=create_data_model_views_task_signature, logger=StdOutputLogger(), request_id=request_id, context_id=context_id, @@ -715,9 +508,8 @@ def test_multiple_data_model_views( ], ] data_model = "tbi:0.1" - task_signature = get_celery_task_signature("create_data_model_views") async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, + task_signature=create_data_model_views_task_signature, logger=StdOutputLogger(), request_id=request_id, context_id=context_id, @@ -733,9 +525,8 @@ def test_multiple_data_model_views( ) ] - task_signature = get_celery_task_signature("get_views") async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, + task_signature=get_views_task_signature, logger=StdOutputLogger(), request_id=request_id, context_id=context_id, @@ -778,9 +569,10 @@ def test_multiple_data_model_views_null_constraints( load_data_localnode1, localnode1_node_service, localnode1_celery_app, + localnode1_db_cursor, use_localnode1_database, ): - # datasets:"edsd2" of the data model dementia:0.1 has 53 rows. Nevertheless + # datasets:"edsd2" of the data model dementia:0.1 has 53 rows. 
Nevertheless, # column 'neurodegenerativescategories' contains 20 NULL values, whereas 'opticchiasm', # for the same respective rows contains numerical values # The test checks that calling task "create_data_model_view" with "dropna" flag set to @@ -802,9 +594,8 @@ def test_multiple_data_model_views_null_constraints( # Create data model passing only column 'neurodegenerativescategories' with dropna=False # This data model view will contain null values - task_signature = get_celery_task_signature("create_data_model_views") async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, + task_signature=create_data_model_views_task_signature, logger=StdOutputLogger(), request_id=request_id, context_id=context_id, @@ -819,37 +610,25 @@ def test_multiple_data_model_views_null_constraints( result = localnode1_celery_app.get_result( async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT ) - view_neurodegenerativescategories_nulls_included = TableInfo.parse_raw(result[0]) # Get count of how many null values - task_signature = get_celery_task_signature("get_table_data") - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=view_neurodegenerativescategories_nulls_included.name, - ) - result = localnode1_celery_app.get_result( - async_result=async_result, - logger=StdOutputLogger(), - timeout=TASKS_TIMEOUT, - ) - _, view_neurodegenerativescategories_nulls_included_column = TableData.parse_raw( - result - ).columns - view_neurodegenerativescategories_nulls_included_num_of_nulls = ( - view_neurodegenerativescategories_nulls_included_column.data.count(None) + view_neurodegenerativescategories_nulls_included_data = get_table_data_from_db( + localnode1_db_cursor, view_neurodegenerativescategories_nulls_included.name ) + view_neurodegenerativescategories_nulls_included_num_of_nulls = [ + neuro_column + for _, neuro_column in 
view_neurodegenerativescategories_nulls_included_data + ].count(None) + view_neurodegenerativescategories_nulls_included_num_of_rows = len( - view_neurodegenerativescategories_nulls_included_column.data + view_neurodegenerativescategories_nulls_included_data ) # Create data model with of both columns and dropna=True per view will drop the # rows with null values - task_signature = get_celery_task_signature("create_data_model_views") async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, + task_signature=create_data_model_views_task_signature, logger=StdOutputLogger(), request_id=request_id, context_id=context_id, @@ -869,35 +648,19 @@ def test_multiple_data_model_views_null_constraints( ] # Get data of the created data model views - task_signature = get_celery_task_signature("get_table_data") # neurodegenerativescategories data model view - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=view_neurodegenerativescategories.name, - ) - result = localnode1_celery_app.get_result( - async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT + view_neurodegenerativescategories_data = get_table_data_from_db( + localnode1_db_cursor, view_neurodegenerativescategories.name ) - - _, neurodegenerativescategories_column = TableData.parse_raw(result).columns view_neurodegenerativescategories_num_of_rows = len( - neurodegenerativescategories_column.data + view_neurodegenerativescategories_data ) # opticchiasm data model view - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=view_opticchiasm.name, - ) - result = localnode1_celery_app.get_result( - async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT + view_opticchiasm_data = get_table_data_from_db( + localnode1_db_cursor, view_opticchiasm.name ) - _, opticchiasm_column = 
TableData.parse_raw(result).columns - view_opticchiasm_num_of_rows = len(opticchiasm_column.data) + view_opticchiasm_num_of_rows = len(view_opticchiasm_data) # check that the 2 data model views generated are of the same row count assert view_neurodegenerativescategories_num_of_rows == view_opticchiasm_num_of_rows diff --git a/tests/standalone_tests/testing_env_configs/test_external_smpc_globalnode.toml b/tests/standalone_tests/testing_env_configs/test_external_smpc_globalnode.toml index bd3b302aa..1ef0b33ec 100644 --- a/tests/standalone_tests/testing_env_configs/test_external_smpc_globalnode.toml +++ b/tests/standalone_tests/testing_env_configs/test_external_smpc_globalnode.toml @@ -6,6 +6,7 @@ monetdb_nclients = 64 [privacy] minimum_row_count = 10 +protect_local_data = false [celery] worker_concurrency = 16 diff --git a/tests/standalone_tests/testing_env_configs/test_external_smpc_localnode1.toml b/tests/standalone_tests/testing_env_configs/test_external_smpc_localnode1.toml index c6c01910e..aa9e29e7f 100644 --- a/tests/standalone_tests/testing_env_configs/test_external_smpc_localnode1.toml +++ b/tests/standalone_tests/testing_env_configs/test_external_smpc_localnode1.toml @@ -6,6 +6,7 @@ monetdb_nclients = 64 [privacy] minimum_row_count = 10 +protect_local_data = true [celery] worker_concurrency = 16 diff --git a/tests/standalone_tests/testing_env_configs/test_external_smpc_localnode2.toml b/tests/standalone_tests/testing_env_configs/test_external_smpc_localnode2.toml index beea67e06..13a5ee07b 100644 --- a/tests/standalone_tests/testing_env_configs/test_external_smpc_localnode2.toml +++ b/tests/standalone_tests/testing_env_configs/test_external_smpc_localnode2.toml @@ -6,6 +6,7 @@ monetdb_nclients = 64 [privacy] minimum_row_count = 10 +protect_local_data = true [celery] worker_concurrency = 16 diff --git a/tests/standalone_tests/testing_env_configs/test_globalnode.toml b/tests/standalone_tests/testing_env_configs/test_globalnode.toml index 534a3358c..23f08d199 
100644 --- a/tests/standalone_tests/testing_env_configs/test_globalnode.toml +++ b/tests/standalone_tests/testing_env_configs/test_globalnode.toml @@ -6,6 +6,7 @@ monetdb_nclients = 128 [privacy] minimum_row_count = 10 +protect_local_data = false [celery] worker_concurrency = 16 diff --git a/tests/standalone_tests/testing_env_configs/test_localnode1.toml b/tests/standalone_tests/testing_env_configs/test_localnode1.toml index f033108b9..61d1ad312 100644 --- a/tests/standalone_tests/testing_env_configs/test_localnode1.toml +++ b/tests/standalone_tests/testing_env_configs/test_localnode1.toml @@ -6,6 +6,7 @@ monetdb_nclients = 64 [privacy] minimum_row_count = 10 +protect_local_data = true [celery] worker_concurrency = 16 diff --git a/tests/standalone_tests/testing_env_configs/test_localnode2.toml b/tests/standalone_tests/testing_env_configs/test_localnode2.toml index b47ecf3bf..512df85c7 100644 --- a/tests/standalone_tests/testing_env_configs/test_localnode2.toml +++ b/tests/standalone_tests/testing_env_configs/test_localnode2.toml @@ -6,6 +6,7 @@ monetdb_nclients = 64 [privacy] minimum_row_count = 10 +protect_local_data = true [celery] worker_concurrency = 16 diff --git a/tests/standalone_tests/testing_env_configs/test_localnodetmp.toml b/tests/standalone_tests/testing_env_configs/test_localnodetmp.toml index e05df6a97..598158ef8 100644 --- a/tests/standalone_tests/testing_env_configs/test_localnodetmp.toml +++ b/tests/standalone_tests/testing_env_configs/test_localnodetmp.toml @@ -6,6 +6,7 @@ monetdb_nclients = 64 [privacy] minimum_row_count = 10 +protect_local_data = true [celery] worker_concurrency = 16 diff --git a/tests/standalone_tests/testing_env_configs/test_smpc_globalnode.toml b/tests/standalone_tests/testing_env_configs/test_smpc_globalnode.toml index 67fecc260..8b205fe77 100644 --- a/tests/standalone_tests/testing_env_configs/test_smpc_globalnode.toml +++ b/tests/standalone_tests/testing_env_configs/test_smpc_globalnode.toml @@ -6,6 +6,7 @@ 
monetdb_nclients = 64 [privacy] minimum_row_count = 10 +protect_local_data = false [celery] worker_concurrency = 16 diff --git a/tests/standalone_tests/testing_env_configs/test_smpc_localnode1.toml b/tests/standalone_tests/testing_env_configs/test_smpc_localnode1.toml index 0e3ce7a29..01804478d 100644 --- a/tests/standalone_tests/testing_env_configs/test_smpc_localnode1.toml +++ b/tests/standalone_tests/testing_env_configs/test_smpc_localnode1.toml @@ -6,6 +6,7 @@ monetdb_nclients = 64 [privacy] minimum_row_count = 10 +protect_local_data = true [celery] worker_concurrency = 16 diff --git a/tests/standalone_tests/testing_env_configs/test_smpc_localnode2.toml b/tests/standalone_tests/testing_env_configs/test_smpc_localnode2.toml index d5ad66d09..ee9b66202 100644 --- a/tests/standalone_tests/testing_env_configs/test_smpc_localnode2.toml +++ b/tests/standalone_tests/testing_env_configs/test_smpc_localnode2.toml @@ -6,6 +6,7 @@ monetdb_nclients = 64 [privacy] minimum_row_count = 10 +protect_local_data = true [celery] worker_concurrency = 16