diff --git a/README.md b/README.md index 79acbff7a..ed6d5aec5 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,8 @@ monetdb_nclients = 128 monetdb_memory_limit = 2048 # MB - algorithm_folders = "./exareme2/algorithms/exareme2,./exareme2/algorithms/flower,./tests/algorithms" + exareme2_algorithm_folders = "./exareme2/algorithms/exareme2,./tests/algorithms/exareme2" + flower_algorithm_folders = "./exareme2/algorithms/flower,./tests/algorithms/flower" worker_landscape_aggregator_update_interval = 30 flower_execution_timeout = 30 diff --git a/exareme2/__init__.py b/exareme2/__init__.py index c513669ab..4c60e5219 100644 --- a/exareme2/__init__.py +++ b/exareme2/__init__.py @@ -14,18 +14,20 @@ __all__ = [ "DType", "AttrDict", - "ALGORITHM_FOLDERS_ENV_VARIABLE", - "ALGORITHM_FOLDERS", - "algorithm_classes", + "EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE", + "EXAREME2_ALGORITHM_FOLDERS", + "exareme2_algorithm_classes", "DATA_TABLE_PRIMARY_KEY", + "FLOWER_ALGORITHM_FOLDERS_ENV_VARIABLE", + "FLOWER_ALGORITHM_FOLDERS", ] DATA_TABLE_PRIMARY_KEY = "row_id" -ALGORITHM_FOLDERS_ENV_VARIABLE = "ALGORITHM_FOLDERS" -ALGORITHM_FOLDERS = "./exareme2/algorithms/exareme2,./exareme2/algorithms/flower" -if algorithm_folders := os.getenv(ALGORITHM_FOLDERS_ENV_VARIABLE): - ALGORITHM_FOLDERS = algorithm_folders +EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE = "EXAREME2_ALGORITHM_FOLDERS" +EXAREME2_ALGORITHM_FOLDERS = "./exareme2/algorithms/exareme2" +if exareme2_algorithm_folders := os.getenv(EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE): + EXAREME2_ALGORITHM_FOLDERS = exareme2_algorithm_folders class AlgorithmNamesMismatchError(Exception): @@ -46,13 +48,13 @@ def __init__(self, mismatches, algorithm_classes, algorithm_data_loaders): self.message = message -def import_algorithm_modules() -> Dict[str, ModuleType]: +def import_exareme2_algorithm_modules() -> Dict[str, ModuleType]: # Import all algorithm modules # Import all .py modules in the algorithm folder paths # https://stackoverflow.com/questions/67631/how-to-import-a-module-given-the-full-path?page=1&tab=votes#tab-top all_modules = {} - for algorithm_folder in ALGORITHM_FOLDERS.split(","): + for algorithm_folder in EXAREME2_ALGORITHM_FOLDERS.split(","): all_module_paths = glob.glob(f"{algorithm_folder}/*.py") algorithm_module_paths = [ module @@ -84,14 +86,14 @@ def import_algorithm_modules() -> Dict[str, ModuleType]: return all_modules -import_algorithm_modules() +import_exareme2_algorithm_modules() -def get_algorithm_classes() -> Dict[str, type]: +def get_exareme2_algorithm_classes() -> Dict[str, type]: return {cls.algname: cls for cls in Algorithm.__subclasses__()} -def get_algorithm_data_loaders() -> Dict[str, type]: +def get_exareme2_algorithm_data_loaders() -> Dict[str, type]: return {cls.algname: cls for cls in AlgorithmDataLoader.__subclasses__()} @@ -103,8 +105,46 @@ def _check_algo_naming_matching(algo_classes: dict, algo_data_loaders: dict): raise AlgorithmNamesMismatchError(sym_diff, algo_classes, algo_data_loaders) -algorithm_classes = get_algorithm_classes() -algorithm_data_loaders = get_algorithm_data_loaders() +exareme2_algorithm_classes = get_exareme2_algorithm_classes() +exareme2_algorithm_data_loaders = get_exareme2_algorithm_data_loaders() _check_algo_naming_matching( - algo_classes=algorithm_classes, algo_data_loaders=algorithm_data_loaders + algo_classes=exareme2_algorithm_classes, + algo_data_loaders=exareme2_algorithm_data_loaders, +) + + +def find_flower_algorithm_folder_paths(algorithm_folders): + # Split the input string into a list of folder paths + folder_paths = algorithm_folders.split(",") + + # Initialize an empty dictionary to store the result + algorithm_folder_paths = {} + + # Iterate over each folder path + for folder_path in folder_paths: + if not os.path.isdir(folder_path): + continue # Skip if the path is not a valid directory + + # List all files and folders in the current folder path + items = os.listdir(folder_path) + + # Filter for .json files and corresponding folders + for item in items: + if item.endswith(".json"): + algorithm_name = item[:-5] # Remove '.json' to get the algorithm name + algorithm_folder = os.path.join(folder_path, algorithm_name) + if os.path.isdir(algorithm_folder): + # Store the algorithm name and the complete folder path in the dictionary + algorithm_folder_paths[algorithm_name] = algorithm_folder + + return algorithm_folder_paths + + +FLOWER_ALGORITHM_FOLDERS_ENV_VARIABLE = "FLOWER_ALGORITHM_FOLDERS" +FLOWER_ALGORITHM_FOLDERS = "./exareme2/algorithms/flower" +if flower_algorithm_folders := os.getenv(FLOWER_ALGORITHM_FOLDERS_ENV_VARIABLE): + FLOWER_ALGORITHM_FOLDERS = flower_algorithm_folders + +flower_algorithm_folder_paths = find_flower_algorithm_folder_paths( + FLOWER_ALGORITHM_FOLDERS ) diff --git a/exareme2/algorithms/flower/process_manager.py b/exareme2/algorithms/flower/process_manager.py index 3f85eec7c..94a887cfe 100644 --- a/exareme2/algorithms/flower/process_manager.py +++ b/exareme2/algorithms/flower/process_manager.py @@ -4,8 +4,6 @@ import psutil -ALGORITHMS_ROOT = Path(__file__).parent - def process_status(proc): """Check the status of a process.""" @@ -108,9 +106,8 @@ def start(self, logger): if self.proc is not None: logger.error("Process already started!") raise RuntimeError("Process already started!") - flower_executable = ALGORITHMS_ROOT / self.file env = {**os.environ, **{k: str(v) for k, v in self.env_vars.items()}} - command = ["poetry", "run", "python", str(flower_executable), *self.parameters] + command = ["poetry", "run", "python", str(self.file), *self.parameters] logger.info(f"Executing command: {command}") self.proc = subprocess.Popen( command, env=env, stdout=self.stdout, stderr=self.stderr diff --git a/exareme2/controller/celery/tasks_handler.py b/exareme2/controller/celery/tasks_handler.py index f20b66425..e36de6798 100644 --- a/exareme2/controller/celery/tasks_handler.py +++ b/exareme2/controller/celery/tasks_handler.py @@ -298,24 +298,34 @@ def queue_healthcheck_task( ) def start_flower_client( - self, request_id, algorithm_name, server_address, csv_paths, execution_timeout + self, + request_id, + algorithm_folder_path, + server_address, + csv_paths, + execution_timeout, ) -> WorkerTaskResult: return self._queue_task( task_signature=TASK_SIGNATURES["start_flower_client"], request_id=request_id, - algorithm_name=algorithm_name, + algorithm_folder_path=algorithm_folder_path, server_address=server_address, csv_paths=csv_paths, execution_timeout=execution_timeout, ) def start_flower_server( - self, request_id, algorithm_name, number_of_clients, server_address, csv_paths + self, + request_id, + algorithm_folder_path, + number_of_clients, + server_address, + csv_paths, ) -> WorkerTaskResult: return self._queue_task( task_signature=TASK_SIGNATURES["start_flower_server"], request_id=request_id, - algorithm_name=algorithm_name, + algorithm_folder_path=algorithm_folder_path, number_of_clients=number_of_clients, server_address=server_address, csv_paths=csv_paths, diff --git a/exareme2/controller/services/api/algorithm_spec_dtos.py b/exareme2/controller/services/api/algorithm_spec_dtos.py index aa8f16172..32ebade81 100644 --- a/exareme2/controller/services/api/algorithm_spec_dtos.py +++ b/exareme2/controller/services/api/algorithm_spec_dtos.py @@ -9,7 +9,8 @@ from pydantic import BaseModel -from exareme2 import ALGORITHM_FOLDERS +from exareme2 import EXAREME2_ALGORITHM_FOLDERS +from exareme2 import FLOWER_ALGORITHM_FOLDERS from exareme2.algorithms.specifications import AlgorithmSpecification from exareme2.algorithms.specifications import AlgorithmType from exareme2.algorithms.specifications import InputDataSpecification @@ -299,7 +300,11 @@ def load_and_parse_specifications(self): @staticmethod def get_specs_paths(): - return [Path(specs_path.strip()) for specs_path in ALGORITHM_FOLDERS.split(",")] + return [ + Path(specs_path.strip()) + for specs_path in EXAREME2_ALGORITHM_FOLDERS.split(",") + + FLOWER_ALGORITHM_FOLDERS.split(",") + ] def parse_specifications(self, specs_path, all_algorithms, all_transformers): for spec_property_path in specs_path.glob("*.json"): diff --git a/exareme2/controller/services/exareme2/controller.py b/exareme2/controller/services/exareme2/controller.py index 93b65dfab..c07160c8e 100644 --- a/exareme2/controller/services/exareme2/controller.py +++ b/exareme2/controller/services/exareme2/controller.py @@ -10,8 +10,8 @@ from typing import List from typing import Optional -from exareme2 import algorithm_classes -from exareme2 import algorithm_data_loaders +from exareme2 import exareme2_algorithm_classes +from exareme2 import exareme2_algorithm_data_loaders from exareme2.algorithms.exareme2.algorithm import AlgorithmDataLoader from exareme2.algorithms.exareme2.algorithm import ( InitializationParams as AlgorithmInitParams, @@ -531,7 +531,7 @@ def __init__( ): self._algorithm_name = algorithm_name self._variables = variables - self._algorithm_data_loader = algorithm_data_loaders[algorithm_name]( + self._algorithm_data_loader = exareme2_algorithm_data_loaders[algorithm_name]( variables=variables ) self._algorithm_request_dto = algorithm_request_dto @@ -598,7 +598,7 @@ async def run(self, data, metadata): X = data_transformed[0] y = data_transformed[1] alg_vars = Variables(x=X.columns, y=y.columns) - algorithm_data_loader = algorithm_data_loaders[self._algorithm_name]( + algorithm_data_loader = exareme2_algorithm_data_loaders[self._algorithm_name]( variables=alg_vars ) @@ -692,7 +692,7 @@ async def run(self, data, metadata): algorithm_parameters=self._params, datasets=self._datasets, ) - algorithm = algorithm_classes[self._algorithm_name]( + algorithm = exareme2_algorithm_classes[self._algorithm_name]( initialization_params=init_params, data_loader=self._algorithm_data_loader, engine=self._engine, diff --git a/exareme2/controller/services/flower/controller.py b/exareme2/controller/services/flower/controller.py index d71116df0..2a981ee06 100644 --- a/exareme2/controller/services/flower/controller.py +++ b/exareme2/controller/services/flower/controller.py @@ -2,6 +2,7 @@ from typing import Dict from typing import List +from exareme2 import flower_algorithm_folder_paths from exareme2.controller import config as ctrl_config from exareme2.controller import logger as ctrl_logger from exareme2.controller.federation_info_logs import log_experiment_execution @@ -91,10 +92,10 @@ async def exec_algorithm(self, algorithm_name, algorithm_request_dto): server_pid = None clients_pids = {} server_address = f"{server_ip}:{FLOWER_SERVER_PORT}" - + algorithm_folder_path = flower_algorithm_folder_paths[algorithm_name] try: server_pid = server_task_handler.start_flower_server( - algorithm_name, + algorithm_folder_path, len(task_handlers), str(server_address), csv_paths_per_worker_id[server_id] @@ -103,7 +104,7 @@ async def exec_algorithm(self, algorithm_name, algorithm_request_dto): ) clients_pids = { handler.start_flower_client( - algorithm_name, + algorithm_folder_path, str(server_address), csv_paths_per_worker_id[handler.worker_id], ctrl_config.flower_execution_timeout, diff --git a/exareme2/controller/services/flower/tasks_handler.py b/exareme2/controller/services/flower/tasks_handler.py index a74df3c35..938476d29 100644 --- a/exareme2/controller/services/flower/tasks_handler.py +++ b/exareme2/controller/services/flower/tasks_handler.py @@ -30,22 +30,26 @@ def worker_data_address(self) -> str: return self._db_address def start_flower_client( - self, algorithm_name, server_address, csv_paths, execution_timeout + self, algorithm_folder_path, server_address, csv_paths, execution_timeout ) -> int: return self._worker_tasks_handler.start_flower_client( self._request_id, - algorithm_name, + algorithm_folder_path, server_address, csv_paths, execution_timeout, ).get(timeout=self._tasks_timeout) def start_flower_server( - self, algorithm_name: str, number_of_clients: int, server_address, csv_paths + self, + algorithm_folder_path: str, + number_of_clients: int, + server_address, + csv_paths, ) -> int: return self._worker_tasks_handler.start_flower_server( self._request_id, - algorithm_name, + algorithm_folder_path, number_of_clients, server_address, csv_paths, diff --git a/exareme2/worker/flower/starter/starter_api.py b/exareme2/worker/flower/starter/starter_api.py index f2f5e0977..a62392802 100644 --- a/exareme2/worker/flower/starter/starter_api.py +++ b/exareme2/worker/flower/starter/starter_api.py @@ -5,21 +5,21 @@ @shared_task def start_flower_client( - request_id: str, algorithm_name, server_address, csv_paths, execution_timeout + request_id: str, algorithm_folder_path, server_address, csv_paths, execution_timeout ) -> int: return starter_service.start_flower_client( - request_id, algorithm_name, server_address, csv_paths, execution_timeout + request_id, algorithm_folder_path, server_address, csv_paths, execution_timeout ) @shared_task def start_flower_server( request_id: str, - algorithm_name: str, + algorithm_folder_path: str, number_of_clients: int, server_address, csv_paths, ) -> int: return starter_service.start_flower_server( - request_id, algorithm_name, number_of_clients, server_address, csv_paths + request_id, algorithm_folder_path, number_of_clients, server_address, csv_paths ) diff --git a/exareme2/worker/flower/starter/starter_service.py b/exareme2/worker/flower/starter/starter_service.py index d34516335..ab87a6068 100644 --- a/exareme2/worker/flower/starter/starter_service.py +++ b/exareme2/worker/flower/starter/starter_service.py @@ -6,7 +6,7 @@ @initialise_logger def start_flower_client( - request_id: str, algorithm_name, server_address, csv_paths, execution_timeout + request_id: str, algorithm_folder_path, server_address, csv_paths, execution_timeout ) -> int: env_vars = { "MONETDB_IP": worker_config.monetdb.ip, @@ -25,7 +25,7 @@ def start_flower_client( "CSV_PATHS": ",".join(csv_paths), "TIMEOUT": execution_timeout, } - process = FlowerProcess(f"{algorithm_name}/client.py", env_vars=env_vars) + process = FlowerProcess(f"{algorithm_folder_path}/client.py", env_vars=env_vars) logger = get_logger() logger.info("Starting client.py") @@ -37,7 +37,7 @@ def start_flower_client( @initialise_logger def start_flower_server( request_id: str, - algorithm_name: str, + algorithm_folder_path: str, number_of_clients: int, server_address, csv_paths, @@ -53,7 +53,7 @@ def start_flower_server( "DATA_PATH": worker_config.data_path, "CSV_PATHS": ",".join(csv_paths), } - process = FlowerProcess(f"{algorithm_name}/server.py", env_vars=env_vars) + process = FlowerProcess(f"{algorithm_folder_path}/server.py", env_vars=env_vars) logger = get_logger() logger.info("Starting server.py") pid = process.start(logger) diff --git a/tasks.py b/tasks.py index bdf530845..4501ddcfc 100644 --- a/tasks.py +++ b/tasks.py @@ -57,6 +57,7 @@ import pathlib import shutil import sys +import time from enum import Enum from itertools import cycle from os import listdir @@ -94,7 +95,8 @@ TEST_DATA_FOLDER = PROJECT_ROOT / "tests" / "test_data" -ALGORITHM_FOLDERS_ENV_VARIABLE = "ALGORITHM_FOLDERS" +EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE = "EXAREME2_ALGORITHM_FOLDERS" +FLOWER_ALGORITHM_FOLDERS_ENV_VARIABLE = "FLOWER_ALGORITHM_FOLDERS" EXAREME2_WORKER_CONFIG_FILE = "EXAREME2_WORKER_CONFIG_FILE" SMPC_COORDINATOR_PORT = 12314 @@ -473,14 +475,38 @@ def load_data_model_metadata(c, cdes_file, worker_id_and_ports): data_model_code = data_model_metadata["code"] data_model_version = data_model_metadata["version"] + def run_with_retries(c, cmd, retries=5, wait_seconds=1): + """Attempts to run a command, retrying in case of failure.""" + attempt = 0 + while attempt < retries: + try: + run(c, cmd) # Try to run the command + return # Exit if successful + except Exception as e: + attempt += 1 + if attempt < retries: + message( + f"Attempt {attempt} failed. Retrying in {wait_seconds} seconds...", + Level.WARNING, + ) + time.sleep(wait_seconds) # Wait before retrying + else: + message( + f"All {retries} attempts failed. Error: {str(e)}", + Level.ERROR, + ) + raise e # Re-raise the last exception after all retries + + # Main loop for loading data models with retries for worker_id, port in worker_id_and_ports: message( f"Loading data model '{data_model_code}:{data_model_version}' metadata in MonetDB at port {port}...", Level.HEADER, ) cmd = f"poetry run mipdb add-data-model {cdes_file} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(worker_id)}" - run(c, cmd) + # Try running the command with retries + run_with_retries(c, cmd) return data_model_code, data_model_version def load_datasets( @@ -766,6 +792,15 @@ def kill_worker(c, worker=None, all_=False): message("No celery instances found", Level.HEADER) +def validate_algorithm_folders(folders, name): + """Validates and retrieves the algorithm folder configuration.""" + if not folders: + folders = get_deployment_config(name) + if not isinstance(folders, str): + raise ValueError(f"The {name} configuration must be a comma-separated string.") + return folders + + @task def start_worker( c, @@ -773,7 +808,8 @@ def start_worker( all_=False, framework_log_level=None, detached=False, - algorithm_folders=None, + exareme2_algorithm_folders=None, + flower_algorithm_folders=None, ): """ (Re)Start the worker(s) service(s). If a worker service is running, stop and start it again. @@ -782,7 +818,8 @@ def start_worker( :param all_: If set, the workers of which the configuration file exists, will be started. :param framework_log_level: If not provided, it will look into the `DEPLOYMENT_CONFIG_FILE`. :param detached: If set to True, it will start the service in the background. - :param algorithm_folders: Used from the api. If not provided, it looks in the `DEPLOYMENT_CONFIG_FILE`. + :param exareme2_algorithm_folders: Used from the api. If not provided, it looks in the `DEPLOYMENT_CONFIG_FILE`. + :param flower_algorithm_folders: Used from the api. If not provided, it looks in the `DEPLOYMENT_CONFIG_FILE`. The containers related to the api remain unchanged. """ @@ -790,37 +827,45 @@ def start_worker( if not framework_log_level: framework_log_level = get_deployment_config("framework_log_level") - if not algorithm_folders: - algorithm_folders = get_deployment_config("algorithm_folders") - if not isinstance(algorithm_folders, str): - raise ValueError( - "The algorithm_folders configuration must be a comma separated string." - ) + # Validate algorithm folders + exareme2_algorithm_folders = validate_algorithm_folders( + exareme2_algorithm_folders, "exareme2_algorithm_folders" + ) + flower_algorithm_folders = validate_algorithm_folders( + flower_algorithm_folders, "flower_algorithm_folders" + ) worker_ids = get_worker_ids(all_, worker) - worker_ids.sort() # Sorting the ids protects removing a similarly named id, localworker1 would remove localworker10. + worker_ids.sort() # Sorting the ids protects removing a similarly named id for worker_id in worker_ids: kill_worker(c, worker_id) message(f"Starting Worker {worker_id}...", Level.HEADER) worker_config_file = WORKERS_CONFIG_DIR / f"{worker_id}.toml" - with c.prefix(f"export {ALGORITHM_FOLDERS_ENV_VARIABLE}={algorithm_folders}"): - with c.prefix(f"export {EXAREME2_WORKER_CONFIG_FILE}={worker_config_file}"): - outpath = OUTDIR / (worker_id + ".out") - if detached or all_: - cmd = ( - f"PYTHONPATH={PROJECT_ROOT} poetry run celery " - f"-A exareme2.worker.utils.celery_app worker -l {framework_log_level} > {outpath} " - f"--pool=eventlet --purge 2>&1" - ) - run(c, cmd, wait=False) - else: - cmd = ( - f"PYTHONPATH={PROJECT_ROOT} poetry run celery -A " - f"exareme2.worker.utils.celery_app worker -l {framework_log_level} --pool=eventlet --purge" - ) - run(c, cmd, attach_=True) + with c.prefix( + f"export {EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE}={exareme2_algorithm_folders}" + ): + with c.prefix( + f"export {FLOWER_ALGORITHM_FOLDERS_ENV_VARIABLE}={flower_algorithm_folders}" + ): + with c.prefix( + f"export {EXAREME2_WORKER_CONFIG_FILE}={worker_config_file}" + ): + outpath = OUTDIR / (worker_id + ".out") + if detached or all_: + cmd = ( + f"PYTHONPATH={PROJECT_ROOT}: poetry run celery " + f"-A exareme2.worker.utils.celery_app worker -l {framework_log_level} > {outpath} " + f"--pool=eventlet --purge 2>&1" + ) + run(c, cmd, wait=False) + else: + cmd = ( + f"PYTHONPATH={PROJECT_ROOT} poetry run celery -A " + f"exareme2.worker.utils.celery_app worker -l {framework_log_level} --pool=eventlet --purge" + ) + run(c, cmd, attach_=True) @task @@ -837,36 +882,40 @@ def kill_controller(c): @task -def start_controller(c, detached=False, algorithm_folders=None): +def start_controller( + c, detached=False, exareme2_algorithm_folders=None, flower_algorithm_folders=None +): """ (Re)Start the controller service. If the service is already running, stop and start it again. - - :param detached: If set to True, it will start the service in the background. - :param algorithm_folders: Used from the api. If not provided, it looks in the `DEPLOYMENT_CONFIG_FILE`. """ - - if not algorithm_folders: - algorithm_folders = get_deployment_config("algorithm_folders") - if not isinstance(algorithm_folders, str): - raise ValueError( - "The algorithm_folders configuration must be a comma separated string." - ) + # Validate algorithm folders + exareme2_algorithm_folders = validate_algorithm_folders( + exareme2_algorithm_folders, "exareme2_algorithm_folders" + ) + flower_algorithm_folders = validate_algorithm_folders( + flower_algorithm_folders, "flower_algorithm_folders" + ) kill_controller(c) message("Starting Controller...", Level.HEADER) controller_config_file = CONTROLLER_CONFIG_DIR / "controller.toml" - with c.prefix(f"export {ALGORITHM_FOLDERS_ENV_VARIABLE}={algorithm_folders}"): + with c.prefix( + f"export {EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE}={exareme2_algorithm_folders}" + ): with c.prefix( - f"export EXAREME2_CONTROLLER_CONFIG_FILE={controller_config_file}" + f"export {FLOWER_ALGORITHM_FOLDERS_ENV_VARIABLE}={flower_algorithm_folders}" ): - outpath = OUTDIR / "controller.out" - if detached: - cmd = f"PYTHONPATH={PROJECT_ROOT} poetry run hypercorn --config python:exareme2.controller.quart.hypercorn_config -b 0.0.0.0:5000 exareme2/controller/quart/app:app>> {outpath} 2>&1" - run(c, cmd, wait=False) - else: - cmd = f"PYTHONPATH={PROJECT_ROOT} poetry run hypercorn --config python:exareme2.controller.quart.hypercorn_config -b 0.0.0.0:5000 exareme2/controller/quart/app:app" - run(c, cmd, attach_=True) + with c.prefix( + f"export EXAREME2_CONTROLLER_CONFIG_FILE={controller_config_file}" + ): + outpath = OUTDIR / "controller.out" + if detached: + cmd = f"PYTHONPATH={PROJECT_ROOT} poetry run hypercorn --config python:exareme2.controller.quart.hypercorn_config -b 0.0.0.0:5000 exareme2/controller/quart/app:app>> {outpath} 2>&1" + run(c, cmd, wait=False) + else: + cmd = f"PYTHONPATH={PROJECT_ROOT} poetry run hypercorn --config python:exareme2.controller.quart.hypercorn_config -b 0.0.0.0:5000 exareme2/controller/quart/app:app" + run(c, cmd, attach_=True) @task @@ -880,7 +929,8 @@ def deploy( framework_log_level=None, monetdb_image=None, monetdb_nclients=None, - algorithm_folders=None, + exareme2_algorithm_folders=None, + flower_algorithm_folders=None, smpc=None, ): """ @@ -894,7 +944,8 @@ def deploy( :param framework_log_level: Used for the engine api. If not provided, it looks in the `DEPLOYMENT_CONFIG_FILE`. :param monetdb_image: Used for the db containers. If not provided, it looks in the `DEPLOYMENT_CONFIG_FILE`. :param monetdb_nclients: Used for the db containers. If not provided, it looks in the `DEPLOYMENT_CONFIG_FILE`. - :param algorithm_folders: Used from the api. If not provided, it looks in the `DEPLOYMENT_CONFIG_FILE`. + :param exareme2_algorithm_folders: Used from the api. If not provided, it looks in the `DEPLOYMENT_CONFIG_FILE`. + :param flower_algorithm_folders: Used from the api. If not provided, it looks in the `DEPLOYMENT_CONFIG_FILE`. :param smpc: Deploy the SMPC cluster as well. If not provided, it looks in the `DEPLOYMENT_CONFIG_FILE`. """ @@ -910,8 +961,11 @@ def deploy( if not monetdb_nclients: monetdb_nclients = get_deployment_config("monetdb_nclients") - if not algorithm_folders: - algorithm_folders = get_deployment_config("algorithm_folders") + if not exareme2_algorithm_folders: + exareme2_algorithm_folders = get_deployment_config("exareme2_algorithm_folders") + + if not flower_algorithm_folders: + flower_algorithm_folders = get_deployment_config("flower_algorithm_folders") if smpc is None: smpc = get_deployment_config("smpc", subconfig="enabled") @@ -952,12 +1006,18 @@ def deploy( all_=True, framework_log_level=framework_log_level, detached=True, - algorithm_folders=algorithm_folders, + exareme2_algorithm_folders=exareme2_algorithm_folders, + flower_algorithm_folders=flower_algorithm_folders, ) # Start CONTROLLER service if start_controller_ or start_all: - start_controller(c, detached=True, algorithm_folders=algorithm_folders) + start_controller( + c, + detached=True, + exareme2_algorithm_folders=exareme2_algorithm_folders, + flower_algorithm_folders=flower_algorithm_folders, + ) if smpc and not get_deployment_config("smpc", subconfig="coordinator_ip"): deploy_smpc(c) diff --git a/tests/algorithm_validation_tests/five_node_deployment_template.toml b/tests/algorithm_validation_tests/five_node_deployment_template.toml index e3ca5c6e0..bfb5c308e 100644 --- a/tests/algorithm_validation_tests/five_node_deployment_template.toml +++ b/tests/algorithm_validation_tests/five_node_deployment_template.toml @@ -7,7 +7,8 @@ rabbitmq_image = "madgik/exareme2_rabbitmq:testing" monetdb_nclients = 128 monetdb_memory_limit = 2048 # MB -algorithm_folders = "./exareme2/algorithms/exareme2,./exareme2/algorithms/flower,./tests/algorithms" +exareme2_algorithm_folders = "./exareme2/algorithms/exareme2,./tests/algorithms/exareme2" +flower_algorithm_folders = "./exareme2/algorithms/flower,./tests/algorithms/flower" worker_landscape_aggregator_update_interval = 30 flower_execution_timeout = 30 diff --git a/tests/algorithm_validation_tests/one_node_deployment_template.toml b/tests/algorithm_validation_tests/one_node_deployment_template.toml index 95cee1484..18eadd499 100644 --- a/tests/algorithm_validation_tests/one_node_deployment_template.toml +++ b/tests/algorithm_validation_tests/one_node_deployment_template.toml @@ -7,7 +7,8 @@ rabbitmq_image = "madgik/exareme2_rabbitmq:testing" monetdb_nclients = 64 monetdb_memory_limit = 4096 # MB -algorithm_folders = "./exareme2/algorithms/exareme2,./exareme2/algorithms/flower,./tests/algorithms" +exareme2_algorithm_folders = "./exareme2/algorithms/exareme2,./tests/algorithms/exareme2" +flower_algorithm_folders = "./exareme2/algorithms/flower,./tests/algorithms/flower" worker_landscape_aggregator_update_interval = 300 flower_execution_timeout = 30 diff --git a/exareme2/algorithms/flower/mnist_logistic_regression/__init__.py b/tests/algorithms/exareme2/__init__.py similarity index 100% rename from exareme2/algorithms/flower/mnist_logistic_regression/__init__.py rename to tests/algorithms/exareme2/__init__.py diff --git a/tests/algorithms/kfold.py b/tests/algorithms/exareme2/kfold.py similarity index 100% rename from tests/algorithms/kfold.py rename to tests/algorithms/exareme2/kfold.py diff --git a/tests/algorithms/naive_bayes_categorical_testing.py b/tests/algorithms/exareme2/naive_bayes_categorical_testing.py similarity index 100% rename from tests/algorithms/naive_bayes_categorical_testing.py rename to tests/algorithms/exareme2/naive_bayes_categorical_testing.py diff --git a/tests/algorithms/naive_bayes_gaussian_testing.py b/tests/algorithms/exareme2/naive_bayes_gaussian_testing.py similarity index 100% rename from tests/algorithms/naive_bayes_gaussian_testing.py rename to tests/algorithms/exareme2/naive_bayes_gaussian_testing.py diff --git a/tests/algorithms/orphan_udfs.py b/tests/algorithms/exareme2/orphan_udfs.py similarity index 100% rename from tests/algorithms/orphan_udfs.py rename to tests/algorithms/exareme2/orphan_udfs.py diff --git a/tests/algorithms/smpc_standard_deviation.json b/tests/algorithms/exareme2/smpc_standard_deviation.json similarity index 99% rename from tests/algorithms/smpc_standard_deviation.json rename to tests/algorithms/exareme2/smpc_standard_deviation.json index 0a2fca653..b173cfdea 100644 --- a/tests/algorithms/smpc_standard_deviation.json +++ b/tests/algorithms/exareme2/smpc_standard_deviation.json @@ -20,4 +20,4 @@ "multiple": false } } -} \ No newline at end of file +} diff --git a/tests/algorithms/smpc_standard_deviation.py b/tests/algorithms/exareme2/smpc_standard_deviation.py similarity index 100% rename from tests/algorithms/smpc_standard_deviation.py rename to tests/algorithms/exareme2/smpc_standard_deviation.py diff --git a/tests/algorithms/smpc_standard_deviation_int_only.json b/tests/algorithms/exareme2/smpc_standard_deviation_int_only.json similarity index 99% rename from tests/algorithms/smpc_standard_deviation_int_only.json rename to tests/algorithms/exareme2/smpc_standard_deviation_int_only.json index 6f9804a47..fadc6efe9 100644 --- a/tests/algorithms/smpc_standard_deviation_int_only.json +++ b/tests/algorithms/exareme2/smpc_standard_deviation_int_only.json @@ -18,4 +18,4 @@ "multiple": false } } -} \ No newline at end of file +} diff --git a/tests/algorithms/smpc_standard_deviation_int_only.py b/tests/algorithms/exareme2/smpc_standard_deviation_int_only.py similarity index 100% rename from tests/algorithms/smpc_standard_deviation_int_only.py rename to tests/algorithms/exareme2/smpc_standard_deviation_int_only.py diff --git a/tests/algorithms/standard_deviation.json b/tests/algorithms/exareme2/standard_deviation.json similarity index 99% rename from tests/algorithms/standard_deviation.json rename to tests/algorithms/exareme2/standard_deviation.json index 5084d0164..ebceb0091 100644 --- a/tests/algorithms/standard_deviation.json +++ b/tests/algorithms/exareme2/standard_deviation.json @@ -16,4 +16,4 @@ "multiple": false } } -} \ No newline at end of file +} diff --git a/tests/algorithms/standard_deviation.py b/tests/algorithms/exareme2/standard_deviation.py similarity index 100% rename from tests/algorithms/standard_deviation.py rename to tests/algorithms/exareme2/standard_deviation.py diff --git a/tests/algorithms/standard_deviation_pos_and_kw_args.json b/tests/algorithms/exareme2/standard_deviation_pos_and_kw_args.json similarity index 99% rename from tests/algorithms/standard_deviation_pos_and_kw_args.json rename to tests/algorithms/exareme2/standard_deviation_pos_and_kw_args.json index 6e3a2f926..0f0d7b735 100644 --- a/tests/algorithms/standard_deviation_pos_and_kw_args.json +++ b/tests/algorithms/exareme2/standard_deviation_pos_and_kw_args.json @@ -15,4 +15,4 @@ "multiple":false } } -} \ No newline at end of file +} diff --git a/tests/algorithms/standard_deviation_pos_and_kw_args.py b/tests/algorithms/exareme2/standard_deviation_pos_and_kw_args.py similarity index 100% rename from tests/algorithms/standard_deviation_pos_and_kw_args.py rename to tests/algorithms/exareme2/standard_deviation_pos_and_kw_args.py diff --git a/tests/algorithms/test_kfold.json b/tests/algorithms/exareme2/test_kfold.json similarity index 99% rename from tests/algorithms/test_kfold.json rename to tests/algorithms/exareme2/test_kfold.json index 70aa98455..008a36b81 100644 --- a/tests/algorithms/test_kfold.json +++ b/tests/algorithms/exareme2/test_kfold.json @@ -45,4 +45,4 @@ "max":20 } } -} \ No newline at end of file +} diff --git a/tests/algorithms/test_nb_categorical_fit.json b/tests/algorithms/exareme2/test_nb_categorical_fit.json similarity index 100% rename from tests/algorithms/test_nb_categorical_fit.json rename to tests/algorithms/exareme2/test_nb_categorical_fit.json diff --git a/tests/algorithms/test_nb_categorical_predict.json b/tests/algorithms/exareme2/test_nb_categorical_predict.json similarity index 100% rename from tests/algorithms/test_nb_categorical_predict.json rename to tests/algorithms/exareme2/test_nb_categorical_predict.json diff --git a/tests/algorithms/test_nb_gaussian_fit.json b/tests/algorithms/exareme2/test_nb_gaussian_fit.json similarity index 100% rename from tests/algorithms/test_nb_gaussian_fit.json rename to tests/algorithms/exareme2/test_nb_gaussian_fit.json diff --git a/tests/algorithms/test_nb_gaussian_predict.json b/tests/algorithms/exareme2/test_nb_gaussian_predict.json similarity index 100% rename from tests/algorithms/test_nb_gaussian_predict.json rename to tests/algorithms/exareme2/test_nb_gaussian_predict.json diff --git a/tests/algorithms/flower/__init__.py b/tests/algorithms/flower/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/exareme2/algorithms/flower/mnist_logistic_regression.json b/tests/algorithms/flower/mnist_logistic_regression.json similarity index 100% rename from exareme2/algorithms/flower/mnist_logistic_regression.json rename to tests/algorithms/flower/mnist_logistic_regression.json diff --git a/exareme2/algorithms/flower/mnist_logistic_regression/X_test.npy b/tests/algorithms/flower/mnist_logistic_regression/X_test.npy similarity index 100% rename from exareme2/algorithms/flower/mnist_logistic_regression/X_test.npy rename to tests/algorithms/flower/mnist_logistic_regression/X_test.npy diff --git a/tests/algorithms/flower/mnist_logistic_regression/__init__.py b/tests/algorithms/flower/mnist_logistic_regression/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/exareme2/algorithms/flower/mnist_logistic_regression/client.py b/tests/algorithms/flower/mnist_logistic_regression/client.py similarity index 74% rename from exareme2/algorithms/flower/mnist_logistic_regression/client.py rename to tests/algorithms/flower/mnist_logistic_regression/client.py index 7a34fd612..561546fb4 100644 --- a/exareme2/algorithms/flower/mnist_logistic_regression/client.py +++ b/tests/algorithms/flower/mnist_logistic_regression/client.py @@ -1,5 +1,7 @@ import os +import time import warnings +from math import log2 import flwr as fl import numpy as np @@ -7,7 +9,7 @@ from sklearn.linear_model import LogisticRegression from sklearn.metrics import log_loss -from exareme2.algorithms.flower.mnist_logistic_regression import utils +from tests.algorithms.flower.mnist_logistic_regression import utils if __name__ == "__main__": # Load data from file @@ -66,6 +68,23 @@ def evaluate(self, parameters, config): return loss, len(X_test), {"accuracy": accuracy} # Start Flower client - fl.client.start_client( - server_address=os.environ["SERVER_ADDRESS"], client=MnistClient().to_client() - ) + + attempts = 0 + max_attempts = int(log2(int(os.environ["TIMEOUT"]))) + while True: + try: + fl.client.start_client( + server_address=os.environ["SERVER_ADDRESS"], + client=MnistClient().to_client(), + ) + FLOWER_LOGGER.debug(f"Connection successful on attempt: {attempts + 1}") + break + except Exception as e: + FLOWER_LOGGER.warning( + f"Connection with the server failed. Attempt {attempts + 1} failed: {e}" + ) + time.sleep(pow(2, attempts)) + attempts += 1 + if attempts >= max_attempts: + FLOWER_LOGGER.error("Could not establish connection to the server.") + raise e diff --git a/exareme2/algorithms/flower/mnist_logistic_regression/server.py b/tests/algorithms/flower/mnist_logistic_regression/server.py similarity index 96% rename from exareme2/algorithms/flower/mnist_logistic_regression/server.py rename to tests/algorithms/flower/mnist_logistic_regression/server.py index 124d76588..271f180af 100644 --- a/exareme2/algorithms/flower/mnist_logistic_regression/server.py +++ b/tests/algorithms/flower/mnist_logistic_regression/server.py @@ -6,7 +6,7 @@ from sklearn.metrics import log_loss from exareme2.algorithms.flower.inputdata_preprocessing import post_result -from exareme2.algorithms.flower.mnist_logistic_regression import utils +from tests.algorithms.flower.mnist_logistic_regression import utils NUM_OF_ROUNDS = 5 diff --git a/exareme2/algorithms/flower/mnist_logistic_regression/utils.py b/tests/algorithms/flower/mnist_logistic_regression/utils.py similarity index 100% rename from exareme2/algorithms/flower/mnist_logistic_regression/utils.py rename to tests/algorithms/flower/mnist_logistic_regression/utils.py diff --git a/exareme2/algorithms/flower/mnist_logistic_regression/y_test.npy b/tests/algorithms/flower/mnist_logistic_regression/y_test.npy similarity index 100% rename from exareme2/algorithms/flower/mnist_logistic_regression/y_test.npy rename to tests/algorithms/flower/mnist_logistic_regression/y_test.npy diff --git a/tests/standalone_tests/algorithms/exareme2/test_udfs.py b/tests/standalone_tests/algorithms/exareme2/test_udfs.py index b42cb7215..75c46ad5c 100644 --- a/tests/standalone_tests/algorithms/exareme2/test_udfs.py +++ b/tests/standalone_tests/algorithms/exareme2/test_udfs.py @@ -23,7 +23,7 @@ from exareme2.worker_communication import WorkerUDFKeyArguments from exareme2.worker_communication import WorkerUDFPosArguments from exareme2.worker_communication import WorkerUDFResults -from tests.algorithms.orphan_udfs import local_step +from tests.algorithms.exareme2.orphan_udfs import local_step from tests.standalone_tests.conftest import TASKS_TIMEOUT from tests.standalone_tests.conftest import insert_data_to_db from tests.standalone_tests.controller.workers_communication_helper import ( diff --git a/tests/standalone_tests/algorithms/flower/test_process_manager.py b/tests/standalone_tests/algorithms/flower/test_process_manager.py index 34bd4f9ea..4ad122e98 100644 --- a/tests/standalone_tests/algorithms/flower/test_process_manager.py +++ b/tests/standalone_tests/algorithms/flower/test_process_manager.py @@ -1,13 +1,10 @@ import os -import signal import unittest -from unittest import mock from unittest.mock import MagicMock from unittest.mock import patch import psutil -from exareme2.algorithms.flower.process_manager import ALGORITHMS_ROOT from exareme2.algorithms.flower.process_manager import FlowerProcess from exareme2.algorithms.flower.process_manager import handle_zombie from exareme2.algorithms.flower.process_manager import terminate_process @@ -21,7 +18,7 @@ def test_start_process(self, mock_popen): logger = MagicMock() mock_popen.return_value.pid = 12345 - expected_script_path = os.path.join(ALGORITHMS_ROOT, "script.py") + expected_script_path = os.path.join("algorithm_path", "script.py") # Starting the process pid = process.start(logger) diff --git a/tests/standalone_tests/conftest.py b/tests/standalone_tests/conftest.py index a638ee509..2957fd797 100644 --- a/tests/standalone_tests/conftest.py +++ b/tests/standalone_tests/conftest.py @@ -25,8 +25,11 @@ from exareme2.controller.services.exareme2.tasks_handler import Exareme2TasksHandler from exareme2.worker_communication import TableSchema -ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE = ( - "./exareme2/algorithms/exareme2,./exareme2/algorithms/flower,./tests/algorithms" +EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE = ( + "./exareme2/algorithms/exareme2,./tests/algorithms" +) +FLOWER_ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE = ( + "./exareme2/algorithms/flower,./tests/algorithms" ) TESTING_RABBITMQ_CONT_IMAGE = "madgik/exareme2_rabbitmq:dev" TESTING_MONETDB_CONT_IMAGE = "madgik/exareme2_db:dev" @@ -920,7 +923,7 @@ def _create_worker_service(algo_folders_env_variable_val, worker_config_filepath os.remove(logpath) env = os.environ.copy() - env["ALGORITHM_FOLDERS"] = algo_folders_env_variable_val + env["EXAREME2_ALGORITHM_FOLDERS"] = algo_folders_env_variable_val env["EXAREME2_WORKER_CONFIG_FILE"] = worker_config_filepath cmd = f"poetry run celery -A exareme2.worker.utils.celery_app worker -l DEBUG >> {logpath} --pool=eventlet --purge 2>&1 " @@ -965,7 +968,7 @@ def kill_service(proc): @pytest.fixture(scope="session") def globalworker_worker_service(rabbitmq_globalworker, monetdb_globalworker): worker_config_file = GLOBALWORKER_CONFIG_FILE - algo_folders_env_variable_val = ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE + algo_folders_env_variable_val = EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE worker_config_filepath = path.join(TEST_ENV_CONFIG_FOLDER, worker_config_file) proc = _create_worker_service(algo_folders_env_variable_val, worker_config_filepath) yield @@ -975,7 +978,7 @@ def globalworker_worker_service(rabbitmq_globalworker, monetdb_globalworker): @pytest.fixture(scope="session") def localworker1_worker_service(rabbitmq_localworker1, monetdb_localworker1): worker_config_file = LOCALWORKER1_CONFIG_FILE - algo_folders_env_variable_val = ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE + algo_folders_env_variable_val = EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE worker_config_filepath = path.join(TEST_ENV_CONFIG_FOLDER, worker_config_file) proc = _create_worker_service(algo_folders_env_variable_val, worker_config_filepath) yield @@ -985,7 +988,7 @@ def localworker1_worker_service(rabbitmq_localworker1, monetdb_localworker1): @pytest.fixture(scope="session") def localworker2_worker_service(rabbitmq_localworker2, monetdb_localworker2): worker_config_file = LOCALWORKER2_CONFIG_FILE - algo_folders_env_variable_val = ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE + algo_folders_env_variable_val = EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE worker_config_filepath = path.join(TEST_ENV_CONFIG_FOLDER, worker_config_file) proc = _create_worker_service(algo_folders_env_variable_val, worker_config_filepath) yield @@ -997,7 +1000,7 @@ def smpc_globalworker_worker_service( rabbitmq_smpc_globalworker, monetdb_smpc_globalworker ): worker_config_file = GLOBALWORKER_SMPC_CONFIG_FILE - algo_folders_env_variable_val = ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE + algo_folders_env_variable_val = EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE worker_config_filepath = path.join(TEST_ENV_CONFIG_FOLDER, worker_config_file) proc = _create_worker_service(algo_folders_env_variable_val, worker_config_filepath) yield @@ -1009,7 +1012,7 @@ def smpc_localworker1_worker_service( rabbitmq_smpc_localworker1, monetdb_smpc_localworker1 ): worker_config_file = LOCALWORKER1_SMPC_CONFIG_FILE - algo_folders_env_variable_val = ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE + algo_folders_env_variable_val = EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE worker_config_filepath = path.join(TEST_ENV_CONFIG_FOLDER, worker_config_file) proc = _create_worker_service(algo_folders_env_variable_val, worker_config_filepath) yield @@ -1021,7 +1024,7 @@ def smpc_localworker2_worker_service( rabbitmq_smpc_localworker2, monetdb_smpc_localworker2 ): worker_config_file = LOCALWORKER2_SMPC_CONFIG_FILE - algo_folders_env_variable_val = ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE + algo_folders_env_variable_val = EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE worker_config_filepath = path.join(TEST_ENV_CONFIG_FOLDER, worker_config_file) proc = _create_worker_service(algo_folders_env_variable_val, worker_config_filepath) yield @@ -1030,7 +1033,7 @@ def smpc_localworker2_worker_service( def create_localworkertmp_worker_service(): worker_config_file = LOCALWORKERTMP_CONFIG_FILE - algo_folders_env_variable_val = ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE + algo_folders_env_variable_val = EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE worker_config_filepath = path.join(TEST_ENV_CONFIG_FOLDER, worker_config_file) return _create_worker_service(algo_folders_env_variable_val, worker_config_filepath) @@ -1248,7 +1251,7 @@ def _create_controller_service( os.remove(logpath) env = os.environ.copy() - env["ALGORITHM_FOLDERS"] = ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE + env["EXAREME2_ALGORITHM_FOLDERS"] = EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE env["LOCALWORKERS_CONFIG_FILE"] = localworkers_config_filepath env["EXAREME2_CONTROLLER_CONFIG_FILE"] = controller_config_filepath env["PYTHONPATH"] = str(Path(__file__).parent.parent.parent) diff --git a/tests/standalone_tests/controller/celery/test_celery_app.py b/tests/standalone_tests/controller/celery/test_celery_app.py index 651c44867..f70141872 100644 --- a/tests/standalone_tests/controller/celery/test_celery_app.py +++ b/tests/standalone_tests/controller/celery/test_celery_app.py @@ -14,7 +14,7 @@ from exareme2.worker_communication import WorkerTableDTO from exareme2.worker_communication import WorkerUDFKeyArguments from exareme2.worker_communication import WorkerUDFPosArguments -from tests.algorithms.orphan_udfs import five_seconds_udf +from tests.algorithms.exareme2.orphan_udfs import five_seconds_udf from tests.standalone_tests.algorithms.exareme2.test_udfs import ( create_table_with_one_column_and_ten_rows, ) diff --git a/tests/standalone_tests/controller/celery/test_worker_info_tasks_priority.py b/tests/standalone_tests/controller/celery/test_worker_info_tasks_priority.py index d07583954..b1c8e1b3e 100644 --- a/tests/standalone_tests/controller/celery/test_worker_info_tasks_priority.py +++ b/tests/standalone_tests/controller/celery/test_worker_info_tasks_priority.py @@ -11,7 +11,7 @@ from exareme2.worker_communication import WorkerTableDTO from exareme2.worker_communication import WorkerUDFKeyArguments from exareme2.worker_communication import WorkerUDFPosArguments -from tests.algorithms.orphan_udfs import one_second_udf +from tests.algorithms.exareme2.orphan_udfs import one_second_udf from tests.standalone_tests.algorithms.exareme2.test_udfs import ( create_table_with_one_column_and_ten_rows, ) diff --git a/tests/standalone_tests/controller/services/exareme2/test_cleanup_after_algorithm_execution.py b/tests/standalone_tests/controller/services/exareme2/test_cleanup_after_algorithm_execution.py index f29165a7f..a52aea4c0 100644 --- a/tests/standalone_tests/controller/services/exareme2/test_cleanup_after_algorithm_execution.py +++ b/tests/standalone_tests/controller/services/exareme2/test_cleanup_after_algorithm_execution.py @@ -8,7 +8,7 @@ from freezegun import freeze_time from exareme2 import AttrDict -from exareme2 import algorithm_classes +from exareme2 import exareme2_algorithm_classes from exareme2.algorithms.exareme2.algorithm import ( InitializationParams as AlgorithmInitParams, ) @@ -35,10 +35,12 @@ WorkerLandscapeAggregator, ) from exareme2.controller.uid_generator import UIDGenerator -from tests.standalone_tests.conftest import ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE from tests.standalone_tests.conftest import ( CONTROLLER_GLOBALWORKER_LOCALWORKER1_LOCALWORKER2_LOCALWORKERTMP_ADDRESSES_FILE, ) +from tests.standalone_tests.conftest import ( + EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE, +) from tests.standalone_tests.conftest import LOCALWORKERTMP_CONFIG_FILE from tests.standalone_tests.conftest import RABBITMQ_LOCALWORKERTMP_NAME from tests.standalone_tests.conftest import RABBITMQ_LOCALWORKERTMP_PORT @@ -290,7 +292,7 @@ def algorithm(algorithm_request_dto, metadata): algorithm_parameters=algorithm_parameters, metadata=metadata, ) - return algorithm_classes[algorithm_name](initialization_params=init_params) + return exareme2_algorithm_classes[algorithm_name](initialization_params=init_params) @pytest.fixture(scope="function") @@ -710,7 +712,7 @@ def test_cleanup_after_worker_service_restart( def start_localworkertmp_worker_service(): worker_config_file = LOCALWORKERTMP_CONFIG_FILE - algo_folders_env_variable_val = ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE + algo_folders_env_variable_val = EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE_VALUE worker_config_filepath = path.join(TEST_ENV_CONFIG_FOLDER, worker_config_file) proc = _create_worker_service(algo_folders_env_variable_val, worker_config_filepath) return proc diff --git a/tests/standalone_tests/controller/services/exareme2/test_single_local_worker_algorithm_execution.py b/tests/standalone_tests/controller/services/exareme2/test_single_local_worker_algorithm_execution.py index b6c1adbbc..00018fafd 100644 --- a/tests/standalone_tests/controller/services/exareme2/test_single_local_worker_algorithm_execution.py +++ b/tests/standalone_tests/controller/services/exareme2/test_single_local_worker_algorithm_execution.py @@ -3,8 +3,8 @@ import pytest from exareme2 import AttrDict -from exareme2 import algorithm_classes -from exareme2 import algorithm_data_loaders +from exareme2 import exareme2_algorithm_classes +from exareme2 import exareme2_algorithm_data_loaders from exareme2.algorithms.exareme2.algorithm import ( InitializationParams as AlgorithmInitParams, ) @@ -247,7 +247,7 @@ def metadata_case_2(worker_landscape_aggregator, algorithm_request_case_2): def algorithm_data_loader_case_1(algorithm_request_case_1): algorithm_name = algorithm_request_case_1[0] algorithm_request_dto = algorithm_request_case_1[1] - algorithm_data_loader = algorithm_data_loaders[algorithm_name]( + algorithm_data_loader = exareme2_algorithm_data_loaders[algorithm_name]( variables=Variables( x=sanitize_request_variable(algorithm_request_dto.inputdata.x), y=sanitize_request_variable(algorithm_request_dto.inputdata.y), @@ -260,7 +260,7 @@ def algorithm_data_loader_case_1(algorithm_request_case_1): def algorithm_data_loader_case_2(algorithm_request_case_2): algorithm_name = algorithm_request_case_2[0] algorithm_request_dto = algorithm_request_case_2[1] - algorithm_data_loader = algorithm_data_loaders[algorithm_name]( + algorithm_data_loader = exareme2_algorithm_data_loaders[algorithm_name]( variables=Variables( x=sanitize_request_variable(algorithm_request_dto.inputdata.x), y=sanitize_request_variable(algorithm_request_dto.inputdata.y), @@ -290,7 +290,7 @@ def algorithm_case_1( algorithm_parameters=algorithm_parameters, datasets=algorithm_request_dto.inputdata.datasets, ) - return algorithm_classes[algorithm_name]( + return exareme2_algorithm_classes[algorithm_name]( initialization_params=init_params, data_loader=algorithm_data_loader_case_1, engine=engine_case_1, @@ -318,7 +318,7 @@ def algorithm_case_2( algorithm_parameters=algorithm_parameters, datasets=algorithm_request_dto.inputdata.datasets, ) - return algorithm_classes[algorithm_name]( + return exareme2_algorithm_classes[algorithm_name]( initialization_params=init_params, data_loader=algorithm_data_loader_case_2, engine=engine_case_2, diff --git a/tests/standalone_tests/controller/test_smpc_worker_tasks.py b/tests/standalone_tests/controller/test_smpc_worker_tasks.py index bbbb0a8bc..4d4980967 100644 --- a/tests/standalone_tests/controller/test_smpc_worker_tasks.py +++ b/tests/standalone_tests/controller/test_smpc_worker_tasks.py @@ -26,8 +26,8 @@ from exareme2.worker_communication import WorkerUDFKeyArguments from exareme2.worker_communication import WorkerUDFPosArguments from exareme2.worker_communication import WorkerUDFResults -from tests.algorithms.orphan_udfs import smpc_global_step -from tests.algorithms.orphan_udfs import smpc_local_step +from tests.algorithms.exareme2.orphan_udfs import smpc_global_step +from tests.algorithms.exareme2.orphan_udfs import smpc_local_step from tests.standalone_tests.conftest import LOCALWORKER1_SMPC_CONFIG_FILE from tests.standalone_tests.conftest import LOCALWORKER2_SMPC_CONFIG_FILE from tests.standalone_tests.conftest import SMPC_COORDINATOR_ADDRESS diff --git a/tests/standalone_tests/test_algorithms_folder.py b/tests/standalone_tests/test_algorithms_folder.py index ab64e04e8..79d1d4c85 100644 --- a/tests/standalone_tests/test_algorithms_folder.py +++ b/tests/standalone_tests/test_algorithms_folder.py @@ -9,25 +9,27 @@ @pytest.fixture def set_default_algorithms_folder(): - if exareme2.ALGORITHM_FOLDERS_ENV_VARIABLE in os.environ: - del os.environ[exareme2.ALGORITHM_FOLDERS_ENV_VARIABLE] + if exareme2.EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE in os.environ: + del os.environ[exareme2.EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE] def test_default_algorithms_folder(set_default_algorithms_folder): assert ( - exareme2.ALGORITHM_FOLDERS + exareme2.EXAREME2_ALGORITHM_FOLDERS == "./exareme2/algorithms/exareme2,./exareme2/algorithms/flower" ) @pytest.fixture def set_test_algorithms_folder(): - os.environ[exareme2.ALGORITHM_FOLDERS_ENV_VARIABLE] = TEST_ALGORITHMS_FOLDER + os.environ[ + exareme2.EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE + ] = TEST_ALGORITHMS_FOLDER yield - del os.environ[exareme2.ALGORITHM_FOLDERS_ENV_VARIABLE] + del os.environ[exareme2.EXAREME2_ALGORITHM_FOLDERS_ENV_VARIABLE] @pytest.mark.slow def test_test_algorithms_folder(set_test_algorithms_folder): importlib.reload(exareme2) - assert exareme2.ALGORITHM_FOLDERS == "./tests/algorithms" + assert exareme2.EXAREME2_ALGORITHM_FOLDERS == "./tests/algorithms"