Skip to content

Commit

Permalink
System tables datasets and data models are loaded from sqlite.
Browse files Browse the repository at this point in the history
  • Loading branch information
KFilippopolitis committed Jun 30, 2024
1 parent de748c5 commit e80f8e9
Show file tree
Hide file tree
Showing 22 changed files with 203 additions and 194 deletions.
1 change: 1 addition & 0 deletions exareme2/algorithms/exareme2/descriptive_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
particular variable/dataset pair doesn't contribute to the global computation.
"""
import math
import warnings
from collections import Counter
from functools import reduce
from typing import Dict
Expand Down
2 changes: 2 additions & 0 deletions exareme2/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
if config_file := os.getenv("EXAREME2_WORKER_CONFIG_FILE"):
with open(config_file) as fp:
config = AttrDict(envtoml.load(fp))
config.sqlite = AttrDict({})
config.data_path = TEST_DATA_FOLDER
config.sqlite.db_name = config.identifier
else:
with open_text(worker, "config.toml") as fp:
config = AttrDict(envtoml.load(fp))
3 changes: 3 additions & 0 deletions exareme2/worker/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ user = "user"
password = "password"
vhost = "user_vhost"

[sqlite]
db_name = "$SQLITE_DB_NAME"

[monetdb]
ip = "$MONETDB_IP"
port = "$MONETDB_PORT"
Expand Down
7 changes: 3 additions & 4 deletions exareme2/worker/exareme2/cleanup/cleanup_db.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from typing import Dict
from typing import List

from exareme2.worker.exareme2.monetdb import monetdb_facade
from exareme2.worker.exareme2.monetdb.guard import sql_injection_guard
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_and_fetchall
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_query
from exareme2.worker.exareme2.tables.tables_db import get_tables_by_type
from exareme2.worker_communication import TableType

Expand All @@ -24,7 +23,7 @@ def drop_db_artifacts_by_context_id(context_id: str):
udfs_deletion_query = _get_drop_udfs_query(function_names)
table_names_by_type = get_tables_by_type(context_id)
tables_deletion_query = _get_drop_tables_query(table_names_by_type)
db_execute_query(udfs_deletion_query + tables_deletion_query)
monetdb_facade.execute_query(udfs_deletion_query + tables_deletion_query)


@sql_injection_guard(context_id=str.isalnum)
Expand All @@ -37,7 +36,7 @@ def _get_function_names_query_by_context_id(context_id: str) -> List[str]:
context_id : str
The id of the experiment
"""
result = db_execute_and_fetchall(
result = monetdb_facade.execute_and_fetchall(
f"""
SELECT name FROM functions
WHERE name LIKE '%{context_id.lower()}%'
Expand Down
6 changes: 3 additions & 3 deletions exareme2/worker/exareme2/monetdb/monetdb_facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Config:
allow_mutation = False


def db_execute_and_fetchall(
def execute_and_fetchall(
query: str, parameters=None, use_public_user: bool = False
) -> List:
query_execution_timeout = worker_config.celery.tasks_timeout
Expand All @@ -43,7 +43,7 @@ def db_execute_and_fetchall(
return _execute_and_fetchall(db_execution_dto=db_execution_dto)


def db_execute_query(query: str, parameters=None, use_public_user: bool = False):
def execute_query(query: str, parameters=None, use_public_user: bool = False):
query_execution_timeout = worker_config.celery.tasks_timeout
query = convert_to_idempotent(query)
db_execution_dto = _DBExecutionDTO(
Expand All @@ -55,7 +55,7 @@ def db_execute_query(query: str, parameters=None, use_public_user: bool = False)
_execute(db_execution_dto=db_execution_dto, lock=query_execution_lock)


def db_execute_udf(query: str, parameters=None):
def execute_udf(query: str, parameters=None):
# Check if there is only one query
split_queries = [query for query in query.strip().split(";") if query]
if len(split_queries) > 1:
Expand Down
21 changes: 10 additions & 11 deletions exareme2/worker/exareme2/tables/tables_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@

from exareme2 import DType
from exareme2.worker import config as worker_config
from exareme2.worker.exareme2.monetdb import monetdb_facade
from exareme2.worker.exareme2.monetdb.guard import is_list_of_identifiers
from exareme2.worker.exareme2.monetdb.guard import is_socket_address
from exareme2.worker.exareme2.monetdb.guard import is_valid_table_schema
from exareme2.worker.exareme2.monetdb.guard import sql_injection_guard
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_and_fetchall
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_query
from exareme2.worker_communication import ColumnData
from exareme2.worker_communication import ColumnDataBinary
from exareme2.worker_communication import ColumnDataFloat
Expand Down Expand Up @@ -71,7 +70,7 @@ def get_table_names(table_type: TableType, context_id: str) -> List[str]:
List[str]
A list of table names.
"""
table_names = db_execute_and_fetchall(
table_names = monetdb_facade.execute_and_fetchall(
f"""
SELECT name FROM tables
WHERE
Expand Down Expand Up @@ -110,7 +109,7 @@ def get_table_schema(table_name: str) -> TableSchema:
TableSchema
A schema which is TableSchema object.
"""
schema = db_execute_and_fetchall(
schema = monetdb_facade.execute_and_fetchall(
f"""
SELECT columns.name, columns.type
FROM columns
Expand Down Expand Up @@ -150,7 +149,7 @@ def get_table_type(table_name: str) -> TableType:
The type of the table.
"""

monetdb_table_type_result = db_execute_and_fetchall(
monetdb_table_type_result = monetdb_facade.execute_and_fetchall(
f"""
SELECT type
FROM
Expand All @@ -168,7 +167,7 @@ def get_table_type(table_name: str) -> TableType:
@sql_injection_guard(table_name=str.isidentifier, table_schema=is_valid_table_schema)
def create_table(table_name: str, table_schema: TableSchema):
columns_schema = convert_schema_to_sql_query_format(table_schema)
db_execute_query(f"CREATE TABLE {table_name} ( {columns_schema} )")
monetdb_facade.execute_query(f"CREATE TABLE {table_name} ( {columns_schema} )")


@sql_injection_guard(
Expand All @@ -191,7 +190,7 @@ def create_merge_table(
merge_table_query += f"ALTER TABLE {table_name} ADD TABLE {name.lower()}; "

try:
db_execute_query(merge_table_query)
monetdb_facade.execute_query(merge_table_query)
except (
pymonetdb.exceptions.ProgrammingError or pymonetdb.exceptions.OperationalError
) as exc:
Expand Down Expand Up @@ -220,7 +219,7 @@ def create_remote_table(
public_password: str,
):
columns_schema = convert_schema_to_sql_query_format(schema)
db_execute_query(
monetdb_facade.execute_query(
f"""
CREATE REMOTE TABLE {table_name}
( {columns_schema}) ON 'mapi:monetdb://{monetdb_socket_address}/db/{table_creator_username}/{table_name}'
Expand Down Expand Up @@ -256,7 +255,7 @@ def get_table_data(table_name: str, use_public_user: bool = True) -> List[Column
worker_config.monetdb.local_username
) # The db local user, on whose namespace the tables are on.

row_stored_data = db_execute_and_fetchall(
row_stored_data = monetdb_facade.execute_and_fetchall(
f"SELECT * FROM {db_local_username}.{table_name}",
use_public_user=use_public_user,
)
Expand Down Expand Up @@ -292,7 +291,7 @@ def insert_data_to_table(
query = f"INSERT INTO {table_name} VALUES {placeholders}"

# Execute the query with the parameters
db_execute_query(query, parameters)
monetdb_facade.execute_query(query, parameters)


def _convert_column_stored_data_to_column_data_objects(
Expand Down Expand Up @@ -393,7 +392,7 @@ def get_tables_by_type(context_id: str) -> Dict[TableType, List[str]]:
context_id : str
The id of the experiment
"""
table_names_and_types = db_execute_and_fetchall(
table_names_and_types = monetdb_facade.execute_and_fetchall(
f"""
SELECT name, type FROM tables
WHERE name LIKE '%{context_id.lower()}%'
Expand Down
7 changes: 3 additions & 4 deletions exareme2/worker/exareme2/udfs/udfs_db.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from typing import List

from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_query
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_udf
from exareme2.worker.exareme2.monetdb import monetdb_facade


def run_udf(udf_defenitions: List[str], udf_exec_stmt):
db_execute_query(";\n".join(udf_defenitions))
db_execute_udf(udf_exec_stmt)
monetdb_facade.execute_query(";\n".join(udf_defenitions))
monetdb_facade.execute_udf(udf_exec_stmt)
9 changes: 4 additions & 5 deletions exareme2/worker/exareme2/views/views_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
from typing import Optional

from exareme2.data_filters import build_filter_clause
from exareme2.worker.exareme2.monetdb import monetdb_facade
from exareme2.worker.exareme2.monetdb.guard import is_list_of_identifiers
from exareme2.worker.exareme2.monetdb.guard import is_primary_data_table
from exareme2.worker.exareme2.monetdb.guard import is_valid_filter
from exareme2.worker.exareme2.monetdb.guard import sql_injection_guard
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_and_fetchall
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_query
from exareme2.worker.exareme2.tables.tables_db import get_table_names
from exareme2.worker.exareme2.tables.tables_db import get_table_schema
from exareme2.worker_communication import ColumnInfo
Expand Down Expand Up @@ -49,9 +48,9 @@ def create_view(
{filter_clause}
"""

db_execute_query(view_creation_query)
monetdb_facade.execute_query(view_creation_query)

view_rows_query_result = db_execute_and_fetchall(
view_rows_query_result = monetdb_facade.execute_and_fetchall(
f"""
SELECT COUNT(*)
FROM {view_name}
Expand All @@ -61,7 +60,7 @@ def create_view(
view_rows_count = view_rows_result_row[0]

if view_rows_count < 1 or (check_min_rows and view_rows_count < minimum_row_count):
db_execute_query(f"""DROP VIEW {view_name}""")
monetdb_facade.execute_query(f"""DROP VIEW {view_name}""")
raise InsufficientDataError(
f"Query: {view_creation_query} creates an "
f"insufficient data view. ({view_name=} has been dropped)"
Expand Down
16 changes: 16 additions & 0 deletions exareme2/worker/worker_info/sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import sqlite3
from typing import List

from exareme2.worker import config as worker_config

CONN = sqlite3.connect(
f"{str(worker_config.data_path)}/{worker_config.sqlite.db_name}.db"
)


def execute_and_fetchall(query) -> List:
cur = CONN.cursor()
cur.execute(query)
result = cur.fetchall()
cur.close()
return result
22 changes: 11 additions & 11 deletions exareme2/worker/worker_info/worker_info_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from exareme2.worker.exareme2.monetdb.guard import is_datamodel
from exareme2.worker.exareme2.monetdb.guard import sql_injection_guard
from exareme2.worker.exareme2.monetdb.monetdb_facade import db_execute_and_fetchall
from exareme2.worker.worker_info import sqlite
from exareme2.worker_communication import CommonDataElement
from exareme2.worker_communication import CommonDataElements
from exareme2.worker_communication import DataModelAttributes
Expand All @@ -22,9 +22,9 @@ def get_data_models() -> List[str]:
The data_models.
"""

data_models_code_and_version = db_execute_and_fetchall(
data_models_code_and_version = sqlite.execute_and_fetchall(
f"""SELECT code, version
FROM "mipdb_metadata"."data_models"
FROM data_models
WHERE status = 'ENABLED'
"""
)
Expand All @@ -46,14 +46,14 @@ def get_dataset_code_per_dataset_label(data_model: str) -> Dict[str, str]:
"""
data_model_code, data_model_version = data_model.split(":")

datasets_rows = db_execute_and_fetchall(
datasets_rows = sqlite.execute_and_fetchall(
f"""
SELECT code, label
FROM "mipdb_metadata"."datasets"
FROM datasets
WHERE data_model_id =
(
SELECT data_model_id
FROM "mipdb_metadata"."data_models"
FROM data_models
WHERE code = '{data_model_code}'
AND version = '{data_model_version}'
)
Expand All @@ -76,9 +76,9 @@ def get_data_model_cdes(data_model: str) -> CommonDataElements:
"""
data_model_code, data_model_version = data_model.split(":")

cdes_rows = db_execute_and_fetchall(
cdes_rows = sqlite.execute_and_fetchall(
f"""
SELECT code, metadata FROM "{data_model_code}:{data_model_version}"."variables_metadata"
SELECT code, metadata FROM "{data_model_code}:{data_model_version}_variables_metadata"
"""
)

Expand All @@ -102,10 +102,10 @@ def get_data_model_attributes(data_model: str) -> DataModelAttributes:
"""
data_model_code, data_model_version = data_model.split(":")

attributes = db_execute_and_fetchall(
attributes = sqlite.execute_and_fetchall(
f"""
SELECT properties
FROM "mipdb_metadata"."data_models"
FROM data_models
WHERE code = '{data_model_code}'
AND version = '{data_model_version}'
"""
Expand All @@ -121,5 +121,5 @@ def check_database_connection():
"""
Check that the connection with the database is working.
"""
result = db_execute_and_fetchall(f"SELECT '{HEALTHCHECK_VALIDATION_STRING}'")
result = sqlite.execute_and_fetchall(f"SELECT '{HEALTHCHECK_VALIDATION_STRING}'")
assert result[0][0] == HEALTHCHECK_VALIDATION_STRING
31 changes: 0 additions & 31 deletions federation_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@
DB_USERNAME = "admin"
DB_PASSWORD = "executor"
DB_FARM = "db"
DB_METADATA_SCHEMA = "mipdb_metadata"
ACTIONS_TABLE = "actions"
ADD_DATA_MODEL_ACTION_CODE = "ADD DATA MODEL"
DELETE_DATA_MODEL_ACTION_CODE = "DELETE DATA MODEL"
ADD_DATASET_ACTION_CODE = "ADD DATASET"
DELETE_DATASET_ACTION_CODE = "DELETE DATASET"


@contextmanager
Expand Down Expand Up @@ -43,31 +37,6 @@ def cli():
pass


@cli.command()
@click.option("--ip", default="127.0.0.1", help="The ip of the database.")
@click.option("--port", default=50000, type=int, help="The port of the database.")
def show_worker_db_actions(ip, port):
with db_cursor(ip, port) as cur:
cur.execute(f"select * from {DB_METADATA_SCHEMA}.{ACTIONS_TABLE};")
results = cur.fetchall()
for _, action_str in results:
action = json.loads(action_str)
if (
action["action"] == ADD_DATA_MODEL_ACTION_CODE
or action["action"] == DELETE_DATA_MODEL_ACTION_CODE
):
print(
f"{action['date']} - {action['user']} - {action['action']} - {action['data_model_code']}:{action['data_model_version']} - {action['data_model_label']}"
)
elif (
action["action"] == ADD_DATASET_ACTION_CODE
or action["action"] == DELETE_DATASET_ACTION_CODE
):
print(
f"{action['date']} - {action['user']} - {action['action']} - {action['dataset_code']} - {action['dataset_label']} - {action['data_model_code']}:{action['data_model_version']} - {action['data_model_label']}"
)


LOG_FILE_CHUNK_SIZE = 1024 # Will read the logfile in chunks
TIMESTAMP_REGEX = (
r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}" # 2022-04-13 18:25:22,875
Expand Down
6 changes: 6 additions & 0 deletions kubernetes/templates/exareme2-localnode.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ spec:
- name: db-importer
image: {{ .Values.exareme2_images.repository }}/exareme2_mipdb:{{ .Values.exareme2_images.version }}
env:
- name: WORKER_IDENTIFIER
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: DB_IP
valueFrom:
fieldRef:
Expand Down Expand Up @@ -161,6 +165,8 @@ spec:
fieldPath: status.podIP
- name: RABBITMQ_PORT
value: "5672"
- name: SQLITE_DB_NAME
value: "sqlite"
- name: MONETDB_IP
valueFrom:
fieldRef:
Expand Down
5 changes: 4 additions & 1 deletion kubernetes/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ framework_log_level: ERROR

max_concurrent_experiments: 32

db:
sqlite:
db_name: sqlite

monetdb:
credentials_location: /opt/exareme2/credentials
storage_location: /opt/exareme2/db
csvs_location: /opt/exareme2/csvs
Expand Down
Loading

0 comments on commit e80f8e9

Please sign in to comment.