Added test datasets.
Exclude this test data from the Exareme2 flow.
Kostas Filippopolitis committed Jul 15, 2024
1 parent 847826a commit cab4379
Showing 15 changed files with 593 additions and 17 deletions.
@@ -167,6 +167,8 @@ def get_worker_ids_with_any_of_datasets(
self.datasets_locations.datasets_locations[data_model][dataset].worker_id
for dataset in self.datasets_locations.datasets_locations[data_model]
if dataset in datasets
if self.datasets_locations.datasets_locations[data_model][dataset].worker_id
!= "globalworker"
]
return list(set(local_workers_with_datasets))
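The added guard means a dataset hosted on the global worker no longer contributes a worker id here. A minimal sketch of the same comprehension pattern, with hypothetical locations flattened to plain worker-id strings (the real entries are objects carrying a .worker_id attribute):

# Hypothetical stand-in for datasets_locations[data_model]: dataset code -> hosting worker id.
locations = {
    "ppmi0": "localworker1",
    "ppmi1": "localworker2",
    "ppmi_test": "globalworker",
}
requested = {"ppmi0", "ppmi_test"}

# Same shape as the patched comprehension: keep only local workers.
local_workers = list(
    {
        worker_id
        for dataset, worker_id in locations.items()
        if dataset in requested
        if worker_id != "globalworker"
    }
)
print(local_workers)  # ['localworker1'] -- the test dataset's global host is skipped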

@@ -547,7 +549,7 @@ def get_data_models_attributes(self) -> Dict[str, DataModelAttributes]:

def _fetch_workers_metadata(
self,
-) -> Tuple[List[WorkerInfo], DataModelsMetadataPerWorker,]:
) -> Tuple[List[WorkerInfo], DataModelsMetadataPerWorker]:
"""
Returns a list of all the workers in the federation and their metadata (data_models, datasets, cdes).
"""
94 changes: 80 additions & 14 deletions tasks.py
@@ -443,18 +443,20 @@ def get_worker_configs():
worker_configs.append(worker_config)
return worker_configs

-def filter_worker_configs(worker_configs, worker):
def filter_worker_configs(worker_configs, worker, node_type):
"""
-Filter worker configurations based on a specific worker identifier.
Filter worker configurations based on a specific worker identifier and node type.
:param worker_configs: A list of all worker configurations.
:param worker: The identifier of the worker to filter for.
:param node_type: The type of node to filter for, e.g. "LOCALWORKER" or "GLOBALWORKER".
:return: A list of tuples containing worker identifiers and ports.
"""
return [
(config["identifier"], config["monetdb"]["port"])
for config in worker_configs
-if not worker or config["identifier"] == worker
if (not worker or config["identifier"] == worker)
and config["role"] == node_type
]
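A quick usage sketch of the patched filter_worker_configs, assuming config dicts shaped like those get_worker_configs() returns; the identifiers, roles, and MonetDB ports below are made up for illustration:

# Hypothetical worker configs; only the keys filter_worker_configs reads.
worker_configs = [
    {"identifier": "localworker1", "role": "LOCALWORKER", "monetdb": {"port": 50001}},
    {"identifier": "localworker2", "role": "LOCALWORKER", "monetdb": {"port": 50002}},
    {"identifier": "globalworker", "role": "GLOBALWORKER", "monetdb": {"port": 50000}},
]

# No specific worker requested -> every worker with the requested role.
print(filter_worker_configs(worker_configs, None, "LOCALWORKER"))
# [('localworker1', 50001), ('localworker2', 50002)]

# A specific worker identifier -> only that worker, and only if its role matches.
print(filter_worker_configs(worker_configs, "globalworker", "GLOBALWORKER"))
# [('globalworker', 50000)]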

def load_data_model_metadata(c, cdes_file, worker_id_and_ports):
@@ -523,7 +525,9 @@ def load_datasets(
[
f"{dirpath}/{file}"
for file in filenames
if file.endswith(".csv") and not file.endswith("0.csv")
if file.endswith(".csv")
and not file.endswith("0.csv")
and not file.endswith("test.csv")
]
)
worker_id_and_ports_cycle = itertools.cycle(worker_id_and_ports[1:])
@@ -536,16 +540,50 @@
cmd = f"poetry run mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(worker_id)}"
run(c, cmd)

-# Retrieve and filter worker configurations
def load_test_datasets(
c,
dirpath,
filenames,
data_model_code,
data_model_version,
worker_id_and_ports,
use_sockets,
):
"""
Load datasets whose CSV filenames end with 'test' into the global worker.
:param c: The context object.
:param dirpath: Directory path of the current dataset.
:param filenames: List of filenames in the current directory.
:param data_model_code: The data model code.
:param data_model_version: The data model version.
:param worker_id_and_ports: A list of tuples containing worker identifiers and ports.
:param use_sockets: Flag to determine if data will be loaded via sockets.
"""
test_csvs = sorted(
[f"{dirpath}/{file}" for file in filenames if file.endswith("test.csv")]
)
for csv in test_csvs:
worker_id, port = worker_id_and_ports[0]
message(
f"Loading test dataset {pathlib.PurePath(csv).name} in MonetDB at port {port}...",
Level.HEADER,
)
cmd = f"poetry run mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(worker_id)}"
run(c, cmd)

# Retrieve and filter worker configurations for local workers
worker_configs = get_worker_configs()
-worker_id_and_ports = filter_worker_configs(worker_configs, worker)
local_worker_id_and_ports = filter_worker_configs(
worker_configs, worker, "LOCALWORKER"
)

-if not worker_id_and_ports:
-raise Exception("Worker config files cannot be loaded.")
if not local_worker_id_and_ports:
raise Exception("Local worker config files cannot be loaded.")

-# If only one worker is specified, load the entire folder to that worker
-if len(worker_id_and_ports) == 1:
-worker_id, port = worker_id_and_ports[0]
# If only one local worker is specified, load the entire folder to that worker
if len(local_worker_id_and_ports) == 1:
worker_id, port = local_worker_id_and_ports[0]
cmd = f"poetry run mipdb load-folder {TEST_DATA_FOLDER} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(worker_id)}"
message(
f"Loading the folder '{TEST_DATA_FOLDER}' in MonetDB at port {port}...",
Level.HEADER,
)
@@ -554,15 +592,15 @@ def load_datasets(
run(c, cmd)
return

-# Process each dataset in the TEST_DATA_FOLDER
# Process each dataset in the TEST_DATA_FOLDER for local workers
for dirpath, dirnames, filenames in os.walk(TEST_DATA_FOLDER):
if "CDEsMetadata.json" not in filenames:
continue
cdes_file = os.path.join(dirpath, "CDEsMetadata.json")

# Load data model metadata
data_model_code, data_model_version = load_data_model_metadata(
-c, cdes_file, worker_id_and_ports
c, cdes_file, local_worker_id_and_ports
)

# Load datasets
load_datasets(
c,
dirpath,
@@ -572,7 +610,35 @@ def load_datasets(
filenames,
data_model_code,
data_model_version,
-worker_id_and_ports,
local_worker_id_and_ports,
use_sockets,
)

# Retrieve and filter worker configurations for the global worker
global_worker_id_and_ports = filter_worker_configs(
worker_configs, worker, "GLOBALWORKER"
)

if not global_worker_id_and_ports:
raise Exception("Global worker config files cannot be loaded.")

# Process each dataset in the TEST_DATA_FOLDER for the global worker
for dirpath, dirnames, filenames in os.walk(TEST_DATA_FOLDER):
if "CDEsMetadata.json" not in filenames:
continue
cdes_file = os.path.join(dirpath, "CDEsMetadata.json")

# Load data model metadata
data_model_code, data_model_version = load_data_model_metadata(
c, cdes_file, global_worker_id_and_ports
)
load_test_datasets(
c,
dirpath,
filenames,
data_model_code,
data_model_version,
global_worker_id_and_ports,
use_sockets,
)
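Taken together, the task now spreads plain CSVs across local workers and sends every *test.csv to the single global worker. A condensed sketch of that routing decision; the file names and worker tuples are hypothetical, and the cycle only loosely mirrors the worker_id_and_ports_cycle used above (the real task cycles over worker_id_and_ports[1:] and shells out to mipdb):

import itertools

files = ["ppmi1.csv", "ppmi10.csv", "ppmi_test.csv", "ppmi0.csv"]
local_workers = [("localworker1", 50001), ("localworker2", 50002)]
global_worker = ("globalworker", 50000)

# Non-test CSVs: skip "...0.csv" files and spread the rest over local workers.
local_csvs = [
    f for f in files
    if f.endswith(".csv") and not f.endswith("0.csv") and not f.endswith("test.csv")
]
workers = itertools.cycle(local_workers)
for csv in local_csvs:
    print(csv, "->", next(workers))  # ppmi1.csv -> ('localworker1', 50001)

# Test CSVs: always the global worker, matching load_test_datasets above.
for csv in (f for f in files if f.endswith("test.csv")):
    print(csv, "->", global_worker)  # ppmi_test.csv -> ('globalworker', 50000)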

@@ -15,6 +15,7 @@ def test_logistic_regression(get_algorithm_result):
"ppmi7",
"ppmi8",
"ppmi9",
"ppmi_test",
],
"filters": None,
},
@@ -23,7 +24,8 @@
}
input["type"] = "flower"
algorithm_result = get_algorithm_result("logistic_regression", input)
assert algorithm_result == {"accuracy": 0.6180758017492711} or algorithm_result == {
print(algorithm_result)
assert algorithm_result == {"accuracy": 0.63} or algorithm_result == {
"accuracy": 0.3819241982507289
}

Expand All @@ -45,6 +47,7 @@ def test_logistic_regression_with_filters(get_algorithm_result):
"ppmi7",
"ppmi8",
"ppmi9",
"ppmi_test",
],
"filters": {
"condition": "AND",
@@ -67,6 +70,6 @@
input["type"] = "flower"
algorithm_result = get_algorithm_result("logistic_regression", input)
print(algorithm_result)
assert algorithm_result == {"accuracy": 0.7755681818181818} or algorithm_result == {
assert algorithm_result == {"accuracy": 0.7884615384615384} or algorithm_result == {
"accuracy": 0.22443181818181818
}
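The paired accuracies in the pre-change assertion sum exactly to 1.0, which suggests the tests tolerate either orientation of the binary label encoding. A small sketch of that complement check; the helper name and tolerance are hypothetical, not from the repo:

import math

def accuracy_matches(observed, expected, tol=1e-9):
    # Accept either label orientation: accuracy a or its complement 1 - a.
    return math.isclose(observed, expected, abs_tol=tol) or math.isclose(
        observed, 1 - expected, abs_tol=tol
    )

print(accuracy_matches(0.3819241982507289, 0.6180758017492711))  # True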
@@ -15,6 +15,7 @@ def test_mnist_logistic_regression(get_algorithm_result):
"ppmi7",
"ppmi8",
"ppmi9",
"ppmi_test",
],
"filters": None,
},
12 changes: 12 additions & 0 deletions tests/test_data/dementia_v_0_1/CDEsMetadata.json
@@ -46,6 +46,10 @@
"code": "ppmi9",
"label": "PPMI_9"
},
{
"code": "ppmi_test",
"label": "PPMI_TEST"
},
{
"code": "edsd0",
"label": "EDSD_0"
@@ -86,6 +90,10 @@
"code": "edsd9",
"label": "EDSD_9"
},
{
"code": "edsd_test",
"label": "EDSD_TEST"
},
{
"code": "desd-synthdata0",
"label": "DESD-synthdata_0"
@@ -125,6 +133,10 @@
{
"code": "desd-synthdata9",
"label": "DESD-synthdata_9"
},
{
"code": "desd-synthdata_test",
"label": "DESD-synthdata_TEST"
}
],
"label": "Dataset",
