From 39a249dce0433bc9d4a3d470930c777eaa55cb9d Mon Sep 17 00:00:00 2001 From: "K.Filippopolitis" <56073635+KFilippopolitis@users.noreply.github.com> Date: Sat, 5 Oct 2024 15:08:27 +0300 Subject: [PATCH] Dev/mip 941/validate proper parallel execution (#496) * Fixed a bug on logistic regression's client. * Added a test that validates the parallel execution of flower algorithms. --- .../flower/logistic_regression/client.py | 2 +- .../test_parallel_algorithm_requests.py | 60 +++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 tests/standalone_tests/controller/services/flower/test_parallel_algorithm_requests.py diff --git a/exareme2/algorithms/flower/logistic_regression/client.py b/exareme2/algorithms/flower/logistic_regression/client.py index f2fdf73c6..eb5649938 100644 --- a/exareme2/algorithms/flower/logistic_regression/client.py +++ b/exareme2/algorithms/flower/logistic_regression/client.py @@ -55,7 +55,7 @@ def evaluate(self, parameters, config): fl.client.start_client( server_address=os.environ["SERVER_ADDRESS"], client=client.to_client() ) - FLOWER_LOGGER.debug("Connection successful on attempt", attempts + 1) + FLOWER_LOGGER.debug(f"Connection successful on attempt: {attempts + 1}") break except Exception as e: FLOWER_LOGGER.warning( diff --git a/tests/standalone_tests/controller/services/flower/test_parallel_algorithm_requests.py b/tests/standalone_tests/controller/services/flower/test_parallel_algorithm_requests.py new file mode 100644 index 000000000..eb75eac93 --- /dev/null +++ b/tests/standalone_tests/controller/services/flower/test_parallel_algorithm_requests.py @@ -0,0 +1,60 @@ +import concurrent.futures +import json + +import pytest +import requests + +from tests.standalone_tests.conftest import ALGORITHMS_URL + + +@pytest.mark.slow +def test_parallel_requests_in_algorithm_flow( + globalworker_worker_service, + localworker1_worker_service, + load_data_localworker1, + load_test_data_globalworker, + controller_service_with_localworker1, +): + algorithm_name = "logistic_regression" + request_dict = { + "inputdata": { + "y": ["gender"], + "x": ["lefthippocampus"], + "data_model": "dementia:0.1", + "datasets": [ + "ppmi0", + "ppmi1", + "ppmi2", + "ppmi3", + ], + "validation_datasets": ["ppmi_test"], + "filters": None, + }, + "type": "flower", + } + + algorithm_url = ALGORITHMS_URL + "/" + algorithm_name + + headers = {"Content-type": "application/json", "Accept": "text/plain"} + + num_requests = 5 # Number of parallel requests + + def send_request(): + response = requests.post( + algorithm_url, + data=json.dumps(request_dict), + headers=headers, + ) + assert response.status_code == 200 + return response.json() + + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [executor.submit(send_request) for _ in range(num_requests)] + results = [ + future.result() for future in concurrent.futures.as_completed(futures) + ] + + expected_result = {"accuracy": 0.63} + + for result in results: + assert result == expected_result