Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

System tables datasets and data models are loaded from sqlite. #488

Merged
merged 1 commit into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions exareme2/algorithms/exareme2/descriptive_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
particular variable/dataset pair doesn't contribute to the global computation.
"""
import math
import warnings
from collections import Counter
from functools import reduce
from typing import Dict
Expand Down
1 change: 1 addition & 0 deletions exareme2/worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ WORKER_IDENTIFIER=globalworker
WORKER_ROLE=GLOBALWORKER
LOG_LEVEL=INFO
FRAMEWORK_LOG_LEVEL=INFO
SQLITE_DB_NAME=sqlite
DATA_PATH=/opt/data/
CONTROLLER_IP=172.17.0.1
CONTROLLER_PORT=5000
Expand Down
2 changes: 2 additions & 0 deletions exareme2/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
if config_file := os.getenv("EXAREME2_WORKER_CONFIG_FILE"):
with open(config_file) as fp:
config = AttrDict(envtoml.load(fp))
config.sqlite = AttrDict({})
config.data_path = TEST_DATA_FOLDER
config.sqlite.db_name = config.identifier
else:
with open_text(worker, "config.toml") as fp:
config = AttrDict(envtoml.load(fp))
3 changes: 3 additions & 0 deletions exareme2/worker/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ user = "user"
password = "password"
vhost = "user_vhost"

[sqlite]
db_name = "$SQLITE_DB_NAME"

[monetdb]
ip = "$MONETDB_IP"
port = "$MONETDB_PORT"
Expand Down
7 changes: 3 additions & 4 deletions exareme2/worker/exareme2/cleanup/cleanup_db.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from typing import Dict
from typing import List

from exareme2.worker.exareme2.monetdb import monetdb_facade
from exareme2.worker.exareme2.monetdb.guard import sql_injection_guard
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_and_fetchall
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_query
from exareme2.worker.exareme2.tables.tables_db import get_tables_by_type
from exareme2.worker_communication import TableType

Expand All @@ -24,7 +23,7 @@ def drop_db_artifacts_by_context_id(context_id: str):
udfs_deletion_query = _get_drop_udfs_query(function_names)
table_names_by_type = get_tables_by_type(context_id)
tables_deletion_query = _get_drop_tables_query(table_names_by_type)
db_execute_query(udfs_deletion_query + tables_deletion_query)
monetdb_facade.execute_query(udfs_deletion_query + tables_deletion_query)


@sql_injection_guard(context_id=str.isalnum)
Expand All @@ -37,7 +36,7 @@ def _get_function_names_query_by_context_id(context_id: str) -> List[str]:
context_id : str
The id of the experiment
"""
result = db_execute_and_fetchall(
result = monetdb_facade.execute_and_fetchall(
f"""
SELECT name FROM functions
WHERE name LIKE '%{context_id.lower()}%'
Expand Down
6 changes: 3 additions & 3 deletions exareme2/worker/exareme2/monetdb/monetdb_facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Config:
allow_mutation = False


def db_execute_and_fetchall(
def execute_and_fetchall(
query: str, parameters=None, use_public_user: bool = False
) -> List:
query_execution_timeout = worker_config.celery.tasks_timeout
Expand All @@ -43,7 +43,7 @@ def db_execute_and_fetchall(
return _execute_and_fetchall(db_execution_dto=db_execution_dto)


def db_execute_query(query: str, parameters=None, use_public_user: bool = False):
def execute_query(query: str, parameters=None, use_public_user: bool = False):
query_execution_timeout = worker_config.celery.tasks_timeout
query = convert_to_idempotent(query)
db_execution_dto = _DBExecutionDTO(
Expand All @@ -55,7 +55,7 @@ def db_execute_query(query: str, parameters=None, use_public_user: bool = False)
_execute(db_execution_dto=db_execution_dto, lock=query_execution_lock)


def db_execute_udf(query: str, parameters=None):
def execute_udf(query: str, parameters=None):
# Check if there is only one query
split_queries = [query for query in query.strip().split(";") if query]
if len(split_queries) > 1:
Expand Down
21 changes: 10 additions & 11 deletions exareme2/worker/exareme2/tables/tables_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@

from exareme2 import DType
from exareme2.worker import config as worker_config
from exareme2.worker.exareme2.monetdb import monetdb_facade
from exareme2.worker.exareme2.monetdb.guard import is_list_of_identifiers
from exareme2.worker.exareme2.monetdb.guard import is_socket_address
from exareme2.worker.exareme2.monetdb.guard import is_valid_table_schema
from exareme2.worker.exareme2.monetdb.guard import sql_injection_guard
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_and_fetchall
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_query
from exareme2.worker_communication import ColumnData
from exareme2.worker_communication import ColumnDataBinary
from exareme2.worker_communication import ColumnDataFloat
Expand Down Expand Up @@ -71,7 +70,7 @@ def get_table_names(table_type: TableType, context_id: str) -> List[str]:
List[str]
A list of table names.
"""
table_names = db_execute_and_fetchall(
table_names = monetdb_facade.execute_and_fetchall(
f"""
SELECT name FROM tables
WHERE
Expand Down Expand Up @@ -110,7 +109,7 @@ def get_table_schema(table_name: str) -> TableSchema:
TableSchema
A schema which is TableSchema object.
"""
schema = db_execute_and_fetchall(
schema = monetdb_facade.execute_and_fetchall(
f"""
SELECT columns.name, columns.type
FROM columns
Expand Down Expand Up @@ -150,7 +149,7 @@ def get_table_type(table_name: str) -> TableType:
The type of the table.
"""

monetdb_table_type_result = db_execute_and_fetchall(
monetdb_table_type_result = monetdb_facade.execute_and_fetchall(
f"""
SELECT type
FROM
Expand All @@ -168,7 +167,7 @@ def get_table_type(table_name: str) -> TableType:
@sql_injection_guard(table_name=str.isidentifier, table_schema=is_valid_table_schema)
def create_table(table_name: str, table_schema: TableSchema):
columns_schema = convert_schema_to_sql_query_format(table_schema)
db_execute_query(f"CREATE TABLE {table_name} ( {columns_schema} )")
monetdb_facade.execute_query(f"CREATE TABLE {table_name} ( {columns_schema} )")


@sql_injection_guard(
Expand All @@ -191,7 +190,7 @@ def create_merge_table(
merge_table_query += f"ALTER TABLE {table_name} ADD TABLE {name.lower()}; "

try:
db_execute_query(merge_table_query)
monetdb_facade.execute_query(merge_table_query)
except (
pymonetdb.exceptions.ProgrammingError or pymonetdb.exceptions.OperationalError
) as exc:
Expand Down Expand Up @@ -220,7 +219,7 @@ def create_remote_table(
public_password: str,
):
columns_schema = convert_schema_to_sql_query_format(schema)
db_execute_query(
monetdb_facade.execute_query(
f"""
CREATE REMOTE TABLE {table_name}
( {columns_schema}) ON 'mapi:monetdb://{monetdb_socket_address}/db/{table_creator_username}/{table_name}'
Expand Down Expand Up @@ -256,7 +255,7 @@ def get_table_data(table_name: str, use_public_user: bool = True) -> List[Column
worker_config.monetdb.local_username
) # The db local user, on whose namespace the tables are on.

row_stored_data = db_execute_and_fetchall(
row_stored_data = monetdb_facade.execute_and_fetchall(
f"SELECT * FROM {db_local_username}.{table_name}",
use_public_user=use_public_user,
)
Expand Down Expand Up @@ -292,7 +291,7 @@ def insert_data_to_table(
query = f"INSERT INTO {table_name} VALUES {placeholders}"

# Execute the query with the parameters
db_execute_query(query, parameters)
monetdb_facade.execute_query(query, parameters)


def _convert_column_stored_data_to_column_data_objects(
Expand Down Expand Up @@ -393,7 +392,7 @@ def get_tables_by_type(context_id: str) -> Dict[TableType, List[str]]:
context_id : str
The id of the experiment
"""
table_names_and_types = db_execute_and_fetchall(
table_names_and_types = monetdb_facade.execute_and_fetchall(
f"""
SELECT name, type FROM tables
WHERE name LIKE '%{context_id.lower()}%'
Expand Down
7 changes: 3 additions & 4 deletions exareme2/worker/exareme2/udfs/udfs_db.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from typing import List

from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_query
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_udf
from exareme2.worker.exareme2.monetdb import monetdb_facade


def run_udf(udf_defenitions: List[str], udf_exec_stmt):
db_execute_query(";\n".join(udf_defenitions))
db_execute_udf(udf_exec_stmt)
monetdb_facade.execute_query(";\n".join(udf_defenitions))
monetdb_facade.execute_udf(udf_exec_stmt)
9 changes: 4 additions & 5 deletions exareme2/worker/exareme2/views/views_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
from typing import Optional

from exareme2.data_filters import build_filter_clause
from exareme2.worker.exareme2.monetdb import monetdb_facade
from exareme2.worker.exareme2.monetdb.guard import is_list_of_identifiers
from exareme2.worker.exareme2.monetdb.guard import is_primary_data_table
from exareme2.worker.exareme2.monetdb.guard import is_valid_filter
from exareme2.worker.exareme2.monetdb.guard import sql_injection_guard
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_and_fetchall
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_query
from exareme2.worker.exareme2.tables.tables_db import get_table_names
from exareme2.worker.exareme2.tables.tables_db import get_table_schema
from exareme2.worker_communication import ColumnInfo
Expand Down Expand Up @@ -49,9 +48,9 @@ def create_view(
{filter_clause}
"""

db_execute_query(view_creation_query)
monetdb_facade.execute_query(view_creation_query)

view_rows_query_result = db_execute_and_fetchall(
view_rows_query_result = monetdb_facade.execute_and_fetchall(
f"""
SELECT COUNT(*)
FROM {view_name}
Expand All @@ -61,7 +60,7 @@ def create_view(
view_rows_count = view_rows_result_row[0]

if view_rows_count < 1 or (check_min_rows and view_rows_count < minimum_row_count):
db_execute_query(f"""DROP VIEW {view_name}""")
monetdb_facade.execute_query(f"""DROP VIEW {view_name}""")
raise InsufficientDataError(
f"Query: {view_creation_query} creates an "
f"insufficient data view. ({view_name=} has been dropped)"
Expand Down
16 changes: 16 additions & 0 deletions exareme2/worker/worker_info/sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import sqlite3
KFilippopolitis marked this conversation as resolved.
Show resolved Hide resolved
from typing import List

from exareme2.worker import config as worker_config

# Module-level SQLite connection, opened once when this module is imported.
# The database file is "<data_path>/<sqlite.db_name>.db", with both parts read
# from the worker configuration.
# NOTE(review): sqlite3 connections are bound to the creating thread by default
# (check_same_thread=True) — confirm every caller runs on the importing thread.
CONN = sqlite3.connect(
f"{str(worker_config.data_path)}/{worker_config.sqlite.db_name}.db"
)


def execute_and_fetchall(query, parameters=None) -> List:
    """
    Execute a query on the module-level SQLite connection and fetch all rows.

    Parameters
    ----------
    query : str
        The SQL statement to execute.
    parameters : sequence or dict, optional
        Values bound to the placeholders in ``query``. When omitted, the query
        is executed as-is, exactly as before this argument existed.

    Returns
    -------
    List
        Every row produced by the query, as returned by ``cursor.fetchall()``.
    """
    cur = CONN.cursor()
    try:
        if parameters is None:
            cur.execute(query)
        else:
            cur.execute(query, parameters)
        return cur.fetchall()
    finally:
        # Release the cursor even when execution or fetching raises.
        cur.close()
22 changes: 11 additions & 11 deletions exareme2/worker/worker_info/worker_info_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from exareme2.worker.exareme2.monetdb.guard import is_datamodel
from exareme2.worker.exareme2.monetdb.guard import sql_injection_guard
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_and_fetchall
from exareme2.worker.worker_info import sqlite
from exareme2.worker_communication import CommonDataElement
from exareme2.worker_communication import CommonDataElements
from exareme2.worker_communication import DataModelAttributes
Expand All @@ -22,9 +22,9 @@ def get_data_models() -> List[str]:
The data_models.
"""

data_models_code_and_version = db_execute_and_fetchall(
data_models_code_and_version = sqlite.execute_and_fetchall(
f"""SELECT code, version
FROM "mipdb_metadata"."data_models"
FROM data_models
WHERE status = 'ENABLED'
"""
)
Expand All @@ -46,14 +46,14 @@ def get_dataset_code_per_dataset_label(data_model: str) -> Dict[str, str]:
"""
data_model_code, data_model_version = data_model.split(":")

datasets_rows = db_execute_and_fetchall(
datasets_rows = sqlite.execute_and_fetchall(
f"""
SELECT code, label
FROM "mipdb_metadata"."datasets"
FROM datasets
WHERE data_model_id =
(
SELECT data_model_id
FROM "mipdb_metadata"."data_models"
FROM data_models
WHERE code = '{data_model_code}'
AND version = '{data_model_version}'
)
Expand All @@ -76,9 +76,9 @@ def get_data_model_cdes(data_model: str) -> CommonDataElements:
"""
data_model_code, data_model_version = data_model.split(":")

cdes_rows = db_execute_and_fetchall(
cdes_rows = sqlite.execute_and_fetchall(
f"""
SELECT code, metadata FROM "{data_model_code}:{data_model_version}"."variables_metadata"
SELECT code, metadata FROM "{data_model_code}:{data_model_version}_variables_metadata"
"""
)

Expand All @@ -102,10 +102,10 @@ def get_data_model_attributes(data_model: str) -> DataModelAttributes:
"""
data_model_code, data_model_version = data_model.split(":")

attributes = db_execute_and_fetchall(
attributes = sqlite.execute_and_fetchall(
f"""
SELECT properties
FROM "mipdb_metadata"."data_models"
FROM data_models
WHERE code = '{data_model_code}'
AND version = '{data_model_version}'
"""
Expand All @@ -121,5 +121,5 @@ def check_database_connection():
"""
Check that the connection with the database is working.
"""
result = db_execute_and_fetchall(f"SELECT '{HEALTHCHECK_VALIDATION_STRING}'")
result = sqlite.execute_and_fetchall(f"SELECT '{HEALTHCHECK_VALIDATION_STRING}'")
assert result[0][0] == HEALTHCHECK_VALIDATION_STRING
31 changes: 0 additions & 31 deletions federation_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@
DB_USERNAME = "admin"
DB_PASSWORD = "executor"
DB_FARM = "db"
DB_METADATA_SCHEMA = "mipdb_metadata"
ACTIONS_TABLE = "actions"
ADD_DATA_MODEL_ACTION_CODE = "ADD DATA MODEL"
DELETE_DATA_MODEL_ACTION_CODE = "DELETE DATA MODEL"
ADD_DATASET_ACTION_CODE = "ADD DATASET"
DELETE_DATASET_ACTION_CODE = "DELETE DATASET"


@contextmanager
Expand Down Expand Up @@ -43,31 +37,6 @@ def cli():
pass


@cli.command()
@click.option("--ip", default="127.0.0.1", help="The ip of the database.")
@click.option("--port", default=50000, type=int, help="The port of the database.")
def show_worker_db_actions(ip, port):
with db_cursor(ip, port) as cur:
cur.execute(f"select * from {DB_METADATA_SCHEMA}.{ACTIONS_TABLE};")
results = cur.fetchall()
for _, action_str in results:
action = json.loads(action_str)
if (
action["action"] == ADD_DATA_MODEL_ACTION_CODE
or action["action"] == DELETE_DATA_MODEL_ACTION_CODE
):
print(
f"{action['date']} - {action['user']} - {action['action']} - {action['data_model_code']}:{action['data_model_version']} - {action['data_model_label']}"
)
elif (
action["action"] == ADD_DATASET_ACTION_CODE
or action["action"] == DELETE_DATASET_ACTION_CODE
):
print(
f"{action['date']} - {action['user']} - {action['action']} - {action['dataset_code']} - {action['dataset_label']} - {action['data_model_code']}:{action['data_model_version']} - {action['data_model_label']}"
)


LOG_FILE_CHUNK_SIZE = 1024 # Will read the logfile in chunks
TIMESTAMP_REGEX = (
r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}" # 2022-04-13 18:25:22,875
Expand Down
8 changes: 8 additions & 0 deletions kubernetes/templates/exareme2-localnode.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: SQLITE_DB_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: DB_PORT
value: "50000"
volumeMounts:
Expand Down Expand Up @@ -175,6 +179,10 @@ spec:
value: "guest"
- name: MONETDB_PUBLIC_PASSWORD
value: "guest"
- name: SQLITE_DB_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: SMPC_ENABLED
value: {{ quote .Values.smpc.enabled }}
{{ if .Values.smpc.enabled }}
Expand Down
Loading
Loading