Celery task 'insert_data_to_table' removed
apmariglis authored Sep 1, 2023
1 parent 7498fcf commit 8fe3753
Showing 9 changed files with 95 additions and 170 deletions.
19 changes: 0 additions & 19 deletions exareme2/node/tasks/tables.py
@@ -67,22 +67,3 @@ def create_table(
        schema_=schema,
        type_=TableType.NORMAL,
    ).json()


# TODO: https://team-1617704806227.atlassian.net/browse/MIP-762
@shared_task
@initialise_logger
def insert_data_to_table(
    request_id: str, table_name: str, values: List[List[Union[str, int, float, bool]]]
):
    """
    Parameters
    ----------
    request_id : str
        The identifier for the logging
    table_name : str
        The name of the table
    values : List[List[Union[str, int, float, bool]]]
        The data of the table to be inserted
    """
    tables.insert_data_to_table(table_name, values)
18 changes: 18 additions & 0 deletions tests/standalone_tests/conftest.py
Expand Up @@ -5,8 +5,11 @@
import re
import subprocess
import time
from itertools import chain
from os import path
from pathlib import Path
from typing import List
from typing import Union

import docker
import psutil
@@ -556,6 +559,21 @@ def localnodetmp_db_cursor():
    return _create_db_cursor(MONETDB_LOCALNODETMP_PORT)


def insert_data_to_db(
    table_name: str, table_values: List[List[Union[str, int, float]]], db_cursor
):
    row_length = len(table_values[0])
    # Guard against ragged input: every row must match the first row's length.
    if any(len(row) != row_length for row in table_values):
        raise Exception("Not all rows have the same number of values")

    values = ", ".join(
        "(" + ", ".join("%s" for _ in range(row_length)) + ")" for _ in table_values
    )
    sql_clause = f"INSERT INTO {table_name} VALUES {values}"

    db_cursor.execute(sql_clause, list(chain(*table_values)))
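
For illustration, a minimal usage sketch of this helper (hypothetical table name and data; the cursor would come from one of the *_db_cursor fixtures used in the tests below):

# Hypothetical example, not part of the commit.
values = [[1, 0.1, "test1"], [2, 0.2, "test2"], [3, 0.3, "test3"]]
# Builds: INSERT INTO demo_table VALUES (%s, %s, %s), (%s, %s, %s), (%s, %s, %s)
# and binds the row-major flattened values [1, 0.1, "test1", 2, ...] as parameters.
insert_data_to_db("demo_table", values, localnode1_db_cursor)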


def _clean_db(cursor):
    class TableType(enum.Enum):
        NORMAL = 0
32 changes: 5 additions & 27 deletions tests/standalone_tests/test_local_global_nodes_flow.py
@@ -11,11 +11,11 @@
from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT
from tests.standalone_tests.conftest import MONETDB_LOCALNODE2_PORT
from tests.standalone_tests.conftest import TASKS_TIMEOUT
from tests.standalone_tests.conftest import insert_data_to_db
from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature
from tests.standalone_tests.std_output_logger import StdOutputLogger

create_table_task_signature = get_celery_task_signature("create_table")
insert_task_signature = get_celery_task_signature("insert_data_to_table")
create_remote_task_signature = get_celery_task_signature("create_remote_table")
get_remote_tables_task_signature = get_celery_task_signature("get_remote_tables")
create_merge_task_signature = get_celery_task_signature("create_merge_table")
@@ -42,8 +42,10 @@ def test_create_merge_table_with_remote_tables(
    context_id,
    localnode1_node_service,
    localnode1_celery_app,
    localnode1_db_cursor,
    localnode2_node_service,
    localnode2_celery_app,
    localnode2_db_cursor,
    globalnode_node_service,
    globalnode_celery_app,
):
@@ -92,32 +94,8 @@ def test_create_merge_table_with_remote_tables(

    # Insert data into local tables
    values = [[1, 0.1, "test1"], [2, 0.2, "test2"], [3, 0.3, "test3"]]
    async_result = localnode1_celery_app.queue_task(
        task_signature=insert_task_signature,
        logger=StdOutputLogger(),
        request_id=request_id,
        table_name=local_node_1_table_info.name,
        values=values,
    )
    localnode1_celery_app.get_result(
        async_result=async_result,
        logger=StdOutputLogger(),
        timeout=TASKS_TIMEOUT,
    )

    async_result = localnode2_celery_app.queue_task(
        task_signature=insert_task_signature,
        logger=StdOutputLogger(),
        request_id=request_id,
        table_name=local_node_2_table_info.name,
        values=values,
    )

    localnode2_celery_app.get_result(
        async_result=async_result,
        logger=StdOutputLogger(),
        timeout=TASKS_TIMEOUT,
    )
    insert_data_to_db(local_node_1_table_info.name, values, localnode1_db_cursor)
    insert_data_to_db(local_node_2_table_info.name, values, localnode2_db_cursor)

    # Create remote tables
    local_node_1_monetdb_sock_address = f"{str(COMMON_IP)}:{MONETDB_LOCALNODE1_PORT}"
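
Background note: the remote-table step above is unchanged by this commit. As a hedged sketch, the create_remote_table task is expected to issue MonetDB DDL of roughly this shape (illustrative table name, schema, and MAPI address; the real statement is built inside the node code, not shown in this diff):

# Illustrative only: a MonetDB remote table mirrors a table on another server,
# addressed by a MAPI URL built from a socket address like the one above.
db_cursor.execute(
    "CREATE REMOTE TABLE remote_table_1 (col1 INT, col2 DOUBLE, col3 VARCHAR(500)) "
    "ON 'mapi:monetdb://172.17.0.1:50001/db'"
)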
31 changes: 12 additions & 19 deletions tests/standalone_tests/test_merge_tables.py
@@ -9,13 +9,14 @@
from exareme2.node_tasks_DTOs import TableInfo
from exareme2.node_tasks_DTOs import TableSchema
from exareme2.node_tasks_DTOs import TableType
from tests.standalone_tests.conftest import MONETDB_LOCALNODE1_PORT
from tests.standalone_tests.conftest import TASKS_TIMEOUT
from tests.standalone_tests.conftest import insert_data_to_db
from tests.standalone_tests.nodes_communication_helper import get_celery_task_signature
from tests.standalone_tests.std_output_logger import StdOutputLogger

create_table_task_signature = get_celery_task_signature("create_table")
create_merge_table_task_signature = get_celery_task_signature("create_merge_table")
insert_data_to_table_task_signature = get_celery_task_signature("insert_data_to_table")
get_merge_tables_task_signature = get_celery_task_signature("get_merge_tables")


@@ -57,7 +58,7 @@ def create_two_column_table(request_id, context_id, table_id: int, celery_app):


def create_three_column_table_with_data(
    request_id, context_id, table_id: int, celery_app
    request_id, context_id, table_id: int, celery_app, db_cursor
):
    table_schema = TableSchema(
        columns=[
@@ -83,18 +84,7 @@
    )

    values = [[1, 0.1, "test1"], [2, 0.2, "test2"], [3, 0.3, "test3"]]
    async_result = celery_app.queue_task(
        task_signature=insert_data_to_table_task_signature,
        logger=StdOutputLogger(),
        request_id=request_id,
        table_name=table_info.name,
        values=values,
    )
    celery_app.get_result(
        async_result=async_result,
        logger=StdOutputLogger(),
        timeout=TASKS_TIMEOUT,
    )
    insert_data_to_db(table_info.name, values, db_cursor)

    return table_info

@@ -105,10 +95,11 @@ def test_create_and_get_merge_table(
    context_id,
    localnode1_node_service,
    localnode1_celery_app,
    localnode1_db_cursor,
):
    tables_to_be_merged = [
        create_three_column_table_with_data(
            request_id, context_id, count, localnode1_celery_app
            request_id, context_id, count, localnode1_celery_app, localnode1_db_cursor
        )
        for count in range(0, 5)
    ]
@@ -148,15 +139,16 @@ def test_incompatible_schemas_merge(
    context_id,
    localnode1_node_service,
    localnode1_celery_app,
    localnode1_db_cursor,
):
    incompatible_partition_tables = [
        create_three_column_table_with_data(
            request_id, context_id, 1, localnode1_celery_app
            request_id, context_id, 1, localnode1_celery_app, localnode1_db_cursor
        ),
        create_two_column_table(request_id, context_id, 2, localnode1_celery_app),
        create_two_column_table(request_id, context_id, 3, localnode1_celery_app),
        create_three_column_table_with_data(
            request_id, context_id, 4, localnode1_celery_app
            request_id, context_id, 4, localnode1_celery_app, localnode1_db_cursor
        ),
    ]
    async_result = localnode1_celery_app.queue_task(
@@ -184,13 +176,14 @@ def test_table_cannot_be_found(
    context_id,
    localnode1_node_service,
    localnode1_celery_app,
    localnode1_db_cursor,
):
    not_found_tables = [
        create_three_column_table_with_data(
            request_id, context_id, 1, localnode1_celery_app
            request_id, context_id, 1, localnode1_celery_app, localnode1_db_cursor
        ),
        create_three_column_table_with_data(
            request_id, context_id, 2, localnode1_celery_app
            request_id, context_id, 2, localnode1_celery_app, localnode1_db_cursor
        ),
        TableInfo(
            name="non_existing_table",
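
Background note: in MonetDB, a merge table is a virtual union over partition tables that must all share the same schema, which is why merging the two- and three-column tables above is expected to fail. A hedged sketch of the underlying DDL (illustrative names; the actual statements are issued by the create_merge_table task, not shown in this diff):

# Illustrative only: partitions are attached to the merge table after creation.
db_cursor.execute(
    "CREATE MERGE TABLE merged (col1 INT, col2 DOUBLE, col3 VARCHAR(500))"
)
db_cursor.execute("ALTER TABLE merged ADD TABLE partition_1")
db_cursor.execute("ALTER TABLE merged ADD TABLE partition_2")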
3 changes: 2 additions & 1 deletion tests/standalone_tests/test_node_info_tasks_priority.py
@@ -47,13 +47,14 @@ def queue_one_second_udf(
@pytest.mark.very_slow
def test_node_info_tasks_have_higher_priority_over_other_tasks(
    globalnode_node_service,
    globalnode_db_cursor,
    reset_celery_app_factory,
    get_controller_testing_logger,
):
    cel_app_wrapper = CeleryAppFactory().get_celery_app(RABBITMQ_GLOBALNODE_ADDR)

    input_table_name, _ = create_table_with_one_column_and_ten_rows(
        cel_app_wrapper, request_id
        cel_app_wrapper, globalnode_db_cursor, request_id
    )

    # Queue enough UDFs to fill the RabbitMQ queue.
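
Background note: the task prioritization this test relies on is standard Celery-on-RabbitMQ behavior and is unchanged by this commit. A hedged sketch of how such prioritization is typically wired up (illustrative queue, broker, and task names, not exareme2's actual settings):

# Illustrative only: RabbitMQ delivers higher-priority messages first once the
# queue is declared with x-max-priority.
from celery import Celery
from kombu import Exchange, Queue

app = Celery("node", broker="amqp://localhost//")
app.conf.task_queues = [
    Queue(
        "node_tasks",
        Exchange("node_tasks"),
        routing_key="node_tasks",
        queue_arguments={"x-max-priority": 10},
    )
]

# Node-info tasks would then be sent with a higher priority than, e.g., UDF tasks.
app.send_task("get_node_info", queue="node_tasks", priority=10)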