diff --git a/exareme2/algorithms/flower/__init__.py b/exareme2/algorithms/flower/__init__.py index e69de29bb..ddf37ecb3 100644 --- a/exareme2/algorithms/flower/__init__.py +++ b/exareme2/algorithms/flower/__init__.py @@ -0,0 +1,23 @@ +import logging +import os + +from flwr.common.logger import FLOWER_LOGGER + +for handler in FLOWER_LOGGER.handlers: + FLOWER_LOGGER.removeHandler(handler) + +FLOWER_LOGGER.setLevel(logging.DEBUG) + +request_id = os.getenv("REQUEST_ID", "NO-REQUEST_ID") +worker_role = os.getenv("WORKER_ROLE", "NO-ROLE") +worker_identifier = os.getenv("WORKER_IDENTIFIER", "NO-IDENTIFIER") + +flower_formatter = logging.Formatter( + f"%(asctime)s - %(levelname)s - FLOWER - {worker_role} - {worker_identifier} - %(module)s - %(funcName)s(%(lineno)d) - {request_id} - %(message)s" +) + +# Configure console logger +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.DEBUG) +console_handler.setFormatter(flower_formatter) +FLOWER_LOGGER.addHandler(console_handler) diff --git a/exareme2/algorithms/flower/flower_data_processing.py b/exareme2/algorithms/flower/flower_data_processing.py index db3463374..286322653 100644 --- a/exareme2/algorithms/flower/flower_data_processing.py +++ b/exareme2/algorithms/flower/flower_data_processing.py @@ -7,6 +7,7 @@ import pandas as pd import pymonetdb import requests +from flwr.common.logger import FLOWER_LOGGER from pydantic import BaseModel from sklearn import preprocessing from sklearn.impute import SimpleImputer @@ -53,7 +54,6 @@ def _fetch_data_from_db(data_model, datasets) -> pd.DataFrame: def _fetch_data_from_csv(data_model, datasets) -> pd.DataFrame: data_folder = Path(f"{os.getenv('DATA_PATH')}/{data_model.split(':')[0]}_v_0_1") - print(f"Loading data from folder: {data_folder}") dataframes = [ pd.read_csv(data_folder / f"{dataset}.csv") for dataset in datasets @@ -85,21 +85,21 @@ def preprocess_data(inputdata, full_data): def error_handling(error): error_msg = {"error": str(error)} - print( + FLOWER_LOGGER.error( f"Error will try to save error message: {error_msg}! Running: {RESULT_URL}..." ) requests.post(RESULT_URL, data=json.dumps(error_msg), headers=HEADERS) def post_result(result: dict) -> None: - print(f"Running: {RESULT_URL}...") + FLOWER_LOGGER.debug(f"Posting result at: {RESULT_URL} ...") response = requests.post(RESULT_URL, data=json.dumps(result), headers=HEADERS) if response.status_code != 200: error_handling(response.text) def get_input() -> Inputdata: - print(f"Running: {INPUT_URL}...") + FLOWER_LOGGER.debug(f"Getting inputdata from: {INPUT_URL} ...") response = requests.get(INPUT_URL) if response.status_code != 200: error_handling(response.text) @@ -109,7 +109,7 @@ def get_input() -> Inputdata: def get_enumerations(data_model: str, variable_name: str) -> list: try: - print(f"Running: {CDES_URL}...") + FLOWER_LOGGER.debug(f"Getting enumerations from: {CDES_URL} ...") response = requests.get(CDES_URL) if response.status_code != 200: error_handling(response.text) @@ -126,8 +126,4 @@ def get_enumerations(data_model: str, variable_name: str) -> list: else: raise KeyError(f"'enumerations' key not found in {variable_name}") except (requests.RequestException, KeyError, json.JSONDecodeError) as e: - error_msg = {"error": str(e)} - print( - f"Error will try to save error message: {error_msg}! Running: {RESULT_URL}..." - ) - requests.post(RESULT_URL, data=json.dumps(error_msg), headers=HEADERS) + error_handling(str(e)) diff --git a/exareme2/algorithms/flower/logistic_regression/server.py b/exareme2/algorithms/flower/logistic_regression/server.py index 4e91a5817..065eddb52 100644 --- a/exareme2/algorithms/flower/logistic_regression/server.py +++ b/exareme2/algorithms/flower/logistic_regression/server.py @@ -27,7 +27,6 @@ def evaluate(server_round, parameters, config): accuracy = model.score(X_test, y_test) if server_round == NUM_OF_ROUNDS: post_result({"accuracy": accuracy}) - print({"accuracy": accuracy}) return loss, {"accuracy": accuracy} return evaluate diff --git a/exareme2/algorithms/flower/mnist_logistic_regression/client.py b/exareme2/algorithms/flower/mnist_logistic_regression/client.py index 3e720bfec..7a34fd612 100644 --- a/exareme2/algorithms/flower/mnist_logistic_regression/client.py +++ b/exareme2/algorithms/flower/mnist_logistic_regression/client.py @@ -3,6 +3,7 @@ import flwr as fl import numpy as np +from flwr.common.logger import FLOWER_LOGGER from sklearn.linear_model import LogisticRegression from sklearn.metrics import log_loss @@ -49,14 +50,13 @@ def fit(self, parameters, config): ] return_data = (params, len(X_train), {"accuracy": accuracy}) except Exception as e: - print(f"Error during model fitting: {e}") + FLOWER_LOGGER.error(f"Error during model fitting: {e}") # On error, default to zero-initialized parameters, no training examples, and zero accuracy zero_params = [ np.zeros_like(param) for param in utils.get_model_parameters(model) ] return_data = (zero_params, 0, {"accuracy": 0.0}) - print(f"Returning from fit: {return_data}") return return_data def evaluate(self, parameters, config): diff --git a/exareme2/algorithms/flower/mnist_logistic_regression/server.py b/exareme2/algorithms/flower/mnist_logistic_regression/server.py index 2eda57067..9028f6c8d 100644 --- a/exareme2/algorithms/flower/mnist_logistic_regression/server.py +++ b/exareme2/algorithms/flower/mnist_logistic_regression/server.py @@ -29,7 +29,6 @@ def evaluate(server_round, parameters: fl.common.NDArrays, config): accuracy = model.score(X_test, y_test) if server_round == NUM_OF_ROUNDS: post_result({"accuracy": accuracy}) - print({"accuracy": accuracy}) return loss, {"accuracy": accuracy} return evaluate diff --git a/exareme2/controller/celery/tasks_handler.py b/exareme2/controller/celery/tasks_handler.py index e7ae6c104..3c4eda394 100644 --- a/exareme2/controller/celery/tasks_handler.py +++ b/exareme2/controller/celery/tasks_handler.py @@ -34,8 +34,8 @@ "get_data_model_cdes": "exareme2.worker.worker_info.worker_info_api.get_data_model_cdes", "get_data_model_attributes": "exareme2.worker.worker_info.worker_info_api.get_data_model_attributes", "healthcheck": "exareme2.worker.worker_info.worker_info_api.healthcheck", - "start_flower_client": "exareme2.worker.flower.starter.flower_api.start_flower_client", - "start_flower_server": "exareme2.worker.flower.starter.flower_api.start_flower_server", + "start_flower_client": "exareme2.worker.flower.starter.starter_api.start_flower_client", + "start_flower_server": "exareme2.worker.flower.starter.starter_api.start_flower_server", "stop_flower_server": "exareme2.worker.flower.cleanup.cleanup_api.stop_flower_server", "stop_flower_client": "exareme2.worker.flower.cleanup.cleanup_api.stop_flower_client", "garbage_collect": "exareme2.worker.flower.cleanup.cleanup_api.garbage_collect", diff --git a/exareme2/worker/flower/starter/flower_api.py b/exareme2/worker/flower/starter/starter_api.py similarity index 71% rename from exareme2/worker/flower/starter/flower_api.py rename to exareme2/worker/flower/starter/starter_api.py index 0d300eb1e..8a85ba561 100644 --- a/exareme2/worker/flower/starter/flower_api.py +++ b/exareme2/worker/flower/starter/starter_api.py @@ -1,11 +1,11 @@ from celery import shared_task -from exareme2.worker.flower.starter import flower_service +from exareme2.worker.flower.starter import starter_service @shared_task def start_flower_client(request_id: str, algorithm_name, server_address) -> int: - return flower_service.start_flower_client( + return starter_service.start_flower_client( request_id, algorithm_name, server_address ) @@ -14,6 +14,6 @@ def start_flower_client(request_id: str, algorithm_name, server_address) -> int: def start_flower_server( request_id: str, algorithm_name: str, number_of_clients: int, server_address ) -> int: - return flower_service.start_flower_server( + return starter_service.start_flower_server( request_id, algorithm_name, number_of_clients, server_address ) diff --git a/exareme2/worker/flower/starter/flower_service.py b/exareme2/worker/flower/starter/starter_service.py similarity index 87% rename from exareme2/worker/flower/starter/flower_service.py rename to exareme2/worker/flower/starter/starter_service.py index 0e848e12b..70cdbabff 100644 --- a/exareme2/worker/flower/starter/flower_service.py +++ b/exareme2/worker/flower/starter/starter_service.py @@ -12,6 +12,9 @@ def start_flower_client(request_id: str, algorithm_name, server_address) -> int: "MONETDB_USERNAME": worker_config.monetdb.local_username, "MONETDB_PASSWORD": worker_config.monetdb.local_password, "MONETDB_DB": worker_config.monetdb.database, + "REQUEST_ID": request_id, + "WORKER_ROLE": worker_config.role, + "WORKER_IDENTIFIER": worker_config.identifier, "SERVER_ADDRESS": server_address, "NUMBER_OF_CLIENTS": worker_config.monetdb.database, "CONTROLLER_IP": worker_config.controller.ip, @@ -32,6 +35,9 @@ def start_flower_server( request_id: str, algorithm_name: str, number_of_clients: int, server_address ) -> int: env_vars = { + "REQUEST_ID": request_id, + "WORKER_ROLE": worker_config.role, + "WORKER_IDENTIFIER": worker_config.identifier, "SERVER_ADDRESS": server_address, "NUMBER_OF_CLIENTS": number_of_clients, "CONTROLLER_IP": worker_config.controller.ip, diff --git a/exareme2/worker/utils/celery_app.py b/exareme2/worker/utils/celery_app.py index 80044af19..7c530b8b4 100644 --- a/exareme2/worker/utils/celery_app.py +++ b/exareme2/worker/utils/celery_app.py @@ -29,7 +29,7 @@ "exareme2.worker.exareme2.udfs.udfs_api", "exareme2.worker.exareme2.smpc.smpc_api", "exareme2.worker.exareme2.cleanup.cleanup_api", - "exareme2.worker.flower.starter.flower_api", + "exareme2.worker.flower.starter.starter_api", "exareme2.worker.flower.cleanup.cleanup_api", ], ) diff --git a/tests/standalone_tests/controller/workers_communication_helper.py b/tests/standalone_tests/controller/workers_communication_helper.py index 1aa62fbb2..296dcacf6 100644 --- a/tests/standalone_tests/controller/workers_communication_helper.py +++ b/tests/standalone_tests/controller/workers_communication_helper.py @@ -20,8 +20,8 @@ "validate_smpc_templates_match": "exareme2.worker.exareme2.smpc.smpc_api.validate_smpc_templates_match", "load_data_to_smpc_client": "exareme2.worker.exareme2.smpc.smpc_api.load_data_to_smpc_client", "get_smpc_result": "exareme2.worker.exareme2.smpc.smpc_api.get_smpc_result", - "start_flower_client": "exareme2.worker.flower.starter.flower_api.start_flower_client", - "start_flower_server": "exareme2.worker.flower.starter.flower_api.start_flower_server", + "start_flower_client": "exareme2.worker.flower.starter.starter_api.start_flower_client", + "start_flower_server": "exareme2.worker.flower.starter.starter_api.start_flower_server", "stop_flower_server": "exareme2.worker.flower.cleanup.cleanup_api.stop_flower_server", "stop_flower_client": "exareme2.worker.flower.cleanup.cleanup_api.stop_flower_client", "garbage_collect": "exareme2.worker.flower.cleanup.cleanup_api.garbage_collect",