diff --git a/exareme2/node/tasks/tables.py b/exareme2/node/tasks/tables.py index 86fac4285..9c03dfef1 100644 --- a/exareme2/node/tasks/tables.py +++ b/exareme2/node/tasks/tables.py @@ -67,22 +67,3 @@ def create_table( schema_=schema, type_=TableType.NORMAL, ).json() - - -# TODO: https://team-1617704806227.atlassian.net/browse/MIP-762 -@shared_task -@initialise_logger -def insert_data_to_table( - request_id: str, table_name: str, values: List[List[Union[str, int, float, bool]]] -): - """ - Parameters - ---------- - request_id : str - The identifier for the logging - table_name : str - The name of the table - values : List[List[Union[str, int, float, bool]] - The data of the table to be inserted - """ - tables.insert_data_to_table(table_name, values) diff --git a/tests/standalone_tests/conftest.py b/tests/standalone_tests/conftest.py index a3744a970..bc8614273 100644 --- a/tests/standalone_tests/conftest.py +++ b/tests/standalone_tests/conftest.py @@ -5,8 +5,11 @@ import re import subprocess import time +from itertools import chain from os import path from pathlib import Path +from typing import List +from typing import Union import docker import psutil @@ -556,6 +559,21 @@ def localnodetmp_db_cursor(): return _create_db_cursor(MONETDB_LOCALNODETMP_PORT) +def insert_data_to_db( + table_name: str, table_values: List[List[Union[str, int, float]]], db_cursor +): + row_length = len(table_values[0]) + if all(len(row) != row_length for row in table_values): + raise Exception("Not all rows have the same number of values") + + values = ", ".join( + "(" + ", ".join("%s" for _ in range(row_length)) + ")" for _ in table_values + ) + sql_clause = f"INSERT INTO {table_name} VALUES {values}" + + db_cursor.execute(sql_clause, list(chain(*table_values))) + + def _clean_db(cursor): class TableType(enum.Enum): NORMAL = 0 diff --git a/tests/standalone_tests/test_local_global_nodes_flow.py b/tests/standalone_tests/test_local_global_nodes_flow.py index e1eb2d9a0..f5dd14412 100644 --- a/tests/standalone_tests/test_local_global_nodes_flow.py +++ b/tests/standalone_tests/test_local_global_nodes_flow.py @@ -11,11 +11,11 @@ from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT from tests.standalone_tests.conftest import MONETDB_LOCALNODE2_PORT from tests.standalone_tests.conftest import TASKS_TIMEOUT +from tests.standalone_tests.conftest import insert_data_to_db from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature from tests.standalone_tests.std_output_logger import StdOutputLogger create_table_task_signature = get_celery_task_signature("create_table") -insert_task_signature = get_celery_task_signature("insert_data_to_table") create_remote_task_signature = get_celery_task_signature("create_remote_table") get_remote_tables_task_signature = get_celery_task_signature("get_remote_tables") create_merge_task_signature = get_celery_task_signature("create_merge_table") @@ -42,8 +42,10 @@ def test_create_merge_table_with_remote_tables( context_id, localnode1_node_service, localnode1_celery_app, + localnode1_db_cursor, localnode2_node_service, localnode2_celery_app, + localnode2_db_cursor, globalnode_node_service, globalnode_celery_app, ): @@ -92,32 +94,8 @@ def test_create_merge_table_with_remote_tables( # Insert data into local tables values = [[1, 0.1, "test1"], [2, 0.2, "test2"], [3, 0.3, "test3"]] - async_result = localnode1_celery_app.queue_task( - task_signature=insert_task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=local_node_1_table_info.name, - values=values, - ) - localnode1_celery_app.get_result( - async_result=async_result, - logger=StdOutputLogger(), - timeout=TASKS_TIMEOUT, - ) - - async_result = localnode2_celery_app.queue_task( - task_signature=insert_task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=local_node_2_table_info.name, - values=values, - ) - - localnode2_celery_app.get_result( - async_result=async_result, - logger=StdOutputLogger(), - timeout=TASKS_TIMEOUT, - ) + insert_data_to_db(local_node_1_table_info.name, values, localnode1_db_cursor) + insert_data_to_db(local_node_2_table_info.name, values, localnode2_db_cursor) # Create remote tables local_node_1_monetdb_sock_address = f"{str(COMMON_IP)}:{MONETDB_LOCALNODE1_PORT}" diff --git a/tests/standalone_tests/test_merge_tables.py b/tests/standalone_tests/test_merge_tables.py index cb4ddd6bb..26d93677d 100644 --- a/tests/standalone_tests/test_merge_tables.py +++ b/tests/standalone_tests/test_merge_tables.py @@ -9,13 +9,14 @@ from exareme2.node_tasks_DTOs import TableInfo from exareme2.node_tasks_DTOs import TableSchema from exareme2.node_tasks_DTOs import TableType +from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT from tests.standalone_tests.conftest import TASKS_TIMEOUT +from tests.standalone_tests.conftest import insert_data_to_db from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature from tests.standalone_tests.std_output_logger import StdOutputLogger create_table_task_signature = get_celery_task_signature("create_table") create_merge_table_task_signature = get_celery_task_signature("create_merge_table") -insert_data_to_table_task_signature = get_celery_task_signature("insert_data_to_table") get_merge_tables_task_signature = get_celery_task_signature("get_merge_tables") @@ -57,7 +58,7 @@ def create_two_column_table(request_id, context_id, table_id: int, celery_app): def create_three_column_table_with_data( - request_id, context_id, table_id: int, celery_app + request_id, context_id, table_id: int, celery_app, db_cursor ): table_schema = TableSchema( columns=[ @@ -83,18 +84,7 @@ def create_three_column_table_with_data( ) values = [[1, 0.1, "test1"], [2, 0.2, "test2"], [3, 0.3, "test3"]] - async_result = celery_app.queue_task( - task_signature=insert_data_to_table_task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=table_info.name, - values=values, - ) - celery_app.get_result( - async_result=async_result, - logger=StdOutputLogger(), - timeout=TASKS_TIMEOUT, - ) + insert_data_to_db(table_info.name, values, db_cursor) return table_info @@ -105,10 +95,11 @@ def test_create_and_get_merge_table( context_id, localnode1_node_service, localnode1_celery_app, + localnode1_db_cursor, ): tables_to_be_merged = [ create_three_column_table_with_data( - request_id, context_id, count, localnode1_celery_app + request_id, context_id, count, localnode1_celery_app, localnode1_db_cursor ) for count in range(0, 5) ] @@ -148,15 +139,16 @@ def test_incompatible_schemas_merge( context_id, localnode1_node_service, localnode1_celery_app, + localnode1_db_cursor, ): incompatible_partition_tables = [ create_three_column_table_with_data( - request_id, context_id, 1, localnode1_celery_app + request_id, context_id, 1, localnode1_celery_app, localnode1_db_cursor ), create_two_column_table(request_id, context_id, 2, localnode1_celery_app), create_two_column_table(request_id, context_id, 3, localnode1_celery_app), create_three_column_table_with_data( - request_id, context_id, 4, localnode1_celery_app + request_id, context_id, 4, localnode1_celery_app, localnode1_db_cursor ), ] async_result = localnode1_celery_app.queue_task( @@ -184,13 +176,14 @@ def test_table_cannot_be_found( context_id, localnode1_node_service, localnode1_celery_app, + localnode1_db_cursor, ): not_found_tables = [ create_three_column_table_with_data( - request_id, context_id, 1, localnode1_celery_app + request_id, context_id, 1, localnode1_celery_app, localnode1_db_cursor ), create_three_column_table_with_data( - request_id, context_id, 2, localnode1_celery_app + request_id, context_id, 2, localnode1_celery_app, localnode1_db_cursor ), TableInfo( name="non_existing_table", diff --git a/tests/standalone_tests/test_node_info_tasks_priority.py b/tests/standalone_tests/test_node_info_tasks_priority.py index 079515f2d..d77f29d7f 100644 --- a/tests/standalone_tests/test_node_info_tasks_priority.py +++ b/tests/standalone_tests/test_node_info_tasks_priority.py @@ -47,13 +47,14 @@ def queue_one_second_udf( @pytest.mark.very_slow def test_node_info_tasks_have_higher_priority_over_other_tasks( globalnode_node_service, + globalnode_db_cursor, reset_celery_app_factory, get_controller_testing_logger, ): cel_app_wrapper = CeleryAppFactory().get_celery_app(RABBITMQ_GLOBALNODE_ADDR) input_table_name, _ = create_table_with_one_column_and_ten_rows( - cel_app_wrapper, request_id + cel_app_wrapper, globalnode_db_cursor, request_id ) # Queue an X amount of udfs to fill the rabbitmq. diff --git a/tests/standalone_tests/test_smpc_node_tasks.py b/tests/standalone_tests/test_smpc_node_tasks.py index 7e1374358..94088c657 100644 --- a/tests/standalone_tests/test_smpc_node_tasks.py +++ b/tests/standalone_tests/test_smpc_node_tasks.py @@ -31,9 +31,13 @@ from tests.algorithms.orphan_udfs import smpc_local_step from tests.standalone_tests.conftest import LOCALNODE1_SMPC_CONFIG_FILE from tests.standalone_tests.conftest import LOCALNODE2_SMPC_CONFIG_FILE +from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT +from tests.standalone_tests.conftest import MONETDB_SMPC_LOCALNODE1_PORT +from tests.standalone_tests.conftest import MONETDB_SMPC_LOCALNODE2_PORT from tests.standalone_tests.conftest import SMPC_COORDINATOR_ADDRESS from tests.standalone_tests.conftest import TASKS_TIMEOUT from tests.standalone_tests.conftest import get_node_config_by_id +from tests.standalone_tests.conftest import insert_data_to_db from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature request_id = "testsmpcudfs" + str(uuid.uuid4().hex)[:10] + "request" @@ -44,10 +48,9 @@ def create_table_with_one_column_and_ten_rows( - celery_app: Celery, + celery_app: Celery, db_cursor ) -> Tuple[TableInfo, int]: create_table_task = get_celery_task_signature("create_table") - insert_data_to_table_task = get_celery_task_signature("insert_data_to_table") table_schema = TableSchema( columns=[ @@ -67,11 +70,7 @@ def create_table_with_one_column_and_ten_rows( values = [[1], [2], [3], [4], [5], [6], [7], [8], [9], [10]] - celery_app.signature(insert_data_to_table_task).delay( - request_id=request_id, - table_name=table_info.name, - values=values, - ).get(timeout=TASKS_TIMEOUT) + insert_data_to_db(table_info.name, values, db_cursor) return table_info, 55 @@ -101,10 +100,8 @@ def create_secure_transfer_table(celery_app: Celery) -> TableInfo: # TODO More dynamic so it can receive any secure_transfer values def create_table_with_secure_transfer_results_with_smpc_off( - celery_app: Celery, + celery_app: Celery, db_cursor ) -> Tuple[TableInfo, int]: - task_signature = get_celery_task_signature("insert_data_to_table") - table_info = create_secure_transfer_table(celery_app) secure_transfer_1_value = 100 @@ -121,20 +118,14 @@ def create_table_with_secure_transfer_results_with_smpc_off( [json.dumps(secure_transfer_2)], ] - celery_app.signature(task_signature).delay( - request_id=request_id, - table_name=table_info.name, - values=values, - ).get(timeout=TASKS_TIMEOUT) + insert_data_to_db(table_info.name, values, db_cursor) return table_info, secure_transfer_1_value + secure_transfer_2_value def create_table_with_multiple_secure_transfer_templates( - celery_app: Celery, similar: bool + celery_app: Celery, db_cursor, similar: bool ) -> TableInfo: - task_signature = get_celery_task_signature("insert_data_to_table") - table_info = create_secure_transfer_table(celery_app) secure_transfer_template = { @@ -155,18 +146,14 @@ def create_table_with_multiple_secure_transfer_templates( [json.dumps(different_secure_transfer_template)], ] - celery_app.signature(task_signature).delay( - request_id=request_id, - table_name=table_info.name, - values=values, - ).get(timeout=TASKS_TIMEOUT) + insert_data_to_db(table_info.name, values, db_cursor) return table_info -def create_table_with_smpc_sum_op_values(celery_app: Celery) -> Tuple[TableInfo, str]: - task_signature = get_celery_task_signature("insert_data_to_table") - +def create_table_with_smpc_sum_op_values( + celery_app: Celery, db_cursor +) -> Tuple[TableInfo, str]: table_info = create_secure_transfer_table(celery_app) sum_op_values = [0, 1, 2, 3, 4, 5] @@ -174,11 +161,7 @@ def create_table_with_smpc_sum_op_values(celery_app: Celery) -> Tuple[TableInfo, [json.dumps(sum_op_values)], ] - celery_app.signature(task_signature).delay( - request_id=request_id, - table_name=table_info.name, - values=values, - ).get(timeout=TASKS_TIMEOUT) + insert_data_to_db(table_info.name, values, db_cursor) return table_info, json.dumps(sum_op_values) @@ -204,14 +187,17 @@ def validate_table_data_match_expected( @pytest.mark.slow def test_secure_transfer_output_with_smpc_off( - localnode1_node_service, use_localnode1_database, localnode1_celery_app + localnode1_node_service, + use_localnode1_database, + localnode1_celery_app, + localnode1_db_cursor, ): localnode1_celery_app = localnode1_celery_app._celery_app run_udf_task = get_celery_task_signature("run_udf") get_table_data_task = get_celery_task_signature("get_table_data") input_table_info, input_table_name_sum = create_table_with_one_column_and_ten_rows( - localnode1_celery_app + localnode1_celery_app, localnode1_db_cursor ) pos_args_str = NodeUDFPosArguments( @@ -249,7 +235,10 @@ def test_secure_transfer_output_with_smpc_off( @pytest.mark.slow def test_secure_transfer_input_with_smpc_off( - localnode1_node_service, use_localnode1_database, localnode1_celery_app + localnode1_node_service, + use_localnode1_database, + localnode1_celery_app, + localnode1_db_cursor, ): localnode1_celery_app = localnode1_celery_app._celery_app run_udf_task = get_celery_task_signature("run_udf") @@ -258,7 +247,9 @@ def test_secure_transfer_input_with_smpc_off( ( secure_transfer_results_tableinfo, secure_transfer_results_values_sum, - ) = create_table_with_secure_transfer_results_with_smpc_off(localnode1_celery_app) + ) = create_table_with_secure_transfer_results_with_smpc_off( + localnode1_celery_app, localnode1_db_cursor + ) pos_args_str = NodeUDFPosArguments( args=[NodeTableDTO(value=secure_transfer_results_tableinfo)] @@ -299,6 +290,7 @@ def test_validate_smpc_templates_match( smpc_localnode1_node_service, use_smpc_localnode1_database, smpc_localnode1_celery_app, + localnode1_smpc_db_cursor, ): smpc_localnode1_celery_app = smpc_localnode1_celery_app._celery_app validate_smpc_templates_match_task = get_celery_task_signature( @@ -306,7 +298,7 @@ def test_validate_smpc_templates_match( ) table_info = create_table_with_multiple_secure_transfer_templates( - smpc_localnode1_celery_app, True + smpc_localnode1_celery_app, localnode1_smpc_db_cursor, True ) try: @@ -325,6 +317,7 @@ def test_validate_smpc_templates_dont_match( smpc_localnode1_node_service, use_smpc_localnode1_database, smpc_localnode1_celery_app, + localnode1_smpc_db_cursor, ): smpc_localnode1_celery_app = smpc_localnode1_celery_app._celery_app validate_smpc_templates_match_task = get_celery_task_signature( @@ -332,7 +325,7 @@ def test_validate_smpc_templates_dont_match( ) table_info = create_table_with_multiple_secure_transfer_templates( - smpc_localnode1_celery_app, False + smpc_localnode1_celery_app, localnode1_smpc_db_cursor, False ) with pytest.raises(ValueError) as exc: @@ -350,6 +343,7 @@ def test_secure_transfer_run_udf_flow_with_smpc_on( smpc_localnode1_node_service, use_smpc_localnode1_database, smpc_localnode1_celery_app, + localnode1_smpc_db_cursor, ): smpc_localnode1_celery_app = smpc_localnode1_celery_app._celery_app run_udf_task = get_celery_task_signature("run_udf") @@ -357,7 +351,7 @@ def test_secure_transfer_run_udf_flow_with_smpc_on( # ----------------------- SECURE TRANSFER OUTPUT ---------------------- input_table_name, input_table_name_sum = create_table_with_one_column_and_ten_rows( - smpc_localnode1_celery_app + smpc_localnode1_celery_app, localnode1_smpc_db_cursor ) pos_args_str = NodeUDFPosArguments( @@ -463,11 +457,12 @@ def test_load_data_to_smpc_client( smpc_localnode1_node_service, use_smpc_localnode1_database, smpc_localnode1_celery_app, + localnode1_smpc_db_cursor, smpc_cluster, ): smpc_localnode1_celery_app = smpc_localnode1_celery_app._celery_app table_info, sum_op_values_str = create_table_with_smpc_sum_op_values( - smpc_localnode1_celery_app + smpc_localnode1_celery_app, localnode1_smpc_db_cursor ) load_data_to_smpc_client_task = get_celery_task_signature( "load_data_to_smpc_client" @@ -613,6 +608,8 @@ def test_orchestrate_SMPC_between_two_localnodes_and_the_globalnode( smpc_globalnode_celery_app, smpc_localnode1_celery_app, smpc_localnode2_celery_app, + localnode1_smpc_db_cursor, + localnode2_smpc_db_cursor, smpc_cluster, ): smpc_globalnode_celery_app = smpc_globalnode_celery_app._celery_app @@ -653,11 +650,15 @@ def test_orchestrate_SMPC_between_two_localnodes_and_the_globalnode( ( input_table_1_name, input_table_1_name_sum, - ) = create_table_with_one_column_and_ten_rows(smpc_localnode1_celery_app) + ) = create_table_with_one_column_and_ten_rows( + smpc_localnode1_celery_app, localnode1_smpc_db_cursor + ) ( input_table_2_name, input_table_2_name_sum, - ) = create_table_with_one_column_and_ten_rows(smpc_localnode2_celery_app) + ) = create_table_with_one_column_and_ten_rows( + smpc_localnode2_celery_app, localnode2_smpc_db_cursor + ) # ---------------- RUN LOCAL UDFS WITH SECURE TRANSFER OUTPUT ---------------------- pos_args_str_localnode1 = NodeUDFPosArguments( diff --git a/tests/standalone_tests/test_tables.py b/tests/standalone_tests/test_tables.py index 60722fe75..d6f92ecae 100644 --- a/tests/standalone_tests/test_tables.py +++ b/tests/standalone_tests/test_tables.py @@ -10,13 +10,14 @@ from exareme2.table_data_DTOs import ColumnDataFloat from exareme2.table_data_DTOs import ColumnDataInt from exareme2.table_data_DTOs import ColumnDataStr +from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT from tests.standalone_tests.conftest import TASKS_TIMEOUT +from tests.standalone_tests.conftest import insert_data_to_db from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature from tests.standalone_tests.std_output_logger import StdOutputLogger create_table_task_signature = get_celery_task_signature("create_table") get_tables_task_signature = get_celery_task_signature("get_tables") -insert_data_to_table_task_signature = get_celery_task_signature("insert_data_to_table") get_table_data_task_signature = get_celery_task_signature("get_table_data") @@ -38,6 +39,7 @@ def test_create_and_find_tables( context_id, localnode1_node_service, localnode1_celery_app, + localnode1_db_cursor, ): table_schema = TableSchema( columns=[ @@ -78,18 +80,7 @@ def test_create_and_find_tables( assert table_1_info.name in tables values = [[1, 0.1, "test1"], [2, 0.2, None], [3, 0.3, "test3"]] - async_result = localnode1_celery_app.queue_task( - task_signature=insert_data_to_table_task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=table_1_info.name, - values=values, - ) - localnode1_celery_app.get_result( - async_result=async_result, - logger=StdOutputLogger(), - timeout=TASKS_TIMEOUT, - ) + insert_data_to_db(table_1_info.name, values, localnode1_db_cursor) async_result = localnode1_celery_app.queue_task( task_signature=get_table_data_task_signature, @@ -142,19 +133,7 @@ def test_create_and_find_tables( assert table_2_info.name in tables values = [[1, 0.1, "test1"], [2, None, "None"], [3, 0.3, None]] - - async_result = localnode1_celery_app.queue_task( - task_signature=insert_data_to_table_task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=table_2_info.name, - values=values, - ) - localnode1_celery_app.get_result( - async_result=async_result, - logger=StdOutputLogger(), - timeout=TASKS_TIMEOUT, - ) + insert_data_to_db(table_2_info.name, values, localnode1_db_cursor) async_result = localnode1_celery_app.queue_task( task_signature=get_table_data_task_signature, diff --git a/tests/standalone_tests/test_udfs.py b/tests/standalone_tests/test_udfs.py index 02202e988..7b261127b 100644 --- a/tests/standalone_tests/test_udfs.py +++ b/tests/standalone_tests/test_udfs.py @@ -25,7 +25,9 @@ from tests.algorithms.orphan_udfs import get_column_rows from tests.algorithms.orphan_udfs import local_step from tests.algorithms.orphan_udfs import one_hundred_seconds_udf +from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT from tests.standalone_tests.conftest import TASKS_TIMEOUT +from tests.standalone_tests.conftest import insert_data_to_db from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature from tests.standalone_tests.std_output_logger import StdOutputLogger @@ -35,10 +37,9 @@ def create_table_with_one_column_and_ten_rows( - celery_app, request_id + celery_app, db_cursor, request_id ) -> Tuple[TableInfo, int]: create_table_task = get_celery_task_signature("create_table") - insert_data_to_table_task = get_celery_task_signature("insert_data_to_table") table_schema = TableSchema( columns=[ @@ -58,18 +59,8 @@ def create_table_with_one_column_and_ten_rows( async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT ) ) - values = [[1], [2], [3], [4], [5], [6], [7], [8], [9], [10]] - async_result = celery_app.queue_task( - task_signature=insert_data_to_table_task, - logger=StdOutputLogger(), - request_id=request_id, - table_name=table_info.name, - values=values, - ) - celery_app.get_result( - async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT - ) + insert_data_to_db(table_info.name, values, db_cursor) return table_info, 55 @@ -108,7 +99,7 @@ def test_run_udf_state_and_transfer_output( local_node_get_table_data = get_celery_task_signature("get_table_data") input_table_info, input_table_name_sum = create_table_with_one_column_and_ten_rows( - localnode1_celery_app, request_id + localnode1_celery_app, localnode1_db_cursor, request_id ) kw_args_str = NodeUDFKeyArguments( diff --git a/tests/standalone_tests/test_views_and_filters.py b/tests/standalone_tests/test_views_and_filters.py index cdb91c263..0c7577c4f 100644 --- a/tests/standalone_tests/test_views_and_filters.py +++ b/tests/standalone_tests/test_views_and_filters.py @@ -13,6 +13,8 @@ from exareme2.node_tasks_DTOs import TableInfo from exareme2.node_tasks_DTOs import TableSchema from tests.standalone_tests.conftest import ALGORITHMS_URL +from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT +from tests.standalone_tests.conftest import insert_data_to_db from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature from tests.standalone_tests.std_output_logger import StdOutputLogger from tests.standalone_tests.test_smpc_node_tasks import TASKS_TIMEOUT @@ -87,7 +89,7 @@ def test_view_without_filters( load_data_localnode1, localnode1_node_service, localnode1_celery_app, - use_localnode1_database, + localnode1_db_cursor, ): table_schema = TableSchema( columns=[ @@ -115,17 +117,7 @@ def test_view_without_filters( ) values = [[1, 0.1, "test1"], [2, 0.2, None], [3, 0.3, "test3"]] - task_signature = get_celery_task_signature("insert_data_to_table") - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=table_info.name, - values=values, - ) - localnode1_celery_app.get_result( - async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT - ) + insert_data_to_db(table_info.name, values, localnode1_db_cursor) columns = ["col1", "col3"] task_signature = get_celery_task_signature("create_view") @@ -192,6 +184,7 @@ def test_view_with_filters( localnode1_node_service, localnode1_celery_app, use_localnode1_database, + localnode1_db_cursor, ): table_schema = TableSchema( columns=[ @@ -217,17 +210,7 @@ def test_view_with_filters( ) values = [[1, 0.1, "test1"], [2, 0.2, None], [3, 0.3, "test3"]] - task_signature = get_celery_task_signature("insert_data_to_table") - async_result = localnode1_celery_app.queue_task( - task_signature=task_signature, - logger=StdOutputLogger(), - request_id=request_id, - table_name=table_info.name, - values=values, - ) - localnode1_celery_app.get_result( - async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT - ) + insert_data_to_db(table_info.name, values, localnode1_db_cursor) columns = ["col1", "col3"] filters = {