Skip to content

Commit

Permalink
System tables, datasets, and data models are loaded from SQLite.
Browse files Browse the repository at this point in the history
  • Loading branch information
KFilippopolitis committed Jun 26, 2024
1 parent de748c5 commit 79c050f
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 153 deletions.
16 changes: 16 additions & 0 deletions exareme2/worker/worker_info/sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import sqlite3
from typing import List

from exareme2.worker import config as worker_config


def execute_and_fetchall(query) -> List:
    """Execute *query* against the worker's sqlite database and return all rows.

    The database file lives at ``<data_path>/<identifier>.db`` (taken from the
    worker configuration). A fresh connection is opened per call and is always
    closed — even when the query raises — so no file handles leak.

    NOTE(review): callers interpolate values into *query* with f-strings;
    they are expected to sanitize inputs (e.g. via sql_injection_guard)
    before calling — confirm at call sites.

    :param query: a complete SQL statement to execute.
    :return: list of result rows as tuples, as produced by ``fetchall()``.
    """
    conn = sqlite3.connect(
        f"{str(worker_config.data_path)}/{worker_config.identifier}.db"
    )
    try:
        cur = conn.cursor()
        try:
            cur.execute(query)
            return cur.fetchall()
        finally:
            # Close the cursor even if execute/fetch raised.
            cur.close()
    finally:
        # Close the connection unconditionally; sqlite3 does not close it
        # for us when an exception propagates.
        conn.close()
22 changes: 11 additions & 11 deletions exareme2/worker/worker_info/worker_info_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from exareme2.worker.exareme2.monetdb.guard import is_datamodel
from exareme2.worker.exareme2.monetdb.guard import sql_injection_guard
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_and_fetchall
from exareme2.worker.worker_info.sqlite import execute_and_fetchall
from exareme2.worker_communication import CommonDataElement
from exareme2.worker_communication import CommonDataElements
from exareme2.worker_communication import DataModelAttributes
Expand All @@ -22,9 +22,9 @@ def get_data_models() -> List[str]:
The data_models.
"""

data_models_code_and_version = db_execute_and_fetchall(
data_models_code_and_version = execute_and_fetchall(
f"""SELECT code, version
FROM "mipdb_metadata"."data_models"
FROM data_models
WHERE status = 'ENABLED'
"""
)
Expand All @@ -46,14 +46,14 @@ def get_dataset_code_per_dataset_label(data_model: str) -> Dict[str, str]:
"""
data_model_code, data_model_version = data_model.split(":")

