Skip to content

Commit

Permalink
isort & black
Browse files Browse the repository at this point in the history
  • Loading branch information
simos committed Aug 29, 2023
1 parent f1d5e3b commit f385e89
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 68 deletions.
17 changes: 10 additions & 7 deletions tests/standalone_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,23 +558,25 @@ def localnodetmp_db_cursor():
return _create_db_cursor(MONETDB_LOCALNODETMP_PORT)




def insert_data_to_localnode(
table_name: str, table_values: List[List[Union[str, int, float]]],monetdb_localnode_port
table_name: str,
table_values: List[List[Union[str, int, float]]],
monetdb_localnode_port,
):
row_length = len(table_values[0])
if all(len(row) != row_length for row in table_values):
raise Exception("Not all rows have the same number of values")

# rows= ["(" + ",".join(map(str, sublist)) + ")" for sublist in table_values]
table_values = [[item if item is not None else 'None' for item in sublist] for sublist in table_values]
rows= ["(" + ",".join(map(repr, sublist)) + ")" for sublist in table_values]

table_values = [
[item if item is not None else "None" for item in sublist]
for sublist in table_values
]
rows = ["(" + ",".join(map(repr, sublist)) + ")" for sublist in table_values]

# rows = [str(tuple(sub)) for sub in table_values]
# rows = [item.replace('None', "'None'") for item in rows]


