From 97c0a64561db357bbec6195c641ba7eef75797c5 Mon Sep 17 00:00:00 2001
From: kfilippopolitis
Date: Mon, 1 Jul 2024 22:37:48 +0300
Subject: [PATCH] Update data processing for clients, so they load data from CSV files instead of the database.

Flower clients previously queried MonetDB through pymonetdb. They now read
only the CSV files assigned to them: the controller resolves the csv_path of
every requested dataset per worker (DatasetLocation/DatasetInfo now carry
csv_path), passes the per-worker paths through start_flower_client, and the
worker exports them to the client process as the CSV_PATHS environment
variable. The Flower server keeps loading the whole data model folder under
DATA_PATH. Illustrative usage sketches are appended after the diff.
---
 ...ocessing.py => inputdata_preprocessing.py} | 41 +-
 .../flower/logistic_regression/client.py | 8 +-
 .../flower/logistic_regression/server.py | 10 +-
 .../mnist_logistic_regression/server.py | 2 +-
 exareme2/controller/celery/tasks_handler.py | 3 +-
 exareme2/controller/quart/endpoints.py | 9 +-
 .../controller/services/flower/controller.py | 24 +-
 .../services/flower/tasks_handler.py | 8 +-
 .../worker_info_tasks_handler.py | 6 +-
 .../worker_landscape_aggregator.py | 216 ++--
 .../worker/exareme2/views/views_service.py | 7 +-
 exareme2/worker/flower/starter/starter_api.py | 4 +-
 .../worker/flower/starter/starter_service.py | 3 +-
 .../worker/worker_info/worker_info_api.py | 6 +-
 exareme2/worker/worker_info/worker_info_db.py | 12 +-
 .../worker/worker_info/worker_info_service.py | 23 +-
 exareme2/worker_communication.py | 10 +
 ...ing.py => test_inputdata_preprocessing.py} | 12 +-
 .../api/test_validate_algorithm_request.py | 20 +-
 .../test_data_model_registry.py | 51 +-
 .../test_federation_info_script.py | 19 +-
 .../test_worker_landscape_aggregator.py | 1087 +++++++++++------
 22 files changed, 1022 insertions(+), 559 deletions(-)
 rename exareme2/algorithms/flower/{flower_data_processing.py => inputdata_preprocessing.py} (79%)
 rename tests/standalone_tests/algorithms/flower/{test_flower_data_processing.py => test_inputdata_preprocessing.py} (88%)

diff --git a/exareme2/algorithms/flower/flower_data_processing.py b/exareme2/algorithms/flower/inputdata_preprocessing.py
similarity index 79%
rename from exareme2/algorithms/flower/flower_data_processing.py
rename to exareme2/algorithms/flower/inputdata_preprocessing.py
index 286322653..87a962108 100644
--- a/exareme2/algorithms/flower/flower_data_processing.py
+++ b/exareme2/algorithms/flower/inputdata_preprocessing.py
@@ -5,7 +5,6 @@
 from typing import Optional
 
 import pandas as pd
-import pymonetdb
 import requests
 from flwr.common.logger import FLOWER_LOGGER
 from pydantic import BaseModel
@@ -29,37 +28,29 @@ class Inputdata(BaseModel):
     x: Optional[List[str]]
 
 
