diff --git a/exareme2/algorithms/exareme2/descriptive_stats.py b/exareme2/algorithms/exareme2/descriptive_stats.py index 23bfdde85..c2c3ba944 100644 --- a/exareme2/algorithms/exareme2/descriptive_stats.py +++ b/exareme2/algorithms/exareme2/descriptive_stats.py @@ -25,6 +25,7 @@ particular variable/dataset pair doesn't contribute to the global computation. """ import math +import warnings from collections import Counter from functools import reduce from typing import Dict diff --git a/exareme2/worker/__init__.py b/exareme2/worker/__init__.py index 95cb2d6f6..4ab5863af 100644 --- a/exareme2/worker/__init__.py +++ b/exareme2/worker/__init__.py @@ -13,7 +13,9 @@ if config_file := os.getenv("EXAREME2_WORKER_CONFIG_FILE"): with open(config_file) as fp: config = AttrDict(envtoml.load(fp)) + config.sqlite = AttrDict({}) config.data_path = TEST_DATA_FOLDER + config.sqlite.db_name = config.identifier else: with open_text(worker, "config.toml") as fp: config = AttrDict(envtoml.load(fp)) diff --git a/exareme2/worker/config.toml b/exareme2/worker/config.toml index 14d506e93..1f811a3b8 100644 --- a/exareme2/worker/config.toml +++ b/exareme2/worker/config.toml @@ -25,6 +25,9 @@ user = "user" password = "password" vhost = "user_vhost" +[sqlite] +db_name = "$SQLITE_DB_NAME" + [monetdb] ip = "$MONETDB_IP" port = "$MONETDB_PORT" diff --git a/exareme2/worker/exareme2/cleanup/cleanup_db.py b/exareme2/worker/exareme2/cleanup/cleanup_db.py index c994f7828..a33072562 100644 --- a/exareme2/worker/exareme2/cleanup/cleanup_db.py +++ b/exareme2/worker/exareme2/cleanup/cleanup_db.py @@ -1,9 +1,8 @@ from typing import Dict from typing import List +from exareme2.worker.exareme2.monetdb import monetdb_facade from exareme2.worker.exareme2.monetdb.guard import sql_injection_guard -from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_and_fetchall -from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_query from exareme2.worker.exareme2.tables.tables_db import get_tables_by_type from exareme2.worker_communication import TableType @@ -24,7 +23,7 @@ def drop_db_artifacts_by_context_id(context_id: str): udfs_deletion_query = _get_drop_udfs_query(function_names) table_names_by_type = get_tables_by_type(context_id) tables_deletion_query = _get_drop_tables_query(table_names_by_type) - db_execute_query(udfs_deletion_query + tables_deletion_query) + monetdb_facade.execute_query(udfs_deletion_query + tables_deletion_query) @sql_injection_guard(context_id=str.isalnum) @@ -37,7 +36,7 @@ def _get_function_names_query_by_context_id(context_id: str) -> List[str]: context_id : str The id of the experiment """ - result = db_execute_and_fetchall( + result = monetdb_facade.execute_and_fetchall( f""" SELECT name FROM functions WHERE name LIKE '%{context_id.lower()}%' diff --git a/exareme2/worker/exareme2/monetdb/monetdb_facade.py b/exareme2/worker/exareme2/monetdb/monetdb_facade.py index 9689bd07a..d88083833 100644 --- a/exareme2/worker/exareme2/monetdb/monetdb_facade.py +++ b/exareme2/worker/exareme2/monetdb/monetdb_facade.py @@ -30,7 +30,7 @@ class Config: allow_mutation = False -def db_execute_and_fetchall( +def execute_and_fetchall( query: str, parameters=None, use_public_user: bool = False ) -> List: query_execution_timeout = worker_config.celery.tasks_timeout @@ -43,7 +43,7 @@ def db_execute_and_fetchall( return _execute_and_fetchall(db_execution_dto=db_execution_dto) -def db_execute_query(query: str, parameters=None, use_public_user: bool = False): +def execute_query(query: str, parameters=None, use_public_user: bool = False): query_execution_timeout = worker_config.celery.tasks_timeout query = convert_to_idempotent(query) db_execution_dto = _DBExecutionDTO( @@ -55,7 +55,7 @@ def db_execute_query(query: str, parameters=None, use_public_user: bool = False) _execute(db_execution_dto=db_execution_dto, lock=query_execution_lock) -def db_execute_udf(query: str, parameters=None): +def execute_udf(query: str, parameters=None): # Check if there is only one query split_queries = [query for query in query.strip().split(";") if query] if len(split_queries) > 1: diff --git a/exareme2/worker/exareme2/tables/tables_db.py b/exareme2/worker/exareme2/tables/tables_db.py index f1ec011d6..b7699da96 100644 --- a/exareme2/worker/exareme2/tables/tables_db.py +++ b/exareme2/worker/exareme2/tables/tables_db.py @@ -7,12 +7,11 @@ from exareme2 import DType from exareme2.worker import config as worker_config +from exareme2.worker.exareme2.monetdb import monetdb_facade from exareme2.worker.exareme2.monetdb.guard import is_list_of_identifiers from exareme2.worker.exareme2.monetdb.guard import is_socket_address from exareme2.worker.exareme2.monetdb.guard import is_valid_table_schema from exareme2.worker.exareme2.monetdb.guard import sql_injection_guard -from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_and_fetchall -from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_query from exareme2.worker_communication import ColumnData from exareme2.worker_communication import ColumnDataBinary from exareme2.worker_communication import ColumnDataFloat @@ -71,7 +70,7 @@ def get_table_names(table_type: TableType, context_id: str) -> List[str]: List[str] A list of table names. """ - table_names = db_execute_and_fetchall( + table_names = monetdb_facade.execute_and_fetchall( f""" SELECT name FROM tables WHERE @@ -110,7 +109,7 @@ def get_table_schema(table_name: str) -> TableSchema: TableSchema A schema which is TableSchema object. """ - schema = db_execute_and_fetchall( + schema = monetdb_facade.execute_and_fetchall( f""" SELECT columns.name, columns.type FROM columns @@ -150,7 +149,7 @@ def get_table_type(table_name: str) -> TableType: The type of the table. """ - monetdb_table_type_result = db_execute_and_fetchall( + monetdb_table_type_result = monetdb_facade.execute_and_fetchall( f""" SELECT type FROM @@ -168,7 +167,7 @@ def get_table_type(table_name: str) -> TableType: @sql_injection_guard(table_name=str.isidentifier, table_schema=is_valid_table_schema) def create_table(table_name: str, table_schema: TableSchema): columns_schema = convert_schema_to_sql_query_format(table_schema) - db_execute_query(f"CREATE TABLE {table_name} ( {columns_schema} )") + monetdb_facade.execute_query(f"CREATE TABLE {table_name} ( {columns_schema} )") @sql_injection_guard( @@ -191,7 +190,7 @@ def create_merge_table( merge_table_query += f"ALTER TABLE {table_name} ADD TABLE {name.lower()}; " try: - db_execute_query(merge_table_query) + monetdb_facade.execute_query(merge_table_query) except ( pymonetdb.exceptions.ProgrammingError or pymonetdb.exceptions.OperationalError ) as exc: @@ -220,7 +219,7 @@ def create_remote_table( public_password: str, ): columns_schema = convert_schema_to_sql_query_format(schema) - db_execute_query( + monetdb_facade.execute_query( f""" CREATE REMOTE TABLE {table_name} ( {columns_schema}) ON 'mapi:monetdb://{monetdb_socket_address}/db/{table_creator_username}/{table_name}' @@ -256,7 +255,7 @@ def get_table_data(table_name: str, use_public_user: bool = True) -> List[Column worker_config.monetdb.local_username ) # The db local user, on whose namespace the tables are on. - row_stored_data = db_execute_and_fetchall( + row_stored_data = monetdb_facade.execute_and_fetchall( f"SELECT * FROM {db_local_username}.{table_name}", use_public_user=use_public_user, ) @@ -292,7 +291,7 @@ def insert_data_to_table( query = f"INSERT INTO {table_name} VALUES {placeholders}" # Execute the query with the parameters - db_execute_query(query, parameters) + monetdb_facade.execute_query(query, parameters) def _convert_column_stored_data_to_column_data_objects( @@ -393,7 +392,7 @@ def get_tables_by_type(context_id: str) -> Dict[TableType, List[str]]: context_id : str The id of the experiment """ - table_names_and_types = db_execute_and_fetchall( + table_names_and_types = monetdb_facade.execute_and_fetchall( f""" SELECT name, type FROM tables WHERE name LIKE '%{context_id.lower()}%' diff --git a/exareme2/worker/exareme2/udfs/udfs_db.py b/exareme2/worker/exareme2/udfs/udfs_db.py index 31b5b5dea..183201ef5 100644 --- a/exareme2/worker/exareme2/udfs/udfs_db.py +++ b/exareme2/worker/exareme2/udfs/udfs_db.py @@ -1,9 +1,8 @@ from typing import List -from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_query -from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_udf +from exareme2.worker.exareme2.monetdb import monetdb_facade def run_udf(udf_defenitions: List[str], udf_exec_stmt): - db_execute_query(";\n".join(udf_defenitions)) - db_execute_udf(udf_exec_stmt) + monetdb_facade.execute_query(";\n".join(udf_defenitions)) + monetdb_facade.execute_udf(udf_exec_stmt) diff --git a/exareme2/worker/exareme2/views/views_db.py b/exareme2/worker/exareme2/views/views_db.py index df39eab47..e6f4a14e6 100644 --- a/exareme2/worker/exareme2/views/views_db.py +++ b/exareme2/worker/exareme2/views/views_db.py @@ -2,12 +2,11 @@ from typing import Optional from exareme2.data_filters import build_filter_clause +from exareme2.worker.exareme2.monetdb import monetdb_facade from exareme2.worker.exareme2.monetdb.guard import is_list_of_identifiers from exareme2.worker.exareme2.monetdb.guard import is_primary_data_table from exareme2.worker.exareme2.monetdb.guard import is_valid_filter from exareme2.worker.exareme2.monetdb.guard import sql_injection_guard -from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_and_fetchall -from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_query from exareme2.worker.exareme2.tables.tables_db import get_table_names from exareme2.worker.exareme2.tables.tables_db import get_table_schema from exareme2.worker_communication import ColumnInfo @@ -49,9 +48,9 @@ def create_view( {filter_clause} """ - db_execute_query(view_creation_query) + monetdb_facade.execute_query(view_creation_query) - view_rows_query_result = db_execute_and_fetchall( + view_rows_query_result = monetdb_facade.execute_and_fetchall( f""" SELECT COUNT(*) FROM {view_name} @@ -61,7 +60,7 @@ def create_view( view_rows_count = view_rows_result_row[0] if view_rows_count < 1 or (check_min_rows and view_rows_count < minimum_row_count): - db_execute_query(f"""DROP VIEW {view_name}""") + monetdb_facade.execute_query(f"""DROP VIEW {view_name}""") raise InsufficientDataError( f"Query: {view_creation_query} creates an " f"insufficient data view. ({view_name=} has been dropped)" diff --git a/exareme2/worker/worker_info/sqlite.py b/exareme2/worker/worker_info/sqlite.py new file mode 100644 index 000000000..ddc54b4b7 --- /dev/null +++ b/exareme2/worker/worker_info/sqlite.py @@ -0,0 +1,16 @@ +import sqlite3 +from typing import List + +from exareme2.worker import config as worker_config + +CONN = sqlite3.connect( + f"{str(worker_config.data_path)}/{worker_config.sqlite.db_name}.db" +) + + +def execute_and_fetchall(query) -> List: + cur = CONN.cursor() + cur.execute(query) + result = cur.fetchall() + cur.close() + return result diff --git a/exareme2/worker/worker_info/worker_info_db.py b/exareme2/worker/worker_info/worker_info_db.py index 30887ae3f..1c25904a8 100644 --- a/exareme2/worker/worker_info/worker_info_db.py +++ b/exareme2/worker/worker_info/worker_info_db.py @@ -4,7 +4,7 @@ from exareme2.worker.exareme2.monetdb.guard import is_datamodel from exareme2.worker.exareme2.monetdb.guard import sql_injection_guard -from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_and_fetchall +from exareme2.worker.worker_info import sqlite from exareme2.worker_communication import CommonDataElement from exareme2.worker_communication import CommonDataElements from exareme2.worker_communication import DataModelAttributes @@ -22,9 +22,9 @@ def get_data_models() -> List[str]: The data_models. """ - data_models_code_and_version = db_execute_and_fetchall( + data_models_code_and_version = sqlite.execute_and_fetchall( f"""SELECT code, version - FROM "mipdb_metadata"."data_models" + FROM data_models WHERE status = 'ENABLED' """ ) @@ -46,14 +46,14 @@ def get_dataset_code_per_dataset_label(data_model: str) -> Dict[str, str]: """ data_model_code, data_model_version = data_model.split(":") - datasets_rows = db_execute_and_fetchall( + datasets_rows = sqlite.execute_and_fetchall( f""" SELECT code, label - FROM "mipdb_metadata"."datasets" + FROM datasets WHERE data_model_id = ( SELECT data_model_id - FROM "mipdb_metadata"."data_models" + FROM data_models WHERE code = '{data_model_code}' AND version = '{data_model_version}' ) @@ -76,9 +76,9 @@ def get_data_model_cdes(data_model: str) -> CommonDataElements: """ data_model_code, data_model_version = data_model.split(":") - cdes_rows = db_execute_and_fetchall( + cdes_rows = sqlite.execute_and_fetchall( f""" - SELECT code, metadata FROM "{data_model_code}:{data_model_version}"."variables_metadata" + SELECT code, metadata FROM "{data_model_code}:{data_model_version}_variables_metadata" """ ) @@ -102,10 +102,10 @@ def get_data_model_attributes(data_model: str) -> DataModelAttributes: """ data_model_code, data_model_version = data_model.split(":") - attributes = db_execute_and_fetchall( + attributes = sqlite.execute_and_fetchall( f""" SELECT properties - FROM "mipdb_metadata"."data_models" + FROM data_models WHERE code = '{data_model_code}' AND version = '{data_model_version}' """ @@ -121,5 +121,5 @@ def check_database_connection(): """ Check that the connection with the database is working. """ - result = db_execute_and_fetchall(f"SELECT '{HEALTHCHECK_VALIDATION_STRING}'") + result = sqlite.execute_and_fetchall(f"SELECT '{HEALTHCHECK_VALIDATION_STRING}'") assert result[0][0] == HEALTHCHECK_VALIDATION_STRING diff --git a/federation_info.py b/federation_info.py index caef7d77a..d2257df6a 100644 --- a/federation_info.py +++ b/federation_info.py @@ -9,12 +9,6 @@ DB_USERNAME = "admin" DB_PASSWORD = "executor" DB_FARM = "db" -DB_METADATA_SCHEMA = "mipdb_metadata" -ACTIONS_TABLE = "actions" -ADD_DATA_MODEL_ACTION_CODE = "ADD DATA MODEL" -DELETE_DATA_MODEL_ACTION_CODE = "DELETE DATA MODEL" -ADD_DATASET_ACTION_CODE = "ADD DATASET" -DELETE_DATASET_ACTION_CODE = "DELETE DATASET" @contextmanager @@ -43,31 +37,6 @@ def cli(): pass -@cli.command() -@click.option("--ip", default="127.0.0.1", help="The ip of the database.") -@click.option("--port", default=50000, type=int, help="The port of the database.") -def show_worker_db_actions(ip, port): - with db_cursor(ip, port) as cur: - cur.execute(f"select * from {DB_METADATA_SCHEMA}.{ACTIONS_TABLE};") - results = cur.fetchall() - for _, action_str in results: - action = json.loads(action_str) - if ( - action["action"] == ADD_DATA_MODEL_ACTION_CODE - or action["action"] == DELETE_DATA_MODEL_ACTION_CODE - ): - print( - f"{action['date']} - {action['user']} - {action['action']} - {action['data_model_code']}:{action['data_model_version']} - {action['data_model_label']}" - ) - elif ( - action["action"] == ADD_DATASET_ACTION_CODE - or action["action"] == DELETE_DATASET_ACTION_CODE - ): - print( - f"{action['date']} - {action['user']} - {action['action']} - {action['dataset_code']} - {action['dataset_label']} - {action['data_model_code']}:{action['data_model_version']} - {action['data_model_label']}" - ) - - LOG_FILE_CHUNK_SIZE = 1024 # Will read the logfile in chunks TIMESTAMP_REGEX = ( r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}" # 2022-04-13 18:25:22,875 diff --git a/kubernetes/templates/exareme2-localnode.yaml b/kubernetes/templates/exareme2-localnode.yaml index 624043ec8..a39ebb663 100644 --- a/kubernetes/templates/exareme2-localnode.yaml +++ b/kubernetes/templates/exareme2-localnode.yaml @@ -86,6 +86,10 @@ spec: - name: db-importer image: {{ .Values.exareme2_images.repository }}/exareme2_mipdb:{{ .Values.exareme2_images.version }} env: + - name: WORKER_IDENTIFIER + valueFrom: + fieldRef: + fieldPath: spec.nodeName - name: DB_IP valueFrom: fieldRef: @@ -161,6 +165,8 @@ spec: fieldPath: status.podIP - name: RABBITMQ_PORT value: "5672" + - name: SQLITE_DB_NAME + value: "sqlite" - name: MONETDB_IP valueFrom: fieldRef: diff --git a/kubernetes/values.yaml b/kubernetes/values.yaml index e5d87b2d3..ef5e57e77 100644 --- a/kubernetes/values.yaml +++ b/kubernetes/values.yaml @@ -9,7 +9,10 @@ framework_log_level: ERROR max_concurrent_experiments: 32 -db: +sqlite: + db_name: sqlite + +monetdb: credentials_location: /opt/exareme2/credentials storage_location: /opt/exareme2/db csvs_location: /opt/exareme2/csvs diff --git a/mipdb/Dockerfile b/mipdb/Dockerfile index 86cfdbc99..33113a467 100644 --- a/mipdb/Dockerfile +++ b/mipdb/Dockerfile @@ -17,7 +17,8 @@ ENV PYTHONUNBUFFERED=1 \ DATA_PATH="/opt/data" \ DB_IP="172.17.0.1" \ DB_PORT=50000 \ - DB_NAME="db" + DB_NAME="db" \ + SQLITE_DB_NAME="sqlite" ####################################################### # Creating the data folder @@ -29,7 +30,7 @@ WORKDIR $DATA_PATH ####################################################### # Installing dependencies ####################################################### -RUN pip install mipdb==2.4.7 # Must be updated together with pyproject.toml +RUN pip install mipdb==3.0.0 # Must be updated together with pyproject.toml RUN pip install click==8.1.2 RUN pip install pymonetdb==1.6.3 # Must be updated together with pyproject.toml @@ -60,4 +61,4 @@ VOLUME $CREDENTIALS_CONFIG_FOLDER COPY mipdb/bootstrap.sh /home/bootstrap.sh RUN chmod 775 /home/bootstrap.sh -CMD ["sh", "-c", "/home/bootstrap.sh"] \ No newline at end of file +CMD ["sh", "-c", "/home/bootstrap.sh"] diff --git a/mipdb/bootstrap.sh b/mipdb/bootstrap.sh index ff2ddc69b..dc66a6434 100644 --- a/mipdb/bootstrap.sh +++ b/mipdb/bootstrap.sh @@ -10,5 +10,6 @@ MONETDB_ADMIN_USERNAME = \"$MONETDB_ADMIN_USERNAME\" MONETDB_LOCAL_USERNAME = \"$MONETDB_LOCAL_USERNAME\" MONETDB_LOCAL_PASSWORD = \"$MONETDB_LOCAL_PASSWORD\" MONETDB_PUBLIC_USERNAME = \"$MONETDB_PUBLIC_USERNAME\" -MONETDB_PUBLIC_PASSWORD = \"$MONETDB_PUBLIC_PASSWORD\"" > "/home/config.toml" -tail -f /dev/null \ No newline at end of file +MONETDB_PUBLIC_PASSWORD = \"$MONETDB_PUBLIC_PASSWORD\" +SQLITE_DB_PATH = \"$DATA_PATH/$SQLITE_DB_NAME.db\"" > "/home/config.toml" +tail -f /dev/null diff --git a/poetry.lock b/poetry.lock index c934a312b..5e037e8a8 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1494,13 +1494,13 @@ files = [ [[package]] name = "mipdb" -version = "2.4.7" +version = "3.0.0" description = "" optional = false -python-versions = ">=3.8,<3.9" +python-versions = "<3.9,>=3.8" files = [ - {file = "mipdb-2.4.7-py3-none-any.whl", hash = "sha256:ed725e2411dfbdd571a17692dd9ae8dc03ececf1df4b6d0d49d846e113e0477a"}, - {file = "mipdb-2.4.7.tar.gz", hash = "sha256:11ea083bb3448c50b8603aca17e19527e89a6cd3126e2b94f0d8c65f30cbf978"}, + {file = "mipdb-3.0.0-py3-none-any.whl", hash = "sha256:084dfe2d8da81f13102dbf5b84712ff5edb33e291049940b49f3f9bbee0f1421"}, + {file = "mipdb-3.0.0.tar.gz", hash = "sha256:e338692be3ceb4d4cdc47e7d265e9d293185bfa740a964b0e8179d2d5ee82a85"}, ] [package.dependencies] @@ -3020,4 +3020,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "~3.8" -content-hash = "a2bea930cc10c9936a101beb532743be9caf8d7d253f3701eb113186da5bd572" +content-hash = "4deb80a36802e6d62edf58af656989ba90a0a8a5a2a9f5437b21f19b9fa097ff" diff --git a/pyproject.toml b/pyproject.toml index ffc92cc04..2d7ccac37 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,7 @@ hypothesis = "~6.81" pytest-rerunfailures = "~12.0" [tool.poetry.group.dev.dependencies] -mipdb = "2.4.7" # Must be updated together with mipdb Dockerfile +mipdb = "3.0.0" # Must be updated together with mipdb Dockerfile [tool.pytest.ini_options] markers = [ diff --git a/tasks.py b/tasks.py index 126b96387..92d0d1717 100644 --- a/tasks.py +++ b/tasks.py @@ -52,6 +52,7 @@ import copy import itertools import json +import os import pathlib import shutil import sys @@ -141,12 +142,14 @@ def create_configs(c): worker_config["controller"]["ip"] = deployment_config["ip"] worker_config["controller"]["port"] = deployment_config["controller"]["port"] + worker_config["sqlite"]["db_name"] = worker["id"] worker_config["monetdb"]["ip"] = deployment_config["ip"] worker_config["monetdb"]["port"] = worker["monetdb_port"] worker_config["monetdb"]["local_username"] = worker["local_monetdb_username"] worker_config["monetdb"]["local_password"] = worker["local_monetdb_password"] worker_config["monetdb"]["public_username"] = worker["public_monetdb_username"] worker_config["monetdb"]["public_password"] = worker["public_monetdb_password"] + worker_config["monetdb"]["public_password"] = worker["public_monetdb_password"] worker_config["rabbitmq"]["ip"] = deployment_config["ip"] worker_config["rabbitmq"]["port"] = worker["rabbitmq_port"] @@ -392,7 +395,7 @@ def init_monetdb(c, port): f"Initializing MonetDB with mipdb in port: {port}...", Level.HEADER, ) - cmd = f"""poetry run mipdb init --ip 127.0.0.1 {get_monetdb_configs_in_mipdb_format(port)}""" + cmd = f"""poetry run mipdb init {get_sqlite_path(port)}""" run(c, cmd) @@ -437,7 +440,7 @@ def load_data(c, use_sockets=False, port=None): if len(local_worker_ports) == 1: port = local_worker_ports[0] - cmd = f"poetry run mipdb load-folder {TEST_DATA_FOLDER} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)}" + cmd = f"poetry run mipdb load-folder {TEST_DATA_FOLDER} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(port)}" message( f"Loading the folder '{TEST_DATA_FOLDER}' in MonetDB at port {local_worker_ports[0]}...", Level.HEADER, @@ -445,30 +448,28 @@ def load_data(c, use_sockets=False, port=None): run(c, cmd) return - # Load the test data folder into the dbs - data_model_folders = [ - TEST_DATA_FOLDER / folder for folder in listdir(TEST_DATA_FOLDER) - ] - for data_model_folder in data_model_folders: + for dirpath, dirnames, filenames in os.walk(TEST_DATA_FOLDER): + if "CDEsMetadata.json" not in filenames: + continue + cdes_file = os.path.join(dirpath, "CDEsMetadata.json") # Load all data models in each db - with open(data_model_folder / "CDEsMetadata.json") as data_model_metadata_file: + with open(cdes_file) as data_model_metadata_file: data_model_metadata = json.load(data_model_metadata_file) data_model_code = data_model_metadata["code"] data_model_version = data_model_metadata["version"] - cdes_file = data_model_folder / "CDEsMetadata.json" for port in local_worker_ports: message( f"Loading data model '{data_model_code}:{data_model_version}' metadata in MonetDB at port {port}...", Level.HEADER, ) - cmd = f"poetry run mipdb add-data-model {cdes_file} {get_monetdb_configs_in_mipdb_format(port)}" + cmd = f"poetry run mipdb add-data-model {cdes_file} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(port)}" run(c, cmd) # Load only the 1st csv of each dataset "with 0 suffix" in the 1st worker first_worker_csvs = sorted( [ - data_model_folder / file - for file in listdir(data_model_folder) + f"{dirpath}/{file}" + for file in filenames if file.endswith("0.csv") and not file.endswith("10.csv") ] ) @@ -478,14 +479,14 @@ def load_data(c, use_sockets=False, port=None): f"Loading dataset {pathlib.PurePath(csv).name} in MonetDB at port {port}...", Level.HEADER, ) - cmd = f"poetry run mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)}" + cmd = f"poetry run mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(port)}" run(c, cmd) # Load the data model's remaining csvs in the rest of the workers with round-robin fashion remaining_csvs = sorted( [ - data_model_folder / file - for file in listdir(data_model_folder) + f"{dirpath}/{file}" + for file in filenames if file.endswith(".csv") and not file.endswith("0.csv") ] ) @@ -499,10 +500,26 @@ def load_data(c, use_sockets=False, port=None): f"Loading dataset {pathlib.PurePath(csv).name} in MonetDB at port {port}...", Level.HEADER, ) - cmd = f"poetry run mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)}" + cmd = f"poetry run mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(port)}" run(c, cmd) +def get_sqlite_path(port): + config_files = [WORKERS_CONFIG_DIR / file for file in listdir(WORKERS_CONFIG_DIR)] + for worker_config_file in config_files: + with open(worker_config_file) as fp: + worker_config = toml.load(fp) + + if worker_config["role"] == "LOCALWORKER" and str( + worker_config["monetdb"]["port"] + ) == str(port): + return ( + f"--sqlite_db_path {TEST_DATA_FOLDER}/{worker_config['identifier']}.db" + ) + else: + raise ValueError(f"There is no database with port:{port}") + + def get_monetdb_configs_in_mipdb_format(port): return ( f"--ip 127.0.0.1 " diff --git a/tests/standalone_tests/conftest.py b/tests/standalone_tests/conftest.py index b7dc27506..cda5b69c5 100644 --- a/tests/standalone_tests/conftest.py +++ b/tests/standalone_tests/conftest.py @@ -3,6 +3,7 @@ import os import pathlib import re +import sqlite3 import subprocess import time from itertools import chain @@ -202,6 +203,7 @@ def _search_for_string_in_logfile( for _ in range(retries): try: with open(logspath) as logfile: + print(logfile.read()) if bool(re.search(log_to_search_for, logfile.read())): return except FileNotFoundError: @@ -293,6 +295,13 @@ def remove_monetdb_container(cont_name): print(f"Monetdb container '{cont_name}' is already removed.") +def get_worker_id(worker_config_file): + worker_config_filepath = path.join(TEST_ENV_CONFIG_FOLDER, worker_config_file) + with open(worker_config_filepath) as fp: + tmp = toml.load(fp) + return tmp["identifier"] + + @pytest.fixture(scope="session") def monetdb_globalworker(): cont_name = MONETDB_GLOBALWORKER_NAME @@ -362,22 +371,20 @@ def monetdb_localworkertmp(): remove_monetdb_container(cont_name) -def _init_database_monetdb_container(db_port): +def _init_database_monetdb_container(db_port, worker_id): monetdb_configs = MonetDBConfigurations(db_port) - print(f"\nInitializing database ({monetdb_configs.ip}:{monetdb_configs.port})") - cmd = f"mipdb init {monetdb_configs.convert_to_mipdb_format()}" + cmd = f"mipdb init --sqlite_db_path {TEST_DATA_FOLDER}/{worker_id}.db" subprocess.run( cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) print(f"\nDatabase ({monetdb_configs.ip}:{monetdb_configs.port}) initialized.") -def _load_data_monetdb_container(db_port, dataset_suffixes): +def _load_data_monetdb_container(db_port, dataset_suffixes, worker_id): monetdb_configs = MonetDBConfigurations(db_port) - # Check if the database is already loaded - cmd = f"mipdb list-datasets {monetdb_configs.convert_to_mipdb_format()}" + cmd = f"mipdb list-datasets {monetdb_configs.convert_to_mipdb_format()} --sqlite_db_path {TEST_DATA_FOLDER}/{worker_id}.db" res = subprocess.run( cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) @@ -389,36 +396,35 @@ def _load_data_monetdb_container(db_port, dataset_suffixes): datasets_per_data_model = {} # Load the test data folder into the dbs - data_model_folders = [ - TEST_DATA_FOLDER / folder for folder in os.listdir(TEST_DATA_FOLDER) - ] - for data_model_folder in data_model_folders: - with open(data_model_folder / "CDEsMetadata.json") as data_model_metadata_file: + for dirpath, dirnames, filenames in os.walk(TEST_DATA_FOLDER): + if "CDEsMetadata.json" not in filenames: + continue + cdes_file = os.path.join(dirpath, "CDEsMetadata.json") + with open(cdes_file) as data_model_metadata_file: data_model_metadata = json.load(data_model_metadata_file) data_model_code = data_model_metadata["code"] data_model_version = data_model_metadata["version"] data_model = f"{data_model_code}:{data_model_version}" - cdes_file = data_model_folder / "CDEsMetadata.json" print( f"\nLoading data model '{data_model_code}:{data_model_version}' metadata to database ({monetdb_configs.ip}:{monetdb_configs.port})" ) - cmd = f"mipdb add-data-model {cdes_file} {monetdb_configs.convert_to_mipdb_format()}" + cmd = f"mipdb add-data-model {cdes_file} {monetdb_configs.convert_to_mipdb_format()} --sqlite_db_path {TEST_DATA_FOLDER}/{worker_id}.db" subprocess.run( cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) csvs = sorted( [ - data_model_folder / file - for file in os.listdir(data_model_folder) + f"{dirpath}/{file}" + for file in filenames for suffix in dataset_suffixes if file.endswith(".csv") and str(suffix) in file ] ) for csv in csvs: - cmd = f"mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} {monetdb_configs.convert_to_mipdb_format()}" + cmd = f"mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} {monetdb_configs.convert_to_mipdb_format()} --sqlite_db_path {TEST_DATA_FOLDER}/{worker_id}.db" subprocess.run( cmd, shell=True, @@ -436,78 +442,76 @@ def _load_data_monetdb_container(db_port, dataset_suffixes): return datasets_per_data_model -def _remove_data_model_from_localworkertmp_monetdb(data_model_code, data_model_version): - # Remove data_model - cmd = f"mipdb delete-data-model {data_model_code} -v {data_model_version} -f --ip {COMMON_IP} --port {MONETDB_LOCALWORKERTMP_PORT} " - subprocess.run( - cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - - @pytest.fixture(scope="session") def init_data_globalworker(monetdb_globalworker): - _init_database_monetdb_container( - MONETDB_GLOBALWORKER_PORT, - ) + worker_config_file = GLOBALWORKER_CONFIG_FILE + worker_id = get_worker_id(worker_config_file) + _init_database_monetdb_container(MONETDB_GLOBALWORKER_PORT, worker_id) yield @pytest.fixture(scope="session") def load_data_localworker1(monetdb_localworker1): - _init_database_monetdb_container( - MONETDB_LOCALWORKER1_PORT, - ) + worker_config_file = LOCALWORKER1_CONFIG_FILE + worker_id = get_worker_id(worker_config_file) + _init_database_monetdb_container(MONETDB_GLOBALWORKER_PORT, worker_id) loaded_datasets_per_data_model = _load_data_monetdb_container( - MONETDB_LOCALWORKER1_PORT, DATASET_SUFFIXES_LOCALWORKER1 + MONETDB_LOCALWORKER1_PORT, DATASET_SUFFIXES_LOCALWORKER1, worker_id ) yield loaded_datasets_per_data_model @pytest.fixture(scope="session") def load_data_localworker2(monetdb_localworker2): - _init_database_monetdb_container( - MONETDB_LOCALWORKER2_PORT, - ) + worker_config_file = LOCALWORKER2_CONFIG_FILE + worker_id = get_worker_id(worker_config_file) + _init_database_monetdb_container(MONETDB_LOCALWORKER2_PORT, worker_id) loaded_datasets_per_data_model = _load_data_monetdb_container( - MONETDB_LOCALWORKER2_PORT, DATASET_SUFFIXES_LOCALWORKER2 + MONETDB_LOCALWORKER2_PORT, DATASET_SUFFIXES_LOCALWORKER2, worker_id ) yield loaded_datasets_per_data_model @pytest.fixture(scope="function") def load_data_localworkertmp(monetdb_localworkertmp): - _init_database_monetdb_container( - MONETDB_LOCALWORKERTMP_PORT, - ) + worker_config_file = LOCALWORKERTMP_CONFIG_FILE + worker_id = get_worker_id(worker_config_file) + _init_database_monetdb_container(MONETDB_LOCALWORKERTMP_PORT, worker_id) loaded_datasets_per_data_model = _load_data_monetdb_container( - MONETDB_LOCALWORKERTMP_PORT, DATASET_SUFFIXES_LOCALWORKERTMP + MONETDB_LOCALWORKERTMP_PORT, + DATASET_SUFFIXES_LOCALWORKERTMP, + worker_id, ) yield loaded_datasets_per_data_model @pytest.fixture(scope="session") def load_data_smpc_localworker1(monetdb_smpc_localworker1): - _init_database_monetdb_container( - MONETDB_SMPC_LOCALWORKER1_PORT, - ) + worker_config_file = LOCALWORKER1_SMPC_CONFIG_FILE + worker_id = get_worker_id(worker_config_file) + _init_database_monetdb_container(MONETDB_SMPC_LOCALWORKER1_PORT, worker_id) loaded_datasets_per_data_model = _load_data_monetdb_container( - MONETDB_SMPC_LOCALWORKER1_PORT, DATASET_SUFFIXES_SMPC_LOCALWORKER1 + MONETDB_SMPC_LOCALWORKER1_PORT, + DATASET_SUFFIXES_SMPC_LOCALWORKER1, + worker_id, ) yield loaded_datasets_per_data_model @pytest.fixture(scope="session") def load_data_smpc_localworker2(monetdb_smpc_localworker2): - _init_database_monetdb_container( - MONETDB_SMPC_LOCALWORKER2_PORT, - ) + worker_config_file = LOCALWORKER2_SMPC_CONFIG_FILE + worker_id = get_worker_id(worker_config_file) + _init_database_monetdb_container(MONETDB_SMPC_LOCALWORKER2_PORT, worker_id) loaded_datasets_per_data_model = _load_data_monetdb_container( - MONETDB_SMPC_LOCALWORKER2_PORT, DATASET_SUFFIXES_SMPC_LOCALWORKER2 + MONETDB_SMPC_LOCALWORKER2_PORT, + DATASET_SUFFIXES_SMPC_LOCALWORKER2, + worker_id, ) yield loaded_datasets_per_data_model -def _create_db_cursor(db_port, db_username="executor", db_password="executor"): +def _create_monetdb_cursor(db_port, db_username="executor", db_password="executor"): class MonetDBTesting: """MonetDB class used for testing.""" @@ -524,44 +528,63 @@ def execute(self, query, *args, **kwargs): return MonetDBTesting() +def _create_sqlite_cursor(worker_id): + class SqliteDBTesting: + """SqliteDBTesting class used for testing.""" + + def __init__(self) -> None: + self.url = f"{TEST_DATA_FOLDER}/{worker_id}.db" + + def execute(self, query): + conn = sqlite3.connect(f"{TEST_DATA_FOLDER}/{worker_id}.db") + cur = conn.cursor() + cur.execute(query) + cur.fetchall() + cur.close() + conn.commit() + conn.close() + + return SqliteDBTesting() + + @pytest.fixture(scope="session") -def globalworker_db_cursor_with_user_admin(): - return _create_db_cursor(MONETDB_GLOBALWORKER_PORT, "admin", "executor") +def globalworker_sqlite_db_cursor(): + return _create_sqlite_cursor("testglobalworker") @pytest.fixture(scope="session") def globalworker_db_cursor(): - return _create_db_cursor(MONETDB_GLOBALWORKER_PORT) + return _create_monetdb_cursor(MONETDB_GLOBALWORKER_PORT) @pytest.fixture(scope="session") def localworker1_db_cursor(): - return _create_db_cursor(MONETDB_LOCALWORKER1_PORT) + return _create_monetdb_cursor(MONETDB_LOCALWORKER1_PORT) @pytest.fixture(scope="session") def localworker2_db_cursor(): - return _create_db_cursor(MONETDB_LOCALWORKER2_PORT) + return _create_monetdb_cursor(MONETDB_LOCALWORKER2_PORT) @pytest.fixture(scope="session") def globalworker_smpc_db_cursor(): - return _create_db_cursor(MONETDB_SMPC_GLOBALWORKER_PORT) + return _create_monetdb_cursor(MONETDB_SMPC_GLOBALWORKER_PORT) @pytest.fixture(scope="session") def localworker1_smpc_db_cursor(): - return _create_db_cursor(MONETDB_SMPC_LOCALWORKER1_PORT) + return _create_monetdb_cursor(MONETDB_SMPC_LOCALWORKER1_PORT) @pytest.fixture(scope="session") def localworker2_smpc_db_cursor(): - return _create_db_cursor(MONETDB_SMPC_LOCALWORKER2_PORT) + return _create_monetdb_cursor(MONETDB_SMPC_LOCALWORKER2_PORT) @pytest.fixture(scope="function") def localworkertmp_db_cursor(): - return _create_db_cursor(MONETDB_LOCALWORKERTMP_PORT) + return _create_monetdb_cursor(MONETDB_LOCALWORKERTMP_PORT) def create_table_in_db( diff --git a/tests/standalone_tests/controller/services/worker_landscape_agreggator/test_federation_info_script.py b/tests/standalone_tests/controller/services/worker_landscape_agreggator/test_federation_info_script.py index e8578da74..553e4e117 100644 --- a/tests/standalone_tests/controller/services/worker_landscape_agreggator/test_federation_info_script.py +++ b/tests/standalone_tests/controller/services/worker_landscape_agreggator/test_federation_info_script.py @@ -18,36 +18,12 @@ ) from exareme2.worker_communication import WorkerInfo from tests.standalone_tests.conftest import MONETDB_LOCALWORKERTMP_PORT +from tests.standalone_tests.conftest import TEST_DATA_FOLDER from tests.standalone_tests.conftest import MonetDBConfigurations LOGFILE_NAME = "test_show_controller_audit_entries.out" -@pytest.mark.slow -@pytest.mark.very_slow -def test_show_worker_db_actions(monetdb_localworkertmp, load_data_localworkertmp): - """ - Load data into the db and then remove datamodel and datasets. - Assert that the logs produced with federation_info.py contain these changes. - """ - monet_db_confs = MonetDBConfigurations(port=MONETDB_LOCALWORKERTMP_PORT) - cmd = f'mipdb delete-data-model dementia -v "0.1" {monet_db_confs.convert_to_mipdb_format()} --force' - res = subprocess.run( - cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - assert res.returncode == 0 - - cmd = f"python3 federation_info.py show-worker-db-actions --port {MONETDB_LOCALWORKERTMP_PORT}" - res = subprocess.run( - cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - output = str(res.stdout) - assert re.match(r".*\\n.* - .* - ADD DATA MODEL - .* - .*\\n.*", output) - assert re.match(r".*\\n.* - .* - DELETE DATA MODEL - .* - .*\\n.*", output) - assert re.match(r".*\\n.* - .* - ADD DATASET - .* - .* - .* - .*\\n.*", output) - assert re.match(r".*\\n.* - .* - DELETE DATASET - .* - .* - .* - .*\\n.*", output) - - @pytest.fixture(scope="session") def controller_config_dict_mock(): controller_config = { diff --git a/tests/standalone_tests/worker/exareme2/test_monet_db_facade.py b/tests/standalone_tests/worker/exareme2/test_monet_db_facade.py index dc49df590..23e9d1d35 100644 --- a/tests/standalone_tests/worker/exareme2/test_monet_db_facade.py +++ b/tests/standalone_tests/worker/exareme2/test_monet_db_facade.py @@ -6,13 +6,12 @@ from pymonetdb import ProgrammingError from exareme2 import AttrDict +from exareme2.worker.exareme2.monetdb import monetdb_facade from exareme2.worker.exareme2.monetdb.monetdb_facade import _DBExecutionDTO from exareme2.worker.exareme2.monetdb.monetdb_facade import _execute_and_fetchall from exareme2.worker.exareme2.monetdb.monetdb_facade import ( _validate_exception_could_be_recovered, ) -from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_and_fetchall -from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_query from exareme2.worker.utils.logger import init_logger from tests.standalone_tests.conftest import COMMON_IP from tests.standalone_tests.conftest import MONETDB_LOCALWORKERTMP_NAME @@ -175,9 +174,9 @@ def test_db_execute_use_public_user_parameter( ): table_name = "local_user_table" - db_execute_query(query=f"create table {table_name} (col1 int);") + monetdb_facade.execute_query(query=f"create table {table_name} (col1 int);") with pytest.raises(OperationalError, match=r"no such table"): - db_execute_and_fetchall( + monetdb_facade.execute_and_fetchall( query=f"select * from {table_name};", use_public_user=True ) diff --git a/tests/standalone_tests/worker/worker_info/test_get_datasets_per_data_model.py b/tests/standalone_tests/worker/worker_info/test_get_datasets_per_data_model.py index 2079e0dad..7c309501f 100644 --- a/tests/standalone_tests/worker/worker_info/test_get_datasets_per_data_model.py +++ b/tests/standalone_tests/worker/worker_info/test_get_datasets_per_data_model.py @@ -18,24 +18,20 @@ def setup_data_table_in_db(datasets_per_data_model, cursor): data_model_id += 1 dataset_id += 1 data_model_code, data_model_version = data_model.split(":") - sql_query = f"""INSERT INTO "mipdb_metadata"."data_models" VALUES ({data_model_id}, '{data_model_code}', '{data_model_version}', '{label_identifier}', 'ENABLED', null);""" + sql_query = f"""INSERT INTO "data_models" (code, version, label, status, properties) VALUES ('{data_model_code}', '{data_model_version}', '{label_identifier}', 'ENABLED', null);""" cursor.execute(sql_query) for dataset_name in datasets_per_data_model[data_model]: dataset_id += 1 - sql_query = f"""INSERT INTO "mipdb_metadata"."datasets" VALUES ({dataset_id}, {data_model_id}, '{dataset_name}', '{label_identifier}', 'ENABLED', null);""" + sql_query = f"""INSERT INTO "datasets" (data_model_id, code, label, csv_path, status, properties) VALUES ({data_model_id}, '{dataset_name}', '{label_identifier}', 'csv_path', 'ENABLED', null);""" cursor.execute(sql_query) # The cleanup task cannot be used because it requires specific table name convention # that doesn't fit with the initial data table names def teardown_data_tables_in_db(cursor): - sql_query = ( - f"DELETE FROM mipdb_metadata.datasets WHERE label = '{label_identifier}';" - ) + sql_query = f"DELETE FROM datasets WHERE label = '{label_identifier}';" cursor.execute(sql_query) - sql_query = ( - f"DELETE FROM mipdb_metadata.data_models WHERE label = '{label_identifier}';" - ) + sql_query = f"DELETE FROM data_models WHERE label = '{label_identifier}';" cursor.execute(sql_query) @@ -80,12 +76,12 @@ def test_get_worker_datasets_per_data_model( globalworker_worker_service, globalworker_celery_app, use_globalworker_database, - globalworker_db_cursor_with_user_admin, + globalworker_sqlite_db_cursor, init_data_globalworker, ): request_id = "test_worker_info_" + uuid.uuid4().hex + "_request" setup_data_table_in_db( - expected_datasets_per_data_model, globalworker_db_cursor_with_user_admin + expected_datasets_per_data_model, globalworker_sqlite_db_cursor ) task_signature = get_celery_task_signature("get_worker_datasets_per_data_model") async_result = globalworker_celery_app.queue_task( @@ -108,4 +104,4 @@ def test_get_worker_datasets_per_data_model( expected_datasets_per_data_model[data_model] ) - teardown_data_tables_in_db(globalworker_db_cursor_with_user_admin) + teardown_data_tables_in_db(globalworker_sqlite_db_cursor)