datasets_rows = db_execute_and_fetchall(
datasets_rows = execute_and_fetchall(
f"""
SELECT code, label
FROM "mipdb_metadata"."datasets"
FROM datasets
WHERE data_model_id =
(
SELECT data_model_id
FROM "mipdb_metadata"."data_models"
FROM data_models
WHERE code = '{data_model_code}'
AND version = '{data_model_version}'
)
Expand All @@ -76,9 +76,9 @@ def get_data_model_cdes(data_model: str) -> CommonDataElements:
"""
data_model_code, data_model_version = data_model.split(":")

cdes_rows = db_execute_and_fetchall(
cdes_rows = execute_and_fetchall(
f"""
SELECT code, metadata FROM "{data_model_code}:{data_model_version}"."variables_metadata"
SELECT code, metadata FROM "{data_model_code}:{data_model_version}_variables_metadata"
"""
)

Expand All @@ -102,10 +102,10 @@ def get_data_model_attributes(data_model: str) -> DataModelAttributes:
"""
data_model_code, data_model_version = data_model.split(":")

attributes = db_execute_and_fetchall(
attributes = execute_and_fetchall(
f"""
SELECT properties
FROM "mipdb_metadata"."data_models"
FROM data_models
WHERE code = '{data_model_code}'
AND version = '{data_model_version}'
"""
Expand All @@ -121,5 +121,5 @@ def check_database_connection():
"""
Check that the connection with the database is working.
"""
result = db_execute_and_fetchall(f"SELECT '{HEALTHCHECK_VALIDATION_STRING}'")
result = execute_and_fetchall(f"SELECT '{HEALTHCHECK_VALIDATION_STRING}'")
assert result[0][0] == HEALTHCHECK_VALIDATION_STRING
31 changes: 0 additions & 31 deletions federation_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@
DB_USERNAME = "admin"
DB_PASSWORD = "executor"
DB_FARM = "db"
DB_METADATA_SCHEMA = "mipdb_metadata"
ACTIONS_TABLE = "actions"
ADD_DATA_MODEL_ACTION_CODE = "ADD DATA MODEL"
DELETE_DATA_MODEL_ACTION_CODE = "DELETE DATA MODEL"
ADD_DATASET_ACTION_CODE = "ADD DATASET"
DELETE_DATASET_ACTION_CODE = "DELETE DATASET"


@contextmanager
Expand Down Expand Up @@ -43,31 +37,6 @@ def cli():
pass


@cli.command()
@click.option("--ip", default="127.0.0.1", help="The ip of the database.")
@click.option("--port", default=50000, type=int, help="The port of the database.")
def show_worker_db_actions(ip, port):
with db_cursor(ip, port) as cur:
cur.execute(f"select * from {DB_METADATA_SCHEMA}.{ACTIONS_TABLE};")
results = cur.fetchall()
for _, action_str in results:
action = json.loads(action_str)
if (
action["action"] == ADD_DATA_MODEL_ACTION_CODE
or action["action"] == DELETE_DATA_MODEL_ACTION_CODE
):
print(
f"{action['date']} - {action['user']} - {action['action']} - {action['data_model_code']}:{action['data_model_version']} - {action['data_model_label']}"
)
elif (
action["action"] == ADD_DATASET_ACTION_CODE
or action["action"] == DELETE_DATASET_ACTION_CODE
):
print(
f"{action['date']} - {action['user']} - {action['action']} - {action['dataset_code']} - {action['dataset_label']} - {action['data_model_code']}:{action['data_model_version']} - {action['data_model_label']}"
)


LOG_FILE_CHUNK_SIZE = 1024 # Will read the logfile in chunks
TIMESTAMP_REGEX = (
r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}" # 2022-04-13 18:25:22,875
Expand Down
4 changes: 4 additions & 0 deletions kubernetes/templates/exareme2-localnode.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ spec:
- name: db-importer
image: {{ .Values.exareme2_images.repository }}/exareme2_mipdb:{{ .Values.exareme2_images.version }}
env:
- name: WORKER_IDENTIFIER
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: DB_IP
valueFrom:
fieldRef:
Expand Down
4 changes: 2 additions & 2 deletions mipdb/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ WORKDIR $DATA_PATH
#######################################################
# Installing dependencies
#######################################################
RUN pip install mipdb==2.4.7 # Must be updated together with pyproject.toml
RUN pip install mipdb==3.0.0 # Must be updated together with pyproject.toml
RUN pip install click==8.1.2
RUN pip install pymonetdb==1.6.3 # Must be updated together with pyproject.toml

Expand Down Expand Up @@ -60,4 +60,4 @@ VOLUME $CREDENTIALS_CONFIG_FOLDER
COPY mipdb/bootstrap.sh /home/bootstrap.sh
RUN chmod 775 /home/bootstrap.sh

CMD ["sh", "-c", "/home/bootstrap.sh"]
CMD ["sh", "-c", "/home/bootstrap.sh"]
5 changes: 3 additions & 2 deletions mipdb/bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ MONETDB_ADMIN_USERNAME = \"$MONETDB_ADMIN_USERNAME\"
MONETDB_LOCAL_USERNAME = \"$MONETDB_LOCAL_USERNAME\"
MONETDB_LOCAL_PASSWORD = \"$MONETDB_LOCAL_PASSWORD\"
MONETDB_PUBLIC_USERNAME = \"$MONETDB_PUBLIC_USERNAME\"
MONETDB_PUBLIC_PASSWORD = \"$MONETDB_PUBLIC_PASSWORD\"" > "/home/config.toml"
tail -f /dev/null
MONETDB_PUBLIC_PASSWORD = \"$MONETDB_PUBLIC_PASSWORD\"
SQLITE_DB_PATH = \"$DATA_PATH/$WORKER_IDENTIFIER.db\"" > "/home/config.toml"
tail -f /dev/null
10 changes: 5 additions & 5 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ hypothesis = "~6.81"
pytest-rerunfailures = "~12.0"

[tool.poetry.group.dev.dependencies]
mipdb = "2.4.7" # Must be updated together with mipdb Dockerfile
mipdb = "3.0.0" # Must be updated together with mipdb Dockerfile

[tool.pytest.ini_options]
markers = [
Expand Down
47 changes: 31 additions & 16 deletions tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import copy
import itertools
import json
import os
import pathlib
import shutil
import sys
Expand Down Expand Up @@ -392,7 +393,7 @@ def init_monetdb(c, port):
f"Initializing MonetDB with mipdb in port: {port}...",
Level.HEADER,
)
cmd = f"""poetry run mipdb init --ip 127.0.0.1 {get_monetdb_configs_in_mipdb_format(port)}"""
cmd = f"""poetry run mipdb init {get_sqlite_path(port)}"""
run(c, cmd)


Expand Down Expand Up @@ -437,38 +438,36 @@ def load_data(c, use_sockets=False, port=None):

if len(local_worker_ports) == 1:
port = local_worker_ports[0]
cmd = f"poetry run mipdb load-folder {TEST_DATA_FOLDER} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)}"
cmd = f"poetry run mipdb load-folder {TEST_DATA_FOLDER} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(port)}"
message(
f"Loading the folder '{TEST_DATA_FOLDER}' in MonetDB at port {local_worker_ports[0]}...",
Level.HEADER,
)
run(c, cmd)
return

# Load the test data folder into the dbs
data_model_folders = [
TEST_DATA_FOLDER / folder for folder in listdir(TEST_DATA_FOLDER)
]
for data_model_folder in data_model_folders:
for dirpath, dirnames, filenames in os.walk(TEST_DATA_FOLDER):
if "CDEsMetadata.json" not in filenames:
continue
cdes_file = os.path.join(dirpath, "CDEsMetadata.json")
# Load all data models in each db
with open(data_model_folder / "CDEsMetadata.json") as data_model_metadata_file:
with open(cdes_file) as data_model_metadata_file:
data_model_metadata = json.load(data_model_metadata_file)
data_model_code = data_model_metadata["code"]
data_model_version = data_model_metadata["version"]
cdes_file = data_model_folder / "CDEsMetadata.json"
for port in local_worker_ports:
message(
f"Loading data model '{data_model_code}:{data_model_version}' metadata in MonetDB at port {port}...",
Level.HEADER,
)
cmd = f"poetry run mipdb add-data-model {cdes_file} {get_monetdb_configs_in_mipdb_format(port)}"
cmd = f"poetry run mipdb add-data-model {cdes_file} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(port)}"
run(c, cmd)

# Load only the 1st csv of each dataset "with 0 suffix" in the 1st worker
first_worker_csvs = sorted(
[
data_model_folder / file
for file in listdir(data_model_folder)
f"{dirpath}/{file}"
for file in filenames
if file.endswith("0.csv") and not file.endswith("10.csv")
]
)
Expand All @@ -478,14 +477,14 @@ def load_data(c, use_sockets=False, port=None):
f"Loading dataset {pathlib.PurePath(csv).name} in MonetDB at port {port}...",
Level.HEADER,
)
cmd = f"poetry run mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)}"
cmd = f"poetry run mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(port)}"
run(c, cmd)

# Load the data model's remaining csvs in the rest of the workers with round-robin fashion
remaining_csvs = sorted(
[
data_model_folder / file
for file in listdir(data_model_folder)
f"{dirpath}/{file}"
for file in filenames
if file.endswith(".csv") and not file.endswith("0.csv")
]
)
Expand All @@ -499,10 +498,26 @@ def load_data(c, use_sockets=False, port=None):
f"Loading dataset {pathlib.PurePath(csv).name} in MonetDB at port {port}...",
Level.HEADER,
)
cmd = f"poetry run mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)}"
cmd = f"poetry run mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(port)}"
run(c, cmd)


def get_sqlite_path(port):
    """Return the mipdb ``--sqlite_db_path`` CLI argument for the local worker
    whose MonetDB listens on *port*.

    Every worker config file in ``WORKERS_CONFIG_DIR`` is scanned; the first
    config with role ``LOCALWORKER`` and a matching monetdb port wins.

    :param port: the MonetDB port identifying the local worker (int or str).
    :return: the ``--sqlite_db_path <path>`` argument string for mipdb.
    :raises ValueError: when no LOCALWORKER config uses *port*.
    """
    config_files = [WORKERS_CONFIG_DIR / file for file in listdir(WORKERS_CONFIG_DIR)]
    for worker_config_file in config_files:
        with open(worker_config_file) as fp:
            worker_config = toml.load(fp)

        # Ports may be stored as int or str depending on the toml file,
        # and *port* may arrive as either — compare as strings.
        if worker_config["role"] == "LOCALWORKER" and str(
            worker_config["monetdb"]["port"]
        ) == str(port):
            return (
                f"--sqlite_db_path {TEST_DATA_FOLDER}/{worker_config['identifier']}.db"
            )
    # Fail only after EVERY config file has been checked; raising inside the
    # loop would reject the first non-matching file without looking further.
    raise ValueError(f"There is no database with port:{port}")


def get_monetdb_configs_in_mipdb_format(port):
return (
f"--ip 127.0.0.1 "
Expand Down
Loading

0 comments on commit 79c050f

Please sign in to comment.