-def fetch_data(data_model, datasets, from_db=False) -> pd.DataFrame:
-    return (
-        _fetch_data_from_db(data_model, datasets)
-        if from_db
-        else _fetch_data_from_csv(data_model, datasets)
-    )
+def fetch_client_data(inputdata) -> pd.DataFrame:
+    dataframes = [
+        pd.read_csv(f"{os.getenv('DATA_PATH')}{csv_path}")
+        for csv_path in os.getenv("CSV_PATHS").split(",")
+    ]
+    df = pd.concat(dataframes, ignore_index=True)
+    df = df[df["dataset"].isin(inputdata.datasets)]
+    return df[inputdata.x + inputdata.y]
 
 
-def _fetch_data_from_db(data_model, datasets) -> pd.DataFrame:
-    query = f'SELECT * FROM "{data_model}"."primary_data"'
-    conn = pymonetdb.connect(
-        hostname=os.getenv("MONETDB_IP"),
-        port=int(os.getenv("MONETDB_PORT")),
-        username=os.getenv("MONETDB_USERNAME"),
-        password=os.getenv("MONETDB_PASSWORD"),
-        database=os.getenv("MONETDB_DB"),
+def fetch_server_data(inputdata) -> pd.DataFrame:
+    data_folder = Path(
+        f"{os.getenv('DATA_PATH')}/{inputdata.data_model.split(':')[0]}_v_0_1"
     )
-    df = pd.read_sql(query, conn)
-    conn.close()
-    df = df[df["dataset"].isin(datasets)]
-    return df
-
-
-def _fetch_data_from_csv(data_model, datasets) -> pd.DataFrame:
-    data_folder = Path(f"{os.getenv('DATA_PATH')}/{data_model.split(':')[0]}_v_0_1")
+    print(f"Loading data from folder: 
{data_folder}") dataframes = [ pd.read_csv(data_folder / f"{dataset}.csv") - for dataset in datasets + for dataset in inputdata.datasets if (data_folder / f"{dataset}.csv").exists() ] - return pd.concat(dataframes, ignore_index=True) + df = pd.concat(dataframes, ignore_index=True) + df = df[df["dataset"].isin(inputdata.datasets)] + return df[inputdata.x + inputdata.y] def preprocess_data(inputdata, full_data): diff --git a/exareme2/algorithms/flower/logistic_regression/client.py b/exareme2/algorithms/flower/logistic_regression/client.py index 360b4caa2..5a3bed360 100644 --- a/exareme2/algorithms/flower/logistic_regression/client.py +++ b/exareme2/algorithms/flower/logistic_regression/client.py @@ -11,9 +11,9 @@ from utils import set_initial_params from utils import set_model_params -from exareme2.algorithms.flower.flower_data_processing import fetch_data -from exareme2.algorithms.flower.flower_data_processing import get_input -from exareme2.algorithms.flower.flower_data_processing import preprocess_data +from exareme2.algorithms.flower.inputdata_preprocessing import fetch_client_data +from exareme2.algorithms.flower.inputdata_preprocessing import get_input +from exareme2.algorithms.flower.inputdata_preprocessing import preprocess_data class LogisticRegressionClient(fl.client.NumPyClient): @@ -42,7 +42,7 @@ def evaluate(self, parameters, config): if __name__ == "__main__": model = LogisticRegression(penalty="l2", max_iter=1, warm_start=True) inputdata = get_input() - full_data = fetch_data(inputdata.data_model, inputdata.datasets, from_db=True) + full_data = fetch_client_data(inputdata) X_train, y_train = preprocess_data(inputdata, full_data) set_initial_params(model, X_train, full_data, inputdata) diff --git a/exareme2/algorithms/flower/logistic_regression/server.py b/exareme2/algorithms/flower/logistic_regression/server.py index 065eddb52..a00af71af 100644 --- a/exareme2/algorithms/flower/logistic_regression/server.py +++ b/exareme2/algorithms/flower/logistic_regression/server.py @@ -6,10 +6,10 @@ from utils import set_initial_params from utils import set_model_params -from exareme2.algorithms.flower.flower_data_processing import fetch_data -from exareme2.algorithms.flower.flower_data_processing import get_input -from exareme2.algorithms.flower.flower_data_processing import post_result -from exareme2.algorithms.flower.flower_data_processing import preprocess_data +from exareme2.algorithms.flower.inputdata_preprocessing import fetch_server_data +from exareme2.algorithms.flower.inputdata_preprocessing import get_input +from exareme2.algorithms.flower.inputdata_preprocessing import post_result +from exareme2.algorithms.flower.inputdata_preprocessing import preprocess_data # TODO: NUM_OF_ROUNDS should become a parameter of the algorithm and be set on the AlgorithmRequestDTO NUM_OF_ROUNDS = 5 @@ -35,7 +35,7 @@ def evaluate(server_round, parameters, config): if __name__ == "__main__": model = LogisticRegression() inputdata = get_input() - full_data = fetch_data(inputdata.data_model, inputdata.datasets) + full_data = fetch_server_data(inputdata) X_train, y_train = preprocess_data(inputdata, full_data) set_initial_params(model, X_train, full_data, inputdata) strategy = fl.server.strategy.FedAvg( diff --git a/exareme2/algorithms/flower/mnist_logistic_regression/server.py b/exareme2/algorithms/flower/mnist_logistic_regression/server.py index 9028f6c8d..124d76588 100644 --- a/exareme2/algorithms/flower/mnist_logistic_regression/server.py +++ 
b/exareme2/algorithms/flower/mnist_logistic_regression/server.py @@ -5,7 +5,7 @@ from sklearn.linear_model import LogisticRegression from sklearn.metrics import log_loss -from exareme2.algorithms.flower.flower_data_processing import post_result +from exareme2.algorithms.flower.inputdata_preprocessing import post_result from exareme2.algorithms.flower.mnist_logistic_regression import utils NUM_OF_ROUNDS = 5 diff --git a/exareme2/controller/celery/tasks_handler.py b/exareme2/controller/celery/tasks_handler.py index 33e5bb0a4..643969c77 100644 --- a/exareme2/controller/celery/tasks_handler.py +++ b/exareme2/controller/celery/tasks_handler.py @@ -298,13 +298,14 @@ def queue_healthcheck_task( ) def start_flower_client( - self, request_id, algorithm_name, server_address, execution_timeout + self, request_id, algorithm_name, server_address, csv_paths, execution_timeout ) -> WorkerTaskResult: return self._queue_task( task_signature=TASK_SIGNATURES["start_flower_client"], request_id=request_id, algorithm_name=algorithm_name, server_address=server_address, + csv_paths=csv_paths, execution_timeout=execution_timeout, ) diff --git a/exareme2/controller/quart/endpoints.py b/exareme2/controller/quart/endpoints.py index a6a9ffbdf..1bdaba54e 100644 --- a/exareme2/controller/quart/endpoints.py +++ b/exareme2/controller/quart/endpoints.py @@ -32,7 +32,14 @@ async def get_datasets() -> dict: @algorithms.route("/datasets_locations", methods=["GET"]) async def get_datasets_locations() -> dict: - return get_worker_landscape_aggregator().get_datasets_locations().datasets_locations + return { + data_model: { + dataset: info.worker_id for dataset, info in datasets_location.items() + } + for data_model, datasets_location in get_worker_landscape_aggregator() + .get_datasets_locations() + .datasets_locations.items() + } @algorithms.route("/cdes_metadata", methods=["GET"]) diff --git a/exareme2/controller/services/flower/controller.py b/exareme2/controller/services/flower/controller.py index d60a202e1..cb7383f38 100644 --- a/exareme2/controller/services/flower/controller.py +++ b/exareme2/controller/services/flower/controller.py @@ -1,4 +1,5 @@ import asyncio +from typing import Dict from typing import List from exareme2.controller import config as ctrl_config @@ -53,10 +54,17 @@ async def exec_algorithm(self, algorithm_name, algorithm_request_dto): request_id = algorithm_request_dto.request_id context_id = UIDGenerator().get_a_uid() logger = ctrl_logger.get_request_logger(request_id) - workers_info = self._get_workers_info_by_dataset( + csv_paths_per_worker_id: Dict[ + str, List[str] + ] = self.worker_landscape_aggregator.get_csv_paths_per_worker_id( algorithm_request_dto.inputdata.data_model, algorithm_request_dto.inputdata.datasets, ) + + workers_info = [ + self.worker_landscape_aggregator.get_worker_info(worker_id) + for worker_id in csv_paths_per_worker_id + ] task_handlers = [ self._create_worker_tasks_handler(request_id, worker) for worker in workers_info @@ -81,7 +89,6 @@ async def exec_algorithm(self, algorithm_name, algorithm_request_dto): server_pid = None clients_pids = {} server_address = f"{server_ip}:{FLOWER_SERVER_PORT}" - try: server_pid = server_task_handler.start_flower_server( algorithm_name, len(task_handlers), str(server_address) @@ -90,6 +97,7 @@ async def exec_algorithm(self, algorithm_name, algorithm_request_dto): handler.start_flower_client( algorithm_name, str(server_address), + csv_paths_per_worker_id[handler.worker_id], ctrl_config.flower_execution_timeout, ): handler for handler in 
task_handlers @@ -130,15 +138,3 @@ async def _cleanup( server_task_handler.stop_flower_server(server_pid, algorithm_name) for pid, handler in clients_pids.items(): handler.stop_flower_client(pid, algorithm_name) - - def _get_workers_info_by_dataset(self, data_model, datasets) -> List[WorkerInfo]: - """Retrieves worker information for those handling the specified datasets.""" - worker_ids = ( - self.worker_landscape_aggregator.get_worker_ids_with_any_of_datasets( - data_model, datasets - ) - ) - return [ - self.worker_landscape_aggregator.get_worker_info(worker_id) - for worker_id in worker_ids - ] diff --git a/exareme2/controller/services/flower/tasks_handler.py b/exareme2/controller/services/flower/tasks_handler.py index 75f053a85..32467f08b 100644 --- a/exareme2/controller/services/flower/tasks_handler.py +++ b/exareme2/controller/services/flower/tasks_handler.py @@ -30,10 +30,14 @@ def worker_data_address(self) -> str: return self._db_address def start_flower_client( - self, algorithm_name, server_address, execution_timeout + self, algorithm_name, server_address, csv_paths, execution_timeout ) -> int: return self._worker_tasks_handler.start_flower_client( - self._request_id, algorithm_name, server_address, execution_timeout + self._request_id, + algorithm_name, + server_address, + csv_paths, + execution_timeout, ).get(timeout=self._tasks_timeout) def start_flower_server( diff --git a/exareme2/controller/services/worker_landscape_aggregator/worker_info_tasks_handler.py b/exareme2/controller/services/worker_landscape_aggregator/worker_info_tasks_handler.py index 9660b2d9e..0f5f3a935 100644 --- a/exareme2/controller/services/worker_landscape_aggregator/worker_info_tasks_handler.py +++ b/exareme2/controller/services/worker_landscape_aggregator/worker_info_tasks_handler.py @@ -4,6 +4,7 @@ from exareme2.controller.celery.tasks_handler import WorkerTasksHandler from exareme2.worker_communication import CommonDataElements from exareme2.worker_communication import DataModelAttributes +from exareme2.worker_communication import DatasetsInfoPerDataModel from exareme2.worker_communication import WorkerInfo @@ -23,10 +24,11 @@ def get_worker_info_task(self) -> WorkerInfo: ).get(self._tasks_timeout) return WorkerInfo.parse_raw(result) - def get_worker_datasets_per_data_model_task(self) -> Dict[str, Dict[str, str]]: - return self._worker_tasks_handler.queue_worker_datasets_per_data_model_task( + def get_worker_datasets_per_data_model_task(self) -> DatasetsInfoPerDataModel: + result = self._worker_tasks_handler.queue_worker_datasets_per_data_model_task( self._request_id ).get(self._tasks_timeout) + return DatasetsInfoPerDataModel.parse_raw(result) def get_data_model_cdes_task(self, data_model: str) -> CommonDataElements: result = self._worker_tasks_handler.queue_data_model_cdes_task( diff --git a/exareme2/controller/services/worker_landscape_aggregator/worker_landscape_aggregator.py b/exareme2/controller/services/worker_landscape_aggregator/worker_landscape_aggregator.py index da12e24cf..41851a9e9 100644 --- a/exareme2/controller/services/worker_landscape_aggregator/worker_landscape_aggregator.py +++ b/exareme2/controller/services/worker_landscape_aggregator/worker_landscape_aggregator.py @@ -2,6 +2,7 @@ import time import traceback from abc import ABC +from collections import defaultdict from logging import Logger from typing import Any from typing import Dict @@ -28,6 +29,8 @@ from exareme2.worker_communication import CommonDataElement from exareme2.worker_communication import CommonDataElements from 
exareme2.worker_communication import DataModelAttributes +from exareme2.worker_communication import DatasetInfo +from exareme2.worker_communication import DatasetsInfoPerDataModel from exareme2.worker_communication import WorkerInfo from exareme2.worker_communication import WorkerRole @@ -68,6 +71,11 @@ class DataModelsAttributes(ImmutableBaseModel): data_models_attributes: Optional[Dict[str, DataModelAttributes]] = {} +class DatasetLocation(ImmutableBaseModel): + worker_id: str + csv_path: str + + class DatasetsLocations(ImmutableBaseModel): """ A dictionary representation of the locations of each dataset in the federation. @@ -75,7 +83,7 @@ class DatasetsLocations(ImmutableBaseModel): Values are Dictionaries of datasets and their locations. """ - datasets_locations: Optional[Dict[str, Dict[str, str]]] = {} + datasets_locations: Optional[Dict[str, Dict[str, DatasetLocation]]] = {} class DataModelRegistry(ImmutableBaseModel): @@ -122,6 +130,28 @@ def dataset_exists(self, data_model: str, dataset: str) -> bool: and dataset in self.datasets_locations.datasets_locations[data_model] ) + def get_csv_paths_per_worker_id( + self, data_model: str, datasets: List[str] + ) -> Dict[str, List[str]]: + if not self.data_model_exists(data_model): + return {} + csv_paths_per_worker_id = {} + dataset_infos = [ + dataset_info + for dataset, dataset_info in self.datasets_locations.datasets_locations[ + data_model + ].items() + if dataset in datasets + ] + for dataset_info in dataset_infos: + if dataset_info.worker_id not in csv_paths_per_worker_id: + csv_paths_per_worker_id[dataset_info.worker_id] = [] + csv_paths_per_worker_id[dataset_info.worker_id].append( + dataset_info.csv_path + ) + + return csv_paths_per_worker_id + def get_worker_ids_with_any_of_datasets( self, data_model: str, datasets: List[str] ) -> List[str]: @@ -129,7 +159,7 @@ def get_worker_ids_with_any_of_datasets( return [] local_workers_with_datasets = [ - self.datasets_locations.datasets_locations[data_model][dataset] + self.datasets_locations.datasets_locations[data_model][dataset].worker_id for dataset in self.datasets_locations.datasets_locations[data_model] if dataset in datasets ] @@ -161,7 +191,7 @@ def get_worker_specific_datasets( for dataset in self.datasets_locations.datasets_locations[data_model] if dataset in wanted_datasets and worker_id - == self.datasets_locations.datasets_locations[data_model][dataset] + == self.datasets_locations.datasets_locations[data_model][dataset].worker_id ] return datasets_in_worker @@ -209,31 +239,12 @@ class Config: arbitrary_types_allowed = True -class DatasetsLabels(ImmutableBaseModel): - """ - A dictionary representation of a dataset's information. - Key values are the names of the datasets. - Values are the labels of the datasets. - """ - - datasets_labels: Dict[str, str] - - -class DatasetsLabelsPerDataModel(ImmutableBaseModel): - """ - Key values are the names of the data_models. - Values are DatasetsLabels. 
- """ - - datasets_labels_per_data_model: Dict[str, DatasetsLabels] - - class DataModelMetadata(ImmutableBaseModel): """ A representation of a data model's Metadata datasets info, cdes and attributes for a specific data model """ - datasets_labels: DatasetsLabels + dataset_infos: List[DatasetInfo] cdes: Optional[CommonDataElements] attributes: Optional[DataModelAttributes] @@ -382,7 +393,7 @@ def _get_workers_info(self, workers_socket_addr: List[str]) -> List[WorkerInfo]: def _get_worker_datasets_per_data_model( self, worker_queue_addr: str, - ) -> Dict[str, Dict[str, str]]: + ) -> DatasetsInfoPerDataModel: tasks_handler = WorkerInfoTasksHandler( worker_queue_addr=worker_queue_addr, tasks_timeout=self._worker_info_tasks_timeout, @@ -507,6 +518,11 @@ def data_model_exists(self, data_model: str) -> bool: def dataset_exists(self, data_model: str, dataset: str) -> bool: return self._registries.data_model_registry.dataset_exists(data_model, dataset) + def get_csv_paths_per_worker_id(self, data_model: str, datasets: List[str]): + return self._registries.data_model_registry.get_csv_paths_per_worker_id( + data_model, datasets + ) + def get_worker_ids_with_any_of_datasets( self, data_model: str, datasets: List[str] ) -> List[str]: @@ -561,7 +577,10 @@ def _get_data_models_metadata_per_worker( ) if datasets_per_data_model: worker_socket_addr = _get_worker_socket_addr(worker_info) - for data_model, datasets in datasets_per_data_model.items(): + for ( + data_model, + dataset_infos, + ) in datasets_per_data_model.dataset_infos_per_data_model.items(): cdes = self._get_worker_cdes(worker_socket_addr, data_model) attributes = self._get_data_model_attributes( worker_socket_addr, data_model @@ -569,7 +588,7 @@ def _get_data_models_metadata_per_worker( cdes = cdes if cdes else None attributes = attributes if attributes else None data_models_metadata[data_model] = DataModelMetadata( - datasets_labels=DatasetsLabels(datasets_labels=datasets), + dataset_infos=dataset_infos, cdes=cdes, attributes=attributes, ) @@ -593,20 +612,19 @@ def _crunch_data_model_registry_data( data_models_metadata_per_worker, incompatible_data_models ) ) - ( - datasets_locations, - datasets_labels_per_data_model, - ) = _aggregate_datasets_locations_and_labels( + + cleaned_data_models_metadata_per_worker = _remove_duplicate_datasets( data_models_metadata_per_worker_with_compatible_data_models, logger ) data_models_cdes = _aggregate_data_models_cdes( - data_models_metadata_per_worker_with_compatible_data_models, - datasets_labels_per_data_model, + cleaned_data_models_metadata_per_worker, ) data_models_attributes = _aggregate_data_models_attributes( - data_models_metadata_per_worker_with_compatible_data_models, + cleaned_data_models_metadata_per_worker, + ) + datasets_locations = _extract_datasets_locations( + cleaned_data_models_metadata_per_worker ) - return DataModelRegistry( data_models_cdes=data_models_cdes, datasets_locations=datasets_locations, @@ -614,73 +632,63 @@ def _crunch_data_model_registry_data( ) -def _aggregate_datasets_locations_and_labels( - data_models_metadata_per_worker, logger -) -> Tuple[DatasetsLocations, DatasetsLabelsPerDataModel]: - """ - Args: - data_models_metadata_per_worker - Returns: - A tuple with: - 1. DatasetsLocations - 2. 
DatasetsLabelsPerDataModel - """ - datasets_locations = {} - datasets_labels = {} +def _remove_duplicate_datasets( + data_models_metadata_per_worker: DataModelsMetadataPerWorker, logger +) -> DataModelsMetadataPerWorker: + dataset_to_workers = defaultdict(lambda: defaultdict(set)) + updated_data_models_metadata_per_worker = {} + + # First pass to identify duplicates and log them for ( - worker_id, + worker, data_models_metadata, ) in data_models_metadata_per_worker.data_models_metadata_per_worker.items(): for ( data_model, - data_model_metadata, + model_metadata, ) in data_models_metadata.data_models_metadata.items(): - current_labels = ( - datasets_labels[data_model].datasets_labels - if data_model in datasets_labels - else {} - ) - current_datasets = ( - datasets_locations[data_model] - if data_model in datasets_locations - else {} - ) - - for ( - dataset_name, - dataset_label, - ) in data_model_metadata.datasets_labels.datasets_labels.items(): - current_labels[dataset_name] = dataset_label - - if dataset_name in current_datasets: - current_datasets[dataset_name].append(worker_id) + for dataset in model_metadata.dataset_infos: + if dataset.code in dataset_to_workers[data_model]: + dataset_to_workers[data_model][dataset.code].add(worker) + _log_duplicated_dataset( + list(dataset_to_workers[data_model][dataset.code]), + data_model, + dataset, + logger, + ) else: - current_datasets[dataset_name] = [worker_id] - - datasets_labels[data_model] = DatasetsLabels(datasets_labels=current_labels) - datasets_locations[data_model] = current_datasets - - datasets_locations_without_duplicates = {} - for data_model, dataset_locations in datasets_locations.items(): - datasets_locations_without_duplicates[data_model] = {} + dataset_to_workers[data_model][dataset.code].add(worker) - for dataset, worker_ids in dataset_locations.items(): - if len(worker_ids) == 1: - datasets_locations_without_duplicates[data_model][dataset] = worker_ids[ - 0 - ] - else: - del datasets_labels[data_model].datasets_labels[dataset] - _log_duplicated_dataset(worker_ids, data_model, dataset, logger) + # Second pass to create new instances without duplicates + for ( + worker, + data_models_metadata, + ) in data_models_metadata_per_worker.data_models_metadata_per_worker.items(): + updated_data_models_metadata = {} + for ( + data_model, + model_metadata, + ) in data_models_metadata.data_models_metadata.items(): + unique_datasets = [] + for dataset in model_metadata.dataset_infos: + if len(dataset_to_workers[data_model][dataset.code]) == 1: + unique_datasets.append(dataset) + updated_data_models_metadata[data_model] = DataModelMetadata( + dataset_infos=unique_datasets, + cdes=model_metadata.cdes, + attributes=model_metadata.attributes, + ) + updated_data_models_metadata_per_worker[worker] = DataModelsMetadata( + data_models_metadata=updated_data_models_metadata + ) - return DatasetsLocations( - datasets_locations=datasets_locations_without_duplicates - ), DatasetsLabelsPerDataModel(datasets_labels_per_data_model=datasets_labels) + return DataModelsMetadataPerWorker( + data_models_metadata_per_worker=updated_data_models_metadata_per_worker + ) def _aggregate_data_models_cdes( data_models_metadata_per_worker: DataModelsMetadataPerWorker, - datasets_labels_per_data_model: DatasetsLabelsPerDataModel, ) -> DataModelsCDES: data_models = {} for ( @@ -698,9 +706,10 @@ def _aggregate_data_models_cdes( label=dataset_cde.label, sql_type=dataset_cde.sql_type, is_categorical=dataset_cde.is_categorical, - 
enumerations=datasets_labels_per_data_model.datasets_labels_per_data_model[ - data_model - ].datasets_labels, + enumerations={ + dataset_info.code: dataset_info.label + for dataset_info in data_model_metadata.dataset_infos + }, min=dataset_cde.min, max=dataset_cde.max, ) @@ -736,6 +745,27 @@ def _aggregate_data_models_attributes( return DataModelsAttributes(data_models_attributes=data_models_attributes) +def _extract_datasets_locations( + data_models_metadata_per_worker: DataModelsMetadataPerWorker, +) -> DatasetsLocations: + datasets_locations_dict = defaultdict(lambda: defaultdict(dict)) + + for ( + worker_id, + data_models_metadata, + ) in data_models_metadata_per_worker.data_models_metadata_per_worker.items(): + for ( + data_model, + model_metadata, + ) in data_models_metadata.data_models_metadata.items(): + for dataset in model_metadata.dataset_infos: + datasets_locations_dict[data_model][dataset.code] = DatasetLocation( + worker_id=worker_id, csv_path=dataset.csv_path + ) + + return DatasetsLocations(datasets_locations=datasets_locations_dict) + + def _get_updated_properties(data_model, data_models_attributes, properties_to_be_added): if data_model not in data_models_attributes: return {key: [value] for key, value in properties_to_be_added.items()} @@ -873,7 +903,7 @@ def _log_datasets_added(old_datasets_locations, new_datasets_locations, logger): data_model, dataset, logger, - new_datasets_locations[data_model][dataset], + new_datasets_locations[data_model][dataset].worker_id, ) @@ -887,7 +917,7 @@ def _log_datasets_removed(old_datasets_locations, new_datasets_locations, logger data_model, dataset, logger, - old_datasets_locations[data_model][dataset], + old_datasets_locations[data_model][dataset].worker_id, ) diff --git a/exareme2/worker/exareme2/views/views_service.py b/exareme2/worker/exareme2/views/views_service.py index b13ecb88b..7f1a75379 100644 --- a/exareme2/worker/exareme2/views/views_service.py +++ b/exareme2/worker/exareme2/views/views_service.py @@ -6,9 +6,7 @@ from exareme2.worker.exareme2.views import views_db from exareme2.worker.utils.logger import initialise_logger from exareme2.worker.worker_info.worker_info_db import get_data_models -from exareme2.worker.worker_info.worker_info_db import ( - get_dataset_code_per_dataset_label, -) +from exareme2.worker.worker_info.worker_info_db import get_dataset_infos from exareme2.worker_communication import DataModelUnavailable from exareme2.worker_communication import DatasetUnavailable from exareme2.worker_communication import TableInfo @@ -184,7 +182,8 @@ def _validate_data_model_and_datasets_exist(data_model: str, datasets: List[str] if data_model not in get_data_models(): raise DataModelUnavailable(worker_config.identifier, data_model) - available_datasets = get_dataset_code_per_dataset_label(data_model) + available_dataset_infos = get_dataset_infos(data_model) + available_datasets = [dataset_info.code for dataset_info in available_dataset_infos] for dataset in datasets: if dataset not in available_datasets: raise DatasetUnavailable(worker_config.identifier, dataset) diff --git a/exareme2/worker/flower/starter/starter_api.py b/exareme2/worker/flower/starter/starter_api.py index fe710b6db..e2878a5bd 100644 --- a/exareme2/worker/flower/starter/starter_api.py +++ b/exareme2/worker/flower/starter/starter_api.py @@ -5,10 +5,10 @@ @shared_task def start_flower_client( - request_id: str, algorithm_name, server_address, execution_timeout + request_id: str, algorithm_name, server_address, csv_paths, execution_timeout ) -> int: 
     return starter_service.start_flower_client(
-        request_id, algorithm_name, server_address, execution_timeout
+        request_id, algorithm_name, server_address, csv_paths, execution_timeout
     )
 
 
diff --git a/exareme2/worker/flower/starter/starter_service.py b/exareme2/worker/flower/starter/starter_service.py
index 387a94235..821ad8d3b 100644
--- a/exareme2/worker/flower/starter/starter_service.py
+++ b/exareme2/worker/flower/starter/starter_service.py
@@ -6,7 +6,7 @@
 
 @initialise_logger
 def start_flower_client(
-    request_id: str, algorithm_name, server_address, execution_timeout
+    request_id: str, algorithm_name, server_address, csv_paths, execution_timeout
 ) -> int:
     env_vars = {
         "MONETDB_IP": worker_config.monetdb.ip,
@@ -22,6 +22,7 @@ def start_flower_client(
         "CONTROLLER_IP": worker_config.controller.ip,
         "CONTROLLER_PORT": worker_config.controller.port,
         "DATA_PATH": worker_config.data_path,
+        "CSV_PATHS": ",".join(csv_paths),
         "TIMEOUT": execution_timeout,
     }
     process = FlowerProcess(f"{algorithm_name}/client.py", env_vars=env_vars)
diff --git a/exareme2/worker/worker_info/worker_info_api.py b/exareme2/worker/worker_info/worker_info_api.py
index 50ef98e48..c646e8af8 100644
--- a/exareme2/worker/worker_info/worker_info_api.py
+++ b/exareme2/worker/worker_info/worker_info_api.py
@@ -1,5 +1,3 @@
-from typing import Dict
-
 from celery import shared_task
 
 from exareme2.worker.worker_info import worker_info_service
@@ -11,8 +9,8 @@ def get_worker_info(request_id: str) -> str:
 
 
 @shared_task
-def get_worker_datasets_per_data_model(request_id: str) -> Dict[str, Dict[str, str]]:
-    return worker_info_service.get_worker_datasets_per_data_model(request_id)
+def get_worker_datasets_per_data_model(request_id: str) -> str:
+    return worker_info_service.get_worker_datasets_per_data_model(request_id).json()
 
 
 @shared_task
diff --git a/exareme2/worker/worker_info/worker_info_db.py b/exareme2/worker/worker_info/worker_info_db.py
index 1c25904a8..947e2de8f 100644
--- a/exareme2/worker/worker_info/worker_info_db.py
+++ b/exareme2/worker/worker_info/worker_info_db.py
@@ -8,6 +8,7 @@
 from exareme2.worker_communication import CommonDataElement
 from exareme2.worker_communication import CommonDataElements
 from exareme2.worker_communication import DataModelAttributes
+from exareme2.worker_communication import DatasetInfo
 
 HEALTHCHECK_VALIDATION_STRING = "HEALTHCHECK"
 
@@ -35,9 +36,9 @@ def get_data_models() -> List[str]:
 
 
 @sql_injection_guard(data_model=is_datamodel)
-def get_dataset_code_per_dataset_label(data_model: str) -> Dict[str, str]:
+def get_dataset_infos(data_model: str) -> List[DatasetInfo]:
     """
-    Retrieves the enabled key-value pair of code and label, for a specific data_model.
+    Retrieves the enabled datasets for a specific data_model.
     Returns
     ------
 
     datasets_rows = sqlite.execute_and_fetchall(
         f"""
-        SELECT code, label
+        SELECT code, label, csv_path
         FROM datasets
         WHERE data_model_id =
         (
@@ -60,8 +61,9 @@ def get_dataset_code_per_dataset_label(data_model: str) -> Dict[str, str]:
         AND status = 'ENABLED'
         """
     )
-    datasets = {code: label for code, label in datasets_rows}
-    return datasets
+    return [
+        DatasetInfo(code=row[0], label=row[1], csv_path=row[2]) for row in datasets_rows
+    ]
 
 
 @sql_injection_guard(data_model=is_datamodel)
diff --git a/exareme2/worker/worker_info/worker_info_service.py b/exareme2/worker/worker_info/worker_info_service.py
index 6dd03a11c..da8b21047 100644
--- a/exareme2/worker/worker_info/worker_info_service.py
+++ b/exareme2/worker/worker_info/worker_info_service.py
@@ -1,15 +1,12 @@
-from typing import Dict
-
 from exareme2.worker import config as worker_config
 from exareme2.worker.utils.logger import initialise_logger
 from exareme2.worker.worker_info import worker_info_db
 from exareme2.worker.worker_info.worker_info_db import check_database_connection
 from exareme2.worker.worker_info.worker_info_db import get_data_models
-from exareme2.worker.worker_info.worker_info_db import (
-    get_dataset_code_per_dataset_label,
-)
+from exareme2.worker.worker_info.worker_info_db import get_dataset_infos
 from exareme2.worker_communication import CommonDataElements
 from exareme2.worker_communication import DataModelAttributes
+from exareme2.worker_communication import DatasetsInfoPerDataModel
 from exareme2.worker_communication import WorkerInfo
 
 
@@ -33,7 +30,7 @@ def get_worker_info(request_id: str) -> WorkerInfo:
 
 
 @initialise_logger
-def get_worker_datasets_per_data_model(request_id: str) -> Dict[str, Dict[str, str]]:
+def get_worker_datasets_per_data_model(request_id: str) -> DatasetsInfoPerDataModel:
     """
     Parameters
     ----------
     request_id : str
         The identifier for the logging
     Returns
     ------
-    Dict[str, Dict[str, str]]
-        A dictionary with key data model and value a list of pairs (dataset code and dataset label)
+    DatasetsInfoPerDataModel
+        A dictionary with key data model and value the list of DatasetInfo (code, label, csv_path) for its datasets
     """
-    return {
-        data_model: get_dataset_code_per_dataset_label(data_model)
-        for data_model in get_data_models()
-    }
+    return DatasetsInfoPerDataModel(
+        datasets_info_per_data_model={
+            data_model: get_dataset_infos(data_model)
+            for data_model in get_data_models()
+        }
+    )
 
 
 @initialise_logger
diff --git a/exareme2/worker_communication.py b/exareme2/worker_communication.py
index 652c01bb7..3db2418cf 100644
--- a/exareme2/worker_communication.py
+++ b/exareme2/worker_communication.py
@@ -258,6 +258,16 @@ class DataModelAttributes(ImmutableBaseModel):
     properties: Dict
 
 
+class DatasetInfo(ImmutableBaseModel):
+    code: str
+    label: str
+    csv_path: str
+
+
+class DatasetsInfoPerDataModel(ImmutableBaseModel):
+    datasets_info_per_data_model: Dict[str, List[DatasetInfo]]
+
+
 class CommonDataElement(ImmutableBaseModel):
     code: str
     label: str
diff --git a/tests/standalone_tests/algorithms/flower/test_flower_data_processing.py b/tests/standalone_tests/algorithms/flower/test_inputdata_preprocessing.py
similarity index 88%
rename from tests/standalone_tests/algorithms/flower/test_flower_data_processing.py
rename to tests/standalone_tests/algorithms/flower/test_inputdata_preprocessing.py
index 
5798e9bf9..2c22f4330 100644 --- a/tests/standalone_tests/algorithms/flower/test_flower_data_processing.py +++ b/tests/standalone_tests/algorithms/flower/test_inputdata_preprocessing.py @@ -5,12 +5,12 @@ from pydantic import ValidationError -from exareme2.algorithms.flower.flower_data_processing import HEADERS -from exareme2.algorithms.flower.flower_data_processing import RESULT_URL -from exareme2.algorithms.flower.flower_data_processing import error_handling -from exareme2.algorithms.flower.flower_data_processing import get_enumerations -from exareme2.algorithms.flower.flower_data_processing import get_input -from exareme2.algorithms.flower.flower_data_processing import post_result +from exareme2.algorithms.flower.inputdata_preprocessing import HEADERS +from exareme2.algorithms.flower.inputdata_preprocessing import RESULT_URL +from exareme2.algorithms.flower.inputdata_preprocessing import error_handling +from exareme2.algorithms.flower.inputdata_preprocessing import get_enumerations +from exareme2.algorithms.flower.inputdata_preprocessing import get_input +from exareme2.algorithms.flower.inputdata_preprocessing import post_result class TestAPIMethods(unittest.TestCase): diff --git a/tests/standalone_tests/controller/services/api/test_validate_algorithm_request.py b/tests/standalone_tests/controller/services/api/test_validate_algorithm_request.py index 35a31f844..53dd25416 100644 --- a/tests/standalone_tests/controller/services/api/test_validate_algorithm_request.py +++ b/tests/standalone_tests/controller/services/api/test_validate_algorithm_request.py @@ -27,6 +27,9 @@ from exareme2.controller.services.worker_landscape_aggregator.worker_landscape_aggregator import ( DataModelsCDES, ) +from exareme2.controller.services.worker_landscape_aggregator.worker_landscape_aggregator import ( + DatasetLocation, +) from exareme2.controller.services.worker_landscape_aggregator.worker_landscape_aggregator import ( DatasetsLocations, ) @@ -115,10 +118,21 @@ def worker_landscape_aggregator(): datasets_locations=DatasetsLocations( datasets_locations={ "data_model_with_all_cde_types:0.1": { - "sample_dataset1": "sample_worker", - "sample_dataset2": "sample_worker", + "sample_dataset1": DatasetLocation( + worker_id="sample_worker", + csv_path="/opt/data/sample_dataset1.csv", + ), + "sample_dataset2": DatasetLocation( + worker_id="sample_worker", + csv_path="/opt/data/sample_dataset2.csv", + ), + }, + "sample_data_model:0.1": { + "sample_dataset": DatasetLocation( + worker_id="sample_worker", + csv_path="/opt/data/sample_dataset.csv", + ) }, - "sample_data_model:0.1": {"sample_dataset": "sample_worker"}, } ), ) diff --git a/tests/standalone_tests/controller/services/worker_landscape_agreggator/test_data_model_registry.py b/tests/standalone_tests/controller/services/worker_landscape_agreggator/test_data_model_registry.py index 3ab40eadc..d46221102 100644 --- a/tests/standalone_tests/controller/services/worker_landscape_agreggator/test_data_model_registry.py +++ b/tests/standalone_tests/controller/services/worker_landscape_agreggator/test_data_model_registry.py @@ -10,6 +10,9 @@ from exareme2.controller.services.worker_landscape_aggregator.worker_landscape_aggregator import ( DataModelsCDES, ) +from exareme2.controller.services.worker_landscape_aggregator.worker_landscape_aggregator import ( + DatasetLocation, +) from exareme2.controller.services.worker_landscape_aggregator.worker_landscape_aggregator import ( DatasetsLocations, ) @@ -23,20 +26,44 @@ def mocked_datasets_locations(): yield DatasetsLocations( 
datasets_locations={ "tbi:0.1": { - "dummy_tbi0": "localworker1", - "dummy_tbi1": "localworker2", - "dummy_tbi3": "localworker2", + "dummy_tbi0": DatasetLocation( + worker_id="localworker1", csv_path="/opt/data/dummy_tbi0.csv" + ), + "dummy_tbi1": DatasetLocation( + worker_id="localworker2", csv_path="/opt/data/dummy_tbi1.csv" + ), + "dummy_tbi3": DatasetLocation( + worker_id="localworker2", csv_path="/opt/data/dummy_tbi3.csv" + ), }, "dementia:0.1": { - "ppmi0": "localworker1", - "ppmi1": "localworker2", - "ppmi3": "localworker2", - "edsd0": "localworker1", - "edsd1": "localworker2", - "edsd3": "localworker2", - "desd-synthdata0": "localworker1", - "desd-synthdata1": "localworker2", - "desd-synthdata3": "localworker2", + "ppmi0": DatasetLocation( + worker_id="localworker1", csv_path="/opt/data/ppmi0.csv" + ), + "ppmi1": DatasetLocation( + worker_id="localworker2", csv_path="/opt/data/ppmi1.csv" + ), + "ppmi3": DatasetLocation( + worker_id="localworker2", csv_path="/opt/data/ppmi3.csv" + ), + "edsd0": DatasetLocation( + worker_id="localworker1", csv_path="/opt/data/edsd0.csv" + ), + "edsd1": DatasetLocation( + worker_id="localworker2", csv_path="/opt/data/edsd1.csv" + ), + "edsd3": DatasetLocation( + worker_id="localworker2", csv_path="/opt/data/edsd3.csv" + ), + "synthdata0": DatasetLocation( + worker_id="localworker1", csv_path="/opt/data/synthdata0.csv" + ), + "synthdata1": DatasetLocation( + worker_id="localworker2", csv_path="/opt/data/synthdata1.csv" + ), + "synthdata2": DatasetLocation( + worker_id="localworker2", csv_path="/opt/data/synthdata2.csv" + ), }, } ) diff --git a/tests/standalone_tests/controller/services/worker_landscape_agreggator/test_federation_info_script.py b/tests/standalone_tests/controller/services/worker_landscape_agreggator/test_federation_info_script.py index 553e4e117..adc6bfae4 100644 --- a/tests/standalone_tests/controller/services/worker_landscape_agreggator/test_federation_info_script.py +++ b/tests/standalone_tests/controller/services/worker_landscape_agreggator/test_federation_info_script.py @@ -7,6 +7,9 @@ from exareme2 import AttrDict from exareme2.controller.federation_info_logs import log_experiment_execution from exareme2.controller.logger import init_logger +from exareme2.controller.services.worker_landscape_aggregator.worker_landscape_aggregator import ( + DatasetLocation, +) from exareme2.controller.services.worker_landscape_aggregator.worker_landscape_aggregator import ( _log_data_model_changes, ) @@ -73,8 +76,20 @@ def test_show_controller_audit_entries(patch_controller_logger_config, capsys): logger=logger, ) _log_dataset_changes( - old_datasets_locations={"dementia:0.1": {"edsd": "localworker1"}}, - new_datasets_locations={"tbi:0.1": {"dummy_tbi": "localworker2"}}, + old_datasets_locations={ + "dementia:0.1": { + "edsd": DatasetLocation( + worker_id="localworker1", csv_path="/opt/data/edsd.csv" + ) + } + }, + new_datasets_locations={ + "tbi:0.1": { + "dummy_tbi": DatasetLocation( + worker_id="localworker2", csv_path="/opt/data/dummy_tbi.csv" + ) + } + }, logger=logger, ) log_experiment_execution( diff --git a/tests/standalone_tests/controller/services/worker_landscape_agreggator/test_worker_landscape_aggregator.py b/tests/standalone_tests/controller/services/worker_landscape_agreggator/test_worker_landscape_aggregator.py index 8d4039e2b..cfe4f5924 100644 --- a/tests/standalone_tests/controller/services/worker_landscape_agreggator/test_worker_landscape_aggregator.py +++ 
b/tests/standalone_tests/controller/services/worker_landscape_agreggator/test_worker_landscape_aggregator.py @@ -21,7 +21,7 @@ DataModelsMetadataPerWorker, ) from exareme2.controller.services.worker_landscape_aggregator.worker_landscape_aggregator import ( - DatasetsLabels, + DatasetLocation, ) from exareme2.controller.services.worker_landscape_aggregator.worker_landscape_aggregator import ( DatasetsLocations, @@ -35,6 +35,7 @@ from exareme2.worker_communication import CommonDataElement from exareme2.worker_communication import CommonDataElements from exareme2.worker_communication import DataModelAttributes +from exareme2.worker_communication import DatasetInfo from tests.standalone_tests.conftest import RABBITMQ_LOCALWORKERTMP_ADDR @@ -80,12 +81,18 @@ def get_parametrization_cases(): "localworker1": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -108,11 +115,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ) + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -138,12 +147,18 @@ def get_parametrization_cases(): "localworker2": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset3": "DATASET3", - "dataset4": "DATASET4", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset3", + label="DATASET3", + csv_path="/opt/data/dataset3.csv", + ), + DatasetInfo( + code="dataset4", + label="DATASET4", + csv_path="/opt/data/dataset4.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -170,12 +185,18 @@ def get_parametrization_cases(): "localworker3": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset5": "DATASET5", - "dataset6": "DATASET6", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset5", + label="DATASET5", + csv_path="/opt/data/dataset5.csv", + ), + DatasetInfo( + code="dataset6", + label="DATASET6", + csv_path="/opt/data/dataset6.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -198,11 +219,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -290,16 +313,40 @@ def get_parametrization_cases(): datasets_locations=DatasetsLocations( datasets_locations={ "data_model:1": { - "dataset1": "localworker1", - "dataset2": "localworker1", - "dataset3": "localworker2", - "dataset4": "localworker2", - "dataset5": "localworker3", - "dataset6": "localworker3", + "dataset1": DatasetLocation( + worker_id="localworker1", + csv_path="/opt/data/dataset1.csv", + ), + "dataset2": DatasetLocation( + 
worker_id="localworker1", + csv_path="/opt/data/dataset2.csv", + ), + "dataset3": DatasetLocation( + worker_id="localworker2", + csv_path="/opt/data/dataset3.csv", + ), + "dataset4": DatasetLocation( + worker_id="localworker2", + csv_path="/opt/data/dataset4.csv", + ), + "dataset5": DatasetLocation( + worker_id="localworker3", + csv_path="/opt/data/dataset5.csv", + ), + "dataset6": DatasetLocation( + worker_id="localworker3", + csv_path="/opt/data/dataset6.csv", + ), }, "data_model:2": { - "dataset1": "localworker1", - "dataset2": "localworker3", + "dataset1": DatasetLocation( + worker_id="localworker1", + csv_path="/opt/data/dataset1.csv", + ), + "dataset2": DatasetLocation( + worker_id="localworker3", + csv_path="/opt/data/dataset2.csv", + ), }, } ), @@ -312,12 +359,18 @@ def get_parametrization_cases(): "localworker1": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=None, attributes=DataModelAttributes( tags=[], @@ -325,11 +378,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ) + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -355,12 +410,18 @@ def get_parametrization_cases(): "localworker2": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset3": "DATASET3", - "dataset4": "DATASET4", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset3", + label="DATASET3", + csv_path="/opt/data/dataset3.csv", + ), + DatasetInfo( + code="dataset4", + label="DATASET4", + csv_path="/opt/data/dataset4.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -387,12 +448,18 @@ def get_parametrization_cases(): "localworker3": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset5": "DATASET5", - "dataset6": "DATASET6", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset5", + label="DATASET5", + csv_path="/opt/data/dataset5.csv", + ), + DatasetInfo( + code="dataset6", + label="DATASET6", + csv_path="/opt/data/dataset6.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -415,11 +482,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -492,14 +561,32 @@ def get_parametrization_cases(): datasets_locations=DatasetsLocations( datasets_locations={ "data_model:1": { - "dataset3": "localworker2", - "dataset4": "localworker2", - "dataset5": "localworker3", - "dataset6": "localworker3", + "dataset3": DatasetLocation( + worker_id="localworker2", + csv_path="/opt/data/dataset3.csv", + ), + "dataset4": DatasetLocation( + worker_id="localworker2", + 
csv_path="/opt/data/dataset4.csv", + ), + "dataset5": DatasetLocation( + worker_id="localworker3", + csv_path="/opt/data/dataset5.csv", + ), + "dataset6": DatasetLocation( + worker_id="localworker3", + csv_path="/opt/data/dataset6.csv", + ), }, "data_model:2": { - "dataset1": "localworker1", - "dataset2": "localworker3", + "dataset1": DatasetLocation( + worker_id="localworker1", + csv_path="/opt/data/dataset1.csv", + ), + "dataset2": DatasetLocation( + worker_id="localworker3", + csv_path="/opt/data/dataset2.csv", + ), }, } ), @@ -512,12 +599,18 @@ def get_parametrization_cases(): "localworker1": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -549,11 +642,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ) + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -579,12 +674,18 @@ def get_parametrization_cases(): "localworker2": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset3": "DATASET3", - "dataset4": "DATASET4", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset3", + label="DATASET3", + csv_path="/opt/data/dataset3.csv", + ), + DatasetInfo( + code="dataset4", + label="DATASET4", + csv_path="/opt/data/dataset4.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -620,12 +721,18 @@ def get_parametrization_cases(): "localworker3": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset5": "DATASET5", - "dataset6": "DATASET6", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset5", + label="DATASET5", + csv_path="/opt/data/dataset5.csv", + ), + DatasetInfo( + code="dataset6", + label="DATASET6", + csv_path="/opt/data/dataset6.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -657,11 +764,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ) + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -715,8 +824,14 @@ def get_parametrization_cases(): datasets_locations=DatasetsLocations( datasets_locations={ "data_model:2": { - "dataset1": "localworker1", - "dataset2": "localworker3", + "dataset1": DatasetLocation( + worker_id="localworker1", + csv_path="/opt/data/dataset1.csv", + ), + "dataset2": DatasetLocation( + worker_id="localworker3", + csv_path="/opt/data/dataset2.csv", + ), }, } ), @@ -729,7 +844,7 @@ def get_parametrization_cases(): "localworker1": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels(datasets_labels={}), + dataset_infos=[], cdes=CommonDataElements( values={ 
"dataset": CommonDataElement( @@ -752,7 +867,7 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels(datasets_labels={}), + dataset_infos=[], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -778,7 +893,7 @@ def get_parametrization_cases(): "localworker2": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels(datasets_labels={}), + dataset_infos=[], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -805,7 +920,7 @@ def get_parametrization_cases(): "localworker3": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels(datasets_labels={}), + dataset_infos=[], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -828,7 +943,7 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels(datasets_labels={}), + dataset_infos=[], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -890,12 +1005,7 @@ def get_parametrization_cases(): ), } ), - datasets_locations=DatasetsLocations( - datasets_locations={ - "data_model:1": {}, - "data_model:2": {}, - } - ), + datasets_locations=DatasetsLocations(datasets_locations={}), ), id="no_data_model_or_dataset_case", ), @@ -919,12 +1029,18 @@ def get_parametrization_cases(): "localworker1": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -947,11 +1063,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -977,12 +1095,18 @@ def get_parametrization_cases(): "localworker2": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1005,11 +1129,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1035,12 +1161,18 @@ def get_parametrization_cases(): "localworker3": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), 
+ DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1063,11 +1195,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1093,12 +1227,18 @@ def get_parametrization_cases(): "localworker4": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1121,11 +1261,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1187,9 +1329,7 @@ def get_parametrization_cases(): ), } ), - datasets_locations=DatasetsLocations( - datasets_locations={"data_model:1": {}, "data_model:2": {}} - ), + datasets_locations=DatasetsLocations(datasets_locations={}), ), id="same_data_models_and_datasets_on_all_workers", ), @@ -1199,12 +1339,18 @@ def get_parametrization_cases(): "localworker1": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1227,11 +1373,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1257,12 +1405,18 @@ def get_parametrization_cases(): "localworker2": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset3": "DATASET3", - "dataset4": "DATASET4", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset3", + label="DATASET3", + csv_path="/opt/data/dataset3.csv", + ), + DatasetInfo( + code="dataset4", + label="DATASET4", + csv_path="/opt/data/dataset4.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1289,12 +1443,18 @@ def get_parametrization_cases(): "localworker3": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset5": "DATASET5", - "dataset6": "DATASET6", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset5", + label="DATASET5", + 
csv_path="/opt/data/dataset5.csv", + ), + DatasetInfo( + code="dataset6", + label="DATASET6", + csv_path="/opt/data/dataset6.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1317,11 +1477,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1393,14 +1555,31 @@ def get_parametrization_cases(): datasets_locations=DatasetsLocations( datasets_locations={ "data_model:1": { - "dataset1": "localworker1", - "dataset2": "localworker1", - "dataset3": "localworker2", - "dataset4": "localworker2", - "dataset5": "localworker3", - "dataset6": "localworker3", + "dataset1": DatasetLocation( + worker_id="localworker1", + csv_path="/opt/data/dataset1.csv", + ), + "dataset2": DatasetLocation( + worker_id="localworker1", + csv_path="/opt/data/dataset2.csv", + ), + "dataset3": DatasetLocation( + worker_id="localworker2", + csv_path="/opt/data/dataset3.csv", + ), + "dataset4": DatasetLocation( + worker_id="localworker2", + csv_path="/opt/data/dataset4.csv", + ), + "dataset5": DatasetLocation( + worker_id="localworker3", + csv_path="/opt/data/dataset5.csv", + ), + "dataset6": DatasetLocation( + worker_id="localworker3", + csv_path="/opt/data/dataset6.csv", + ), }, - "data_model:2": {}, } ), ), @@ -1412,12 +1591,18 @@ def get_parametrization_cases(): "localworker1": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1449,11 +1634,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1479,12 +1666,18 @@ def get_parametrization_cases(): "localworker2": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset3": "DATASET3", - "dataset4": "DATASET4", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset3", + label="DATASET3", + csv_path="/opt/data/dataset3.csv", + ), + DatasetInfo( + code="dataset4", + label="DATASET4", + csv_path="/opt/data/dataset4.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1511,12 +1704,18 @@ def get_parametrization_cases(): "localworker3": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset5": "DATASET5", - "dataset6": "DATASET6", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset5", + label="DATASET5", + csv_path="/opt/data/dataset5.csv", + ), + DatasetInfo( + code="dataset6", + label="DATASET6", + csv_path="/opt/data/dataset6.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1539,11 +1738,13 
@@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1597,8 +1798,14 @@ def get_parametrization_cases(): datasets_locations=DatasetsLocations( datasets_locations={ "data_model:2": { - "dataset1": "localworker1", - "dataset2": "localworker3", + "dataset1": DatasetLocation( + worker_id="localworker1", + csv_path="/opt/data/dataset1.csv", + ), + "dataset2": DatasetLocation( + worker_id="localworker3", + csv_path="/opt/data/dataset2.csv", + ), }, } ), @@ -1611,12 +1818,18 @@ def get_parametrization_cases(): "localworker1": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1648,11 +1861,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1678,12 +1893,18 @@ def get_parametrization_cases(): "localworker2": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset3": "DATASET3", - "dataset4": "DATASET4", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset3", + label="DATASET3", + csv_path="/opt/data/dataset3.csv", + ), + DatasetInfo( + code="dataset4", + label="DATASET4", + csv_path="/opt/data/dataset4.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1710,12 +1931,18 @@ def get_parametrization_cases(): "localworker3": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset5": "DATASET5", - "dataset6": "DATASET6", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset5", + label="DATASET5", + csv_path="/opt/data/dataset5.csv", + ), + DatasetInfo( + code="dataset6", + label="DATASET6", + csv_path="/opt/data/dataset6.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1738,11 +1965,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1796,8 +2025,14 @@ def get_parametrization_cases(): datasets_locations=DatasetsLocations( datasets_locations={ "data_model:2": { - "dataset1": "localworker1", - "dataset2": "localworker3", + "dataset1": DatasetLocation( + worker_id="localworker1", + csv_path="/opt/data/dataset1.csv", + ), + "dataset2": DatasetLocation( + worker_id="localworker3", + csv_path="/opt/data/dataset2.csv", + ), }, } ), @@ -1810,12 +2045,18 @@ 
def get_parametrization_cases(): "localworker1": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1841,11 +2082,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1871,12 +2114,18 @@ def get_parametrization_cases(): "localworker2": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset3": "DATASET3", - "dataset4": "DATASET4", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset3", + label="DATASET3", + csv_path="/opt/data/dataset3.csv", + ), + DatasetInfo( + code="dataset4", + label="DATASET4", + csv_path="/opt/data/dataset4.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1906,12 +2155,18 @@ def get_parametrization_cases(): "localworker3": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset5": "DATASET5", - "dataset6": "DATASET6", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset5", + label="DATASET5", + csv_path="/opt/data/dataset5.csv", + ), + DatasetInfo( + code="dataset6", + label="DATASET6", + csv_path="/opt/data/dataset6.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -1934,11 +2189,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -2027,16 +2284,40 @@ def get_parametrization_cases(): datasets_locations=DatasetsLocations( datasets_locations={ "data_model:1": { - "dataset1": "localworker1", - "dataset2": "localworker1", - "dataset3": "localworker2", - "dataset4": "localworker2", - "dataset5": "localworker3", - "dataset6": "localworker3", + "dataset1": DatasetLocation( + worker_id="localworker1", + csv_path="/opt/data/dataset1.csv", + ), + "dataset2": DatasetLocation( + worker_id="localworker1", + csv_path="/opt/data/dataset2.csv", + ), + "dataset3": DatasetLocation( + worker_id="localworker2", + csv_path="/opt/data/dataset3.csv", + ), + "dataset4": DatasetLocation( + worker_id="localworker2", + csv_path="/opt/data/dataset4.csv", + ), + "dataset5": DatasetLocation( + worker_id="localworker3", + csv_path="/opt/data/dataset5.csv", + ), + "dataset6": DatasetLocation( + worker_id="localworker3", + csv_path="/opt/data/dataset6.csv", + ), }, "data_model:2": { - "dataset1": "localworker1", - "dataset2": "localworker3", + "dataset1": DatasetLocation( + worker_id="localworker1", + csv_path="/opt/data/dataset1.csv", + ), + "dataset2": DatasetLocation( + worker_id="localworker3", + csv_path="/opt/data/dataset2.csv", + ), 
}, } ), @@ -2049,12 +2330,18 @@ def get_parametrization_cases(): "localworker1": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -2080,11 +2367,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -2110,12 +2399,18 @@ def get_parametrization_cases(): "localworker2": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset3": "DATASET3", - "dataset4": "DATASET4", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset3", + label="DATASET3", + csv_path="/opt/data/dataset3.csv", + ), + DatasetInfo( + code="dataset4", + label="DATASET4", + csv_path="/opt/data/dataset4.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -2145,12 +2440,18 @@ def get_parametrization_cases(): "localworker3": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset5": "DATASET5", - "dataset6": "DATASET6", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset5", + label="DATASET5", + csv_path="/opt/data/dataset5.csv", + ), + DatasetInfo( + code="dataset6", + label="DATASET6", + csv_path="/opt/data/dataset6.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -2173,11 +2474,13 @@ def get_parametrization_cases(): ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -2266,16 +2569,40 @@ def get_parametrization_cases(): datasets_locations=DatasetsLocations( datasets_locations={ "data_model:1": { - "dataset1": "localworker1", - "dataset2": "localworker1", - "dataset3": "localworker2", - "dataset4": "localworker2", - "dataset5": "localworker3", - "dataset6": "localworker3", + "dataset1": DatasetLocation( + worker_id="localworker1", + csv_path="/opt/data/dataset1.csv", + ), + "dataset2": DatasetLocation( + worker_id="localworker1", + csv_path="/opt/data/dataset2.csv", + ), + "dataset3": DatasetLocation( + worker_id="localworker2", + csv_path="/opt/data/dataset3.csv", + ), + "dataset4": DatasetLocation( + worker_id="localworker2", + csv_path="/opt/data/dataset4.csv", + ), + "dataset5": DatasetLocation( + worker_id="localworker3", + csv_path="/opt/data/dataset5.csv", + ), + "dataset6": DatasetLocation( + worker_id="localworker3", + csv_path="/opt/data/dataset6.csv", + ), }, "data_model:2": { - "dataset1": "localworker1", - "dataset2": "localworker3", + "dataset1": DatasetLocation( + worker_id="localworker1", + csv_path="/opt/data/dataset1.csv", + ), + "dataset2": DatasetLocation( + worker_id="localworker3", + 
csv_path="/opt/data/dataset2.csv", + ), }, } ), @@ -2301,6 +2628,12 @@ def test_data_model_registry( dmr.data_models_cdes.data_models_cdes == expected.data_models_cdes.data_models_cdes ) + + print("------------------------------------------------------------------\n") + print(dmr.datasets_locations.datasets_locations) + print("----------------------") + print(expected.datasets_locations.datasets_locations) + assert ( dmr.datasets_locations.datasets_locations == expected.datasets_locations.datasets_locations @@ -2317,12 +2650,18 @@ def test_data_model_registry_missing_data_model_attributes(worker_landscape_aggr "localworker1": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -2342,11 +2681,13 @@ def test_data_model_registry_missing_data_model_attributes(worker_landscape_aggr attributes=None, ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset1": "DATASET1", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset1", + label="DATASET1", + csv_path="/opt/data/dataset1.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -2372,12 +2713,18 @@ def test_data_model_registry_missing_data_model_attributes(worker_landscape_aggr "localworker2": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset3": "DATASET3", - "dataset4": "DATASET4", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset3", + label="DATASET3", + csv_path="/opt/data/dataset3.csv", + ), + DatasetInfo( + code="dataset4", + label="DATASET4", + csv_path="/opt/data/dataset4.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -2404,12 +2751,18 @@ def test_data_model_registry_missing_data_model_attributes(worker_landscape_aggr "localworker3": DataModelsMetadata( data_models_metadata={ "data_model:1": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset5": "DATASET5", - "dataset6": "DATASET6", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset5", + label="DATASET5", + csv_path="/opt/data/dataset5.csv", + ), + DatasetInfo( + code="dataset6", + label="DATASET6", + csv_path="/opt/data/dataset6.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -2432,11 +2785,13 @@ def test_data_model_registry_missing_data_model_attributes(worker_landscape_aggr ), ), "data_model:2": DataModelMetadata( - datasets_labels=DatasetsLabels( - datasets_labels={ - "dataset2": "DATASET2", - } - ), + dataset_infos=[ + DatasetInfo( + code="dataset2", + label="DATASET2", + csv_path="/opt/data/dataset2.csv", + ), + ], cdes=CommonDataElements( values={ "dataset": CommonDataElement( @@ -2523,14 +2878,26 @@ def test_data_model_registry_missing_data_model_attributes(worker_landscape_aggr datasets_locations=DatasetsLocations( datasets_locations={ "data_model:1": { - "dataset3": "localworker2", - "dataset4": "localworker2", - "dataset5": "localworker3", - "dataset6": "localworker3", + "dataset3": DatasetLocation( + worker_id="localworker2", csv_path="/opt/data/dataset3.csv" + ), + 
"dataset4": DatasetLocation( + worker_id="localworker2", csv_path="/opt/data/dataset4.csv" + ), + "dataset5": DatasetLocation( + worker_id="localworker3", csv_path="/opt/data/dataset5.csv" + ), + "dataset6": DatasetLocation( + worker_id="localworker3", csv_path="/opt/data/dataset6.csv" + ), }, "data_model:2": { - "dataset1": "localworker1", - "dataset2": "localworker3", + "dataset1": DatasetLocation( + worker_id="localworker1", csv_path="/opt/data/dataset1.csv" + ), + "dataset2": DatasetLocation( + worker_id="localworker3", csv_path="/opt/data/dataset2.csv" + ), }, } ),