# In order to achieve insertion with parameters we need to create query to the following format:
# INSERT INTO <table_name> VALUES (%s, %s), (%s, %s);
# The following variable 'values' create that specific str according to row_length and the amount of the rows.
Expand All @@ -591,6 +593,7 @@ def insert_data_to_localnode(
# with cursor(commit=True) as cur:
# cursor.execute(sql_clause,list(chain(*table_values)))


def _clean_db(cursor):
class TableType(enum.Enum):
NORMAL = 0
Expand Down
14 changes: 7 additions & 7 deletions tests/standalone_tests/test_local_global_nodes_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@
from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT
from tests.standalone_tests.conftest import MONETDB_LOCALNODE2_PORT
from tests.standalone_tests.conftest import TASKS_TIMEOUT
from tests.standalone_tests.conftest import insert_data_to_localnode
from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature
from tests.standalone_tests.std_output_logger import StdOutputLogger
from tests.standalone_tests.conftest import insert_data_to_localnode
from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT
from tests.standalone_tests.conftest import MONETDB_LOCALNODE2_PORT

create_table_task_signature = get_celery_task_signature("create_table")
# insert_task_signature = get_celery_task_signature("insert_data_to_table")
Expand Down Expand Up @@ -107,8 +105,9 @@ def test_create_merge_table_with_remote_tables(
# logger=StdOutputLogger(),
# timeout=TASKS_TIMEOUT,
# )
insert_data_to_localnode(local_node_1_table_info.name,values,MONETDB_LOCALNODE1_PORT)

insert_data_to_localnode(
local_node_1_table_info.name, values, MONETDB_LOCALNODE1_PORT
)

# async_result = localnode2_celery_app.queue_task(
# task_signature=insert_task_signature,
Expand All @@ -123,8 +122,9 @@ def test_create_merge_table_with_remote_tables(
# logger=StdOutputLogger(),
# timeout=TASKS_TIMEOUT,
# )
insert_data_to_localnode(local_node_2_table_info.name,values,MONETDB_LOCALNODE2_PORT)

insert_data_to_localnode(
local_node_2_table_info.name, values, MONETDB_LOCALNODE2_PORT
)

# Create remote tables
local_node_1_monetdb_sock_address = f"{str(COMMON_IP)}:{MONETDB_LOCALNODE1_PORT}"
Expand Down
6 changes: 3 additions & 3 deletions tests/standalone_tests/test_merge_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
from exareme2.node_tasks_DTOs import TableInfo
from exareme2.node_tasks_DTOs import TableSchema
from exareme2.node_tasks_DTOs import TableType
from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT
from tests.standalone_tests.conftest import TASKS_TIMEOUT
from tests.standalone_tests.conftest import insert_data_to_localnode
from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature
from tests.standalone_tests.std_output_logger import StdOutputLogger
from tests.standalone_tests.conftest import insert_data_to_localnode
from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT

create_table_task_signature = get_celery_task_signature("create_table")
create_merge_table_task_signature = get_celery_task_signature("create_merge_table")
Expand Down Expand Up @@ -97,7 +97,7 @@ def create_three_column_table_with_data(
# logger=StdOutputLogger(),
# timeout=TASKS_TIMEOUT,
# )
insert_data_to_localnode(table_info.name,values,MONETDB_LOCALNODE1_PORT)
insert_data_to_localnode(table_info.name, values, MONETDB_LOCALNODE1_PORT)

return table_info

Expand Down
53 changes: 22 additions & 31 deletions tests/standalone_tests/test_smpc_node_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@
from tests.algorithms.orphan_udfs import smpc_local_step
from tests.standalone_tests.conftest import LOCALNODE1_SMPC_CONFIG_FILE
from tests.standalone_tests.conftest import LOCALNODE2_SMPC_CONFIG_FILE
from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT
from tests.standalone_tests.conftest import MONETDB_SMPC_LOCALNODE1_PORT
from tests.standalone_tests.conftest import MONETDB_SMPC_LOCALNODE2_PORT
from tests.standalone_tests.conftest import SMPC_COORDINATOR_ADDRESS
from tests.standalone_tests.conftest import TASKS_TIMEOUT
from tests.standalone_tests.conftest import get_node_config_by_id
from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature
from tests.standalone_tests.conftest import insert_data_to_localnode
from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT
from tests.standalone_tests.conftest import MONETDB_SMPC_LOCALNODE1_PORT
from tests.standalone_tests.conftest import MONETDB_SMPC_LOCALNODE2_PORT

from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature

request_id = "testsmpcudfs" + str(uuid.uuid4().hex)[:10] + "request"
context_id = "testsmpcudfs" + str(uuid.uuid4().hex)[:10]
Expand All @@ -49,8 +48,7 @@


def create_table_with_one_column_and_ten_rows(
celery_app: Celery,
monetdb_localnode_port
celery_app: Celery, monetdb_localnode_port
) -> Tuple[TableInfo, int]:
create_table_task = get_celery_task_signature("create_table")
# insert_data_to_table_task = get_celery_task_signature("insert_data_to_table")
Expand Down Expand Up @@ -79,7 +77,7 @@ def create_table_with_one_column_and_ten_rows(
# values=values,
# ).get(timeout=TASKS_TIMEOUT)

insert_data_to_localnode(table_info.name,values,monetdb_localnode_port)
insert_data_to_localnode(table_info.name, values, monetdb_localnode_port)

return table_info, 55

Expand Down Expand Up @@ -109,8 +107,7 @@ def create_secure_transfer_table(celery_app: Celery) -> TableInfo:

# TODO More dynamic so it can receive any secure_transfer values
def create_table_with_secure_transfer_results_with_smpc_off(
celery_app: Celery,
monetdb_localnode_port
celery_app: Celery, monetdb_localnode_port
) -> Tuple[TableInfo, int]:
# task_signature = get_celery_task_signature("insert_data_to_table")

Expand All @@ -135,16 +132,13 @@ def create_table_with_secure_transfer_results_with_smpc_off(
# table_name=table_info.name,
# values=values,
# ).get(timeout=TASKS_TIMEOUT)
insert_data_to_localnode(table_info.name,values,monetdb_localnode_port)
insert_data_to_localnode(table_info.name, values, monetdb_localnode_port)


return table_info, secure_transfer_1_value + secure_transfer_2_value


def create_table_with_multiple_secure_transfer_templates(
celery_app: Celery,
monetdb_localnode_port:int,
similar: bool
celery_app: Celery, monetdb_localnode_port: int, similar: bool
) -> TableInfo:
# task_signature = get_celery_task_signature("insert_data_to_table")

Expand Down Expand Up @@ -173,14 +167,13 @@ def create_table_with_multiple_secure_transfer_templates(
# table_name=table_info.name,
# values=values,
# ).get(timeout=TASKS_TIMEOUT)
insert_data_to_localnode(table_info.name,values,monetdb_localnode_port)
insert_data_to_localnode(table_info.name, values, monetdb_localnode_port)

return table_info


def create_table_with_smpc_sum_op_values(
celery_app: Celery,
monetdb_localnode_port:int
celery_app: Celery, monetdb_localnode_port: int
) -> Tuple[TableInfo, str]:
# task_signature = get_celery_task_signature("insert_data_to_table")

Expand All @@ -196,7 +189,7 @@ def create_table_with_smpc_sum_op_values(
# table_name=table_info.name,
# values=values,
# ).get(timeout=TASKS_TIMEOUT)
insert_data_to_localnode(table_info.name,values,monetdb_localnode_port)
insert_data_to_localnode(table_info.name, values, monetdb_localnode_port)

return table_info, json.dumps(sum_op_values)

Expand Down Expand Up @@ -229,8 +222,7 @@ def test_secure_transfer_output_with_smpc_off(
get_table_data_task = get_celery_task_signature("get_table_data")

input_table_info, input_table_name_sum = create_table_with_one_column_and_ten_rows(
localnode1_celery_app,
MONETDB_LOCALNODE1_PORT
localnode1_celery_app, MONETDB_LOCALNODE1_PORT
)

pos_args_str = NodeUDFPosArguments(
Expand Down Expand Up @@ -277,7 +269,9 @@ def test_secure_transfer_input_with_smpc_off(
(
secure_transfer_results_tableinfo,
secure_transfer_results_values_sum,
) = create_table_with_secure_transfer_results_with_smpc_off(localnode1_celery_app,MONETDB_LOCALNODE1_PORT)
) = create_table_with_secure_transfer_results_with_smpc_off(
localnode1_celery_app, MONETDB_LOCALNODE1_PORT
)

pos_args_str = NodeUDFPosArguments(
args=[NodeTableDTO(value=secure_transfer_results_tableinfo)]
Expand Down Expand Up @@ -325,7 +319,7 @@ def test_validate_smpc_templates_match(
)

table_info = create_table_with_multiple_secure_transfer_templates(
smpc_localnode1_celery_app, MONETDB_SMPC_LOCALNODE1_PORT,True
smpc_localnode1_celery_app, MONETDB_SMPC_LOCALNODE1_PORT, True
)

try:
Expand All @@ -351,7 +345,7 @@ def test_validate_smpc_templates_dont_match(
)

table_info = create_table_with_multiple_secure_transfer_templates(
smpc_localnode1_celery_app,MONETDB_SMPC_LOCALNODE1_PORT, False
smpc_localnode1_celery_app, MONETDB_SMPC_LOCALNODE1_PORT, False
)

with pytest.raises(ValueError) as exc:
Expand All @@ -376,8 +370,7 @@ def test_secure_transfer_run_udf_flow_with_smpc_on(

# ----------------------- SECURE TRANSFER OUTPUT ----------------------
input_table_name, input_table_name_sum = create_table_with_one_column_and_ten_rows(
smpc_localnode1_celery_app,
MONETDB_SMPC_LOCALNODE1_PORT
smpc_localnode1_celery_app, MONETDB_SMPC_LOCALNODE1_PORT
)

pos_args_str = NodeUDFPosArguments(
Expand Down Expand Up @@ -487,7 +480,7 @@ def test_load_data_to_smpc_client(
):
smpc_localnode1_celery_app = smpc_localnode1_celery_app._celery_app
table_info, sum_op_values_str = create_table_with_smpc_sum_op_values(
smpc_localnode1_celery_app,MONETDB_SMPC_LOCALNODE1_PORT
smpc_localnode1_celery_app, MONETDB_SMPC_LOCALNODE1_PORT
)
load_data_to_smpc_client_task = get_celery_task_signature(
"load_data_to_smpc_client"
Expand Down Expand Up @@ -674,15 +667,13 @@ def test_orchestrate_SMPC_between_two_localnodes_and_the_globalnode(
input_table_1_name,
input_table_1_name_sum,
) = create_table_with_one_column_and_ten_rows(
smpc_localnode1_celery_app,
MONETDB_SMPC_LOCALNODE1_PORT
smpc_localnode1_celery_app, MONETDB_SMPC_LOCALNODE1_PORT
)
(
input_table_2_name,
input_table_2_name_sum,
) = create_table_with_one_column_and_ten_rows(
smpc_localnode2_celery_app,
MONETDB_SMPC_LOCALNODE2_PORT
smpc_localnode2_celery_app, MONETDB_SMPC_LOCALNODE2_PORT
)

# ---------------- RUN LOCAL UDFS WITH SECURE TRANSFER OUTPUT ----------------------
Expand Down
9 changes: 4 additions & 5 deletions tests/standalone_tests/test_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
from exareme2.table_data_DTOs import ColumnDataFloat
from exareme2.table_data_DTOs import ColumnDataInt
from exareme2.table_data_DTOs import ColumnDataStr
from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT
from tests.standalone_tests.conftest import TASKS_TIMEOUT
from tests.standalone_tests.conftest import insert_data_to_localnode
from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature
from tests.standalone_tests.std_output_logger import StdOutputLogger
from tests.standalone_tests.conftest import insert_data_to_localnode
from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT

create_table_task_signature = get_celery_task_signature("create_table")
get_tables_task_signature = get_celery_task_signature("get_tables")
Expand Down Expand Up @@ -92,7 +92,7 @@ def test_create_and_find_tables(
# logger=StdOutputLogger(),
# timeout=TASKS_TIMEOUT,
# )
insert_data_to_localnode(table_1_info.name,values,MONETDB_LOCALNODE1_PORT)
insert_data_to_localnode(table_1_info.name, values, MONETDB_LOCALNODE1_PORT)

async_result = localnode1_celery_app.queue_task(
task_signature=get_table_data_task_signature,
Expand Down Expand Up @@ -158,8 +158,7 @@ def test_create_and_find_tables(
# logger=StdOutputLogger(),
# timeout=TASKS_TIMEOUT,
# )
insert_data_to_localnode(table_2_info.name,values,MONETDB_LOCALNODE1_PORT)

insert_data_to_localnode(table_2_info.name, values, MONETDB_LOCALNODE1_PORT)

async_result = localnode1_celery_app.queue_task(
task_signature=get_table_data_task_signature,
Expand Down
18 changes: 8 additions & 10 deletions tests/standalone_tests/test_udfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,25 @@
from tests.algorithms.orphan_udfs import get_column_rows
from tests.algorithms.orphan_udfs import local_step
from tests.algorithms.orphan_udfs import one_hundred_seconds_udf
from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT
from tests.standalone_tests.conftest import TASKS_TIMEOUT
from tests.standalone_tests.conftest import insert_data_to_localnode
from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature
from tests.standalone_tests.std_output_logger import StdOutputLogger
from tests.standalone_tests.conftest import insert_data_to_localnode
from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT


command_id = "command123"
request_id = "testsmpcudfs" + str(uuid.uuid4().hex)[:10] + "request"
context_id = "testsmpcudfs" + str(uuid.uuid4().hex)[:10]



# Alias locslnode1_db_cursor to db
@pytest.fixture(scope="module")
def db(localnode1_db_cursor):
return localnode1_db_cursor


def create_table_with_one_column_and_ten_rows(
celery_app, request_id,db
celery_app, request_id, db
) -> Tuple[TableInfo, int]:
create_table_task = get_celery_task_signature("create_table")
# insert_data_to_table_task = get_celery_task_signature("insert_data_to_table")
Expand Down Expand Up @@ -79,7 +77,7 @@ def create_table_with_one_column_and_ten_rows(
# celery_app.get_result(
# async_result=async_result, logger=StdOutputLogger(), timeout=TASKS_TIMEOUT)

insert_data_to_localnode(table_info.name,values,MONETDB_LOCALNODE1_PORT)
insert_data_to_localnode(table_info.name, values, MONETDB_LOCALNODE1_PORT)

# breakpoint()
# sql = f"""
Expand Down Expand Up @@ -132,14 +130,14 @@ def test_run_udf_state_and_transfer_output(
use_localnode1_database,
localnode1_db_cursor,
localnode1_celery_app,
db
db,
):
run_udf_task = get_celery_task_signature("run_udf")

local_node_get_table_data = get_celery_task_signature("get_table_data")

input_table_info, input_table_name_sum = create_table_with_one_column_and_ten_rows(
localnode1_celery_app, request_id,db
localnode1_celery_app, request_id, db
)

kw_args_str = NodeUDFKeyArguments(
Expand Down Expand Up @@ -245,12 +243,12 @@ def test_run_udf_with_remote_state_table_passed_as_normal_table(
@pytest.mark.skip(reason="https://team-1617704806227.atlassian.net/browse/MIP-473")
@pytest.mark.slow
def test_slow_udf_exception(
localnode1_node_service, use_localnode1_database, localnode1_celery_app,db
localnode1_node_service, use_localnode1_database, localnode1_celery_app, db
):
run_udf_task = get_celery_task_signature("run_udf")

input_table_name, input_table_name_sum = create_table_with_one_column_and_ten_rows(
localnode1_celery_app, request_id,db
localnode1_celery_app, request_id, db
)

kw_args_str = NodeUDFKeyArguments(
Expand Down
Loading

0 comments on commit f385e89

Please sign in to comment.