diff --git a/.github/workflows/task_runner_e2e.yml b/.github/workflows/task_runner_e2e.yml index 567cce7ba7..4801ab6fad 100644 --- a/.github/workflows/task_runner_e2e.yml +++ b/.github/workflows/task_runner_e2e.yml @@ -230,3 +230,70 @@ jobs: with: name: tr_no_client_auth_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }} path: result.tar + + test_memory_logs: + name: tr_tls_memory_logs + runs-on: ubuntu-22.04 + timeout-minutes: 15 + strategy: + matrix: + # Testing non TLS scenario only for torch_cnn_mnist model and python 3.10 + # If required, this can be extended to other models and python versions + model_name: ["torch_cnn_mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Set up Python + id: setup_python + uses: actions/setup-python@v3 + with: + python-version: ${{ env.PYTHON_VERSION }} + + - name: Install dependencies + id: install_dependencies + run: | + python -m pip install --upgrade pip + pip install . + pip install -r test-requirements.txt + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/memory_logs_tests.py \ + --model_name ${{ env.MODEL_NAME }} --num_rounds ${{ env.NUM_ROUNDS }} \ + --num_collaborators ${{ env.NUM_COLLABORATORS }} --log_memory_usage + echo "Task runner memory logs test run completed" + + - name: Print test summary + id: print_test_summary + if: ${{ always() }} + run: | + export PYTHONPATH="$PYTHONPATH:." + python tests/end_to_end/utils/summary_helper.py + echo "Test summary printed" + + - name: Tar files + id: tar_files + if: ${{ always() }} + run: tar -cvf result.tar results + + - name: Upload Artifacts + id: upload_artifacts + uses: actions/upload-artifact@v4 + if: ${{ always() }} + with: + name: tr_tls_memory_logs_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }} + path: result.tar diff --git a/.github/workflows/ubuntu.yml b/.github/workflows/ubuntu.yml index 70a195e70b..747ca067a4 100644 --- a/.github/workflows/ubuntu.yml +++ b/.github/workflows/ubuntu.yml @@ -13,7 +13,6 @@ env: jobs: pytest-coverage: # from pytest_coverage.yml - needs: lint runs-on: ubuntu-latest timeout-minutes: 15 @@ -35,7 +34,7 @@ jobs: coverage report cli: - needs: [lint, pytest-coverage] + needs: [pytest-coverage] runs-on: ubuntu-latest timeout-minutes: 15 diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index 041022e300..1e34aa7e92 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -15,7 +15,7 @@ from openfl.pipelines import NoCompressionPipeline, TensorCodec from openfl.protocols import base_pb2, utils from openfl.utilities import TaskResultKey, TensorKey, change_tags -from openfl.utilities.logs import get_memory_usage, write_metric +from openfl.utilities.logs import get_memory_usage, write_memory_usage_to_file, write_metric class Aggregator: @@ -1013,6 +1013,7 @@ def _end_of_round_check(self): if self._time_to_quit(): if self.log_memory_usage: self.logger.info(f"Publish memory usage: {self.memory_details}") + write_memory_usage_to_file(self.memory_details, "aggregator_memory_usage.json") self.logger.info("Experiment Completed. Cleaning up...") else: self.logger.info("Starting round %s...", self.round_number) diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index 119efd5eee..08f19b9d94 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -13,7 +13,7 @@ from openfl.pipelines import NoCompressionPipeline, TensorCodec from openfl.protocols import utils from openfl.utilities import TensorKey -from openfl.utilities.logs import get_memory_usage +from openfl.utilities.logs import get_memory_usage, write_memory_usage_to_file class DevicePolicy(Enum): @@ -181,6 +181,9 @@ def run(self): memory_details.append(memory_detail) if self.log_memory_usage: self.logger.info(f"Publish memory usage: {memory_details}") + write_memory_usage_to_file( + memory_details, f"{self.collaborator_name}_memory_usage.json" + ) self.logger.info("End of Federation reached. Exiting...") diff --git a/openfl/utilities/logs.py b/openfl/utilities/logs.py index a17f0742fc..ce64b5f7fb 100644 --- a/openfl/utilities/logs.py +++ b/openfl/utilities/logs.py @@ -4,6 +4,7 @@ """Logs utilities.""" +import json import logging import os @@ -92,3 +93,19 @@ def get_memory_usage() -> dict: }, } return memory_usage + + +def write_memory_usage_to_file(memory_usage_dict, file_name): + """ + Write memory usage details to a file. + + Args: + memory_usage_dict (dict): The memory usage details to write. + file_name (str): The name of the file to write to. + + Returns: + None + """ + file_path = os.path.join("logs", file_name) + with open(file_path, "w") as f: + json.dump(memory_usage_dict, f, indent=4) diff --git a/tests/end_to_end/conftest.py b/tests/end_to_end/conftest.py index 22eed9d101..098d1fbe61 100644 --- a/tests/end_to_end/conftest.py +++ b/tests/end_to_end/conftest.py @@ -17,7 +17,7 @@ # Define a named tuple to store the objects for model owner, aggregator, and collaborators federation_fixture = collections.namedtuple( "federation_fixture", - "model_owner, aggregator, collaborators, model_name, require_client_auth, use_tls, workspace_path, results_dir, num_rounds", + "model_owner, aggregator, collaborators, workspace_path", ) def pytest_addoption(parser): @@ -180,6 +180,24 @@ def pytest_sessionfinish(session, exitstatus): log.debug(f"Cleared .pytest_cache directory at {cache_dir}") +def pytest_configure(config): + """ + Configure the pytest plugin. + Args: + config: pytest config object + """ + # Declare some global variables + args = parse_arguments() + # Use the model name from the test case name if not provided as a command line argument + config.model_name = args.model_name + config.num_collaborators = args.num_collaborators + config.num_rounds = args.num_rounds + config.require_client_auth = not args.disable_client_auth + config.use_tls = not args.disable_tls + config.log_memory_usage = args.log_memory_usage + config.results_dir = config.getini("results_dir") + + @pytest.fixture(scope="function") def fx_federation(request, pytestconfig): """ @@ -196,38 +214,26 @@ def fx_federation(request, pytestconfig): """ collaborators = [] agg_domain_name = "localhost" - - # Parse the command line arguments - args = parse_arguments() - # Use the model name from the test case name if not provided as a command line argument - model_name = args.model_name if args.model_name else request.node.name.split("test_")[1] - results_dir = pytestconfig.getini("results_dir") - num_collaborators = args.num_collaborators - num_rounds = args.num_rounds - require_client_auth = not args.disable_client_auth - use_tls = not args.disable_tls - log_memory_usage = args.log_memory_usage - log.info( f"Running federation setup using Task Runner API on single machine with below configurations:\n" - f"\tNumber of collaborators: {num_collaborators}\n" - f"\tNumber of rounds: {num_rounds}\n" - f"\tModel name: {model_name}\n" - f"\tClient authentication: {require_client_auth}\n" - f"\tTLS: {use_tls}\n" - f"\tMemory Logs: {log_memory_usage}" + f"\tNumber of collaborators: {request.config.num_collaborators}\n" + f"\tNumber of rounds: {request.config.num_rounds}\n" + f"\tModel name: {request.config.model_name}\n" + f"\tClient authentication: {request.config.require_client_auth}\n" + f"\tTLS: {request.config.use_tls}\n" + f"\tMemory Logs: {request.config.log_memory_usage}" ) # Validate the model name and create the workspace name - if not model_name.upper() in constants.ModelName._member_names_: - raise ValueError(f"Invalid model name: {model_name}") + if not request.config.model_name.upper() in constants.ModelName._member_names_: + raise ValueError(f"Invalid model name: {request.config.model_name}") - workspace_name = f"workspace_{model_name}" + workspace_name = request.config.model_name # Create model owner object and the workspace for the model - model_owner = participants.ModelOwner(workspace_name, model_name, log_memory_usage) + model_owner = participants.ModelOwner(workspace_name, request.config.model_name, request.config.log_memory_usage) try: - workspace_path = model_owner.create_workspace(results_dir=results_dir) + workspace_path = model_owner.create_workspace(results_dir=request.config.results_dir) except Exception as e: log.error(f"Failed to create the workspace: {e}") raise e @@ -235,19 +241,19 @@ def fx_federation(request, pytestconfig): # Modify the plan try: model_owner.modify_plan( - new_rounds=num_rounds, - num_collaborators=num_collaborators, - require_client_auth=require_client_auth, - use_tls=use_tls, + new_rounds=request.config.num_rounds, + num_collaborators=request.config.num_collaborators, + require_client_auth=request.config.require_client_auth, + use_tls=request.config.use_tls, ) except Exception as e: log.error(f"Failed to modify the plan: {e}") raise e - if not use_tls: + if not request.config.use_tls: log.info("Disabling TLS for communication") try: - model_owner.register_collaborators(num_collaborators) + model_owner.register_collaborators(request.config.num_collaborators) except Exception as e: log.error(f"Failed to register the collaborators: {e}") raise e @@ -271,7 +277,7 @@ def fx_federation(request, pytestconfig): agg_domain_name=agg_domain_name, workspace_path=workspace_path ) - for i in range(num_collaborators): + for i in range(request.config.num_collaborators): collaborator = participants.Collaborator( collaborator_name=f"collaborator{i+1}", data_directory_path=i + 1, @@ -285,10 +291,5 @@ def fx_federation(request, pytestconfig): model_owner=model_owner, aggregator=aggregator, collaborators=collaborators, - model_name=model_name, - require_client_auth=require_client_auth, - use_tls=use_tls, workspace_path=workspace_path, - results_dir=results_dir, - num_rounds=num_rounds, ) diff --git a/tests/end_to_end/test_suites/memory_logs_tests.py b/tests/end_to_end/test_suites/memory_logs_tests.py new file mode 100644 index 0000000000..d763c419d0 --- /dev/null +++ b/tests/end_to_end/test_suites/memory_logs_tests.py @@ -0,0 +1,73 @@ +# Copyright 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import pytest +import logging +import os +import json + +from tests.end_to_end.utils import federation_helper as fed_helper + +log = logging.getLogger(__name__) + + +@pytest.mark.log_memory_usage +def test_log_memory_usage(request, fx_federation): + """ + This module contains end-to-end tests for logging memory usage in a federated learning setup. + Test Suite: + - test_log_memory_usage: Tests the memory usage logging functionality for the torch_cnn_mnist model. + Functions: + - test_log_memory_usage(request, fx_federation): + Test the memory usage logging functionality in a federated learning setup. + Parameters: + - request: The pytest request object containing configuration options. + - fx_federation: The fixture representing the federated learning setup. + Steps: + 1. Skip the test if memory usage logging is disabled. + 2. Setup PKI for trusted communication if TLS is enabled. + 3. Start the federation and verify its completion. + 4. Verify the existence of memory usage logs for the aggregator. + 5. Verify the memory usage details for each round. + 6. Verify the existence and details of memory usage logs for each collaborator. + 7. Log the availability of memory usage details for all participants. + """ + # Skip test if fx_federation.log_memory_usage is False + if not request.config.log_memory_usage: + pytest.skip("Memory usage logging is disabled") + + # Setup PKI for trusted communication within the federation + if request.config.use_tls: + assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI for trusted communication" + + # Start the federation + results = fed_helper.run_federation(fx_federation) + + # Verify the completion of the federation run + assert fed_helper.verify_federation_run_completion(fx_federation, results, \ + num_rounds=request.config.num_rounds), "Federation completion failed" + # Verify the aggregator memory logs + aggregator_memory_usage_file = os.path.join(fx_federation.workspace_path, "logs", "aggregator_memory_usage.json") + assert os.path.exists(aggregator_memory_usage_file), "Aggregator memory usage file is not available" + + # Log the aggregator memory usage details + memory_usage_dict = json.load(open(aggregator_memory_usage_file)) + + # check memory usage entries for each round + assert len(memory_usage_dict) == request.config.num_rounds, \ + "Memory usage details are not available for all rounds" + + # check memory usage entries for each collaborator + for collaborator in fx_federation.collaborators: + collaborator_memory_usage_file = os.path.join(fx_federation.workspace_path, + "logs", + f"{collaborator.collaborator_name}_memory_usage.json") + + assert os.path.exists(collaborator_memory_usage_file), f"Memory usage file for collaborator {collaborator.collaborator_name} is not available" + + memory_usage_dict = json.load(open(collaborator_memory_usage_file)) + + assert len(memory_usage_dict) == request.config.num_rounds, \ + f"Memory usage details are not available for all rounds for collaborator {collaborator.collaborator_name}" + + log.info("Memory usage details are available for all participants") diff --git a/tests/end_to_end/test_suites/task_runner_tests.py b/tests/end_to_end/test_suites/task_runner_tests.py index 43946d61fb..077ed642dd 100644 --- a/tests/end_to_end/test_suites/task_runner_tests.py +++ b/tests/end_to_end/test_suites/task_runner_tests.py @@ -10,51 +10,54 @@ @pytest.mark.torch_cnn_mnist -def test_torch_cnn_mnist(fx_federation): +def test_torch_cnn_mnist(request, fx_federation): """ Test for torch_cnn_mnist model. """ log.info("Testing torch_cnn_mnist model") # Setup PKI for trusted communication within the federation - if fx_federation.use_tls: + if request.config.use_tls: assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI for trusted communication" # Start the federation results = fed_helper.run_federation(fx_federation) # Verify the completion of the federation run - assert fed_helper.verify_federation_run_completion(fx_federation, results), "Federation completion failed" + assert fed_helper.verify_federation_run_completion(fx_federation, results, + num_rounds=request.config.num_rounds), "Federation completion failed" @pytest.mark.keras_cnn_mnist -def test_keras_cnn_mnist(fx_federation): +def test_keras_cnn_mnist(request, fx_federation): log.info("Testing keras_cnn_mnist model") # Setup PKI for trusted communication within the federation - if fx_federation.use_tls: + if request.config.use_tls: assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI for trusted communication" # Start the federation results = fed_helper.run_federation(fx_federation) # Verify the completion of the federation run - assert fed_helper.verify_federation_run_completion(fx_federation, results), "Federation completion failed" + assert fed_helper.verify_federation_run_completion(fx_federation, results, + num_rounds=request.config.num_rounds), "Federation completion failed" @pytest.mark.torch_cnn_histology -def test_torch_cnn_histology(fx_federation): +def test_torch_cnn_histology(request, fx_federation): """ Test for torch_cnn_histology model """ log.info("Testing torch_cnn_histology model") # Setup PKI for trusted communication within the federation - if fx_federation.use_tls: + if request.config.use_tls: assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI for trusted communication" # Start the federation results = fed_helper.run_federation(fx_federation) # Verify the completion of the federation run - assert fed_helper.verify_federation_run_completion(fx_federation, results), "Federation completion failed" + assert fed_helper.verify_federation_run_completion(fx_federation, results, + num_rounds=request.config.num_rounds), "Federation completion failed" diff --git a/tests/end_to_end/utils/federation_helper.py b/tests/end_to_end/utils/federation_helper.py index d87667077e..dc39323fa9 100644 --- a/tests/end_to_end/utils/federation_helper.py +++ b/tests/end_to_end/utils/federation_helper.py @@ -4,6 +4,8 @@ import time import concurrent.futures import logging +import json +import re from tests.end_to_end.utils.constants import SUCCESS_MARKER @@ -68,12 +70,13 @@ def run_federation(fed_obj): return results -def verify_federation_run_completion(fed_obj, results): +def verify_federation_run_completion(fed_obj, results, num_rounds): """ Verify the completion of the process for all the participants Args: fed_obj (object): Federation fixture object results (list): List of results + num_rounds (int): Number of rounds Returns: list: List of response (True or False) for all the participants """ @@ -85,7 +88,7 @@ def verify_federation_run_completion(fed_obj, results): executor.submit( _verify_completion_for_participant, participant, - fed_obj.num_rounds, + num_rounds, results[i] ) for i, participant in enumerate(fed_obj.collaborators + [fed_obj.aggregator]) @@ -133,3 +136,53 @@ def _verify_completion_for_participant(participant, num_rounds, result_file, tim else: log.info(f"Process completed for {participant.name} in {time.time() - start_time} seconds") return True + + +def extract_memory_usage(log_file): + """ + Extracts memory usage data from a log file. + This function reads the content of the specified log file, searches for memory usage data + using a regular expression pattern, and returns the extracted data as a dictionary. + Args: + log_file (str): The path to the log file from which to extract memory usage data. + Returns: + dict: A dictionary containing the memory usage data. + Raises: + json.JSONDecodeError: If there is an error decoding the JSON data. + Exception: If memory usage data is not found in the log file. + """ + try: + with open(log_file, 'r') as file: + content = file.read() + + pattern = r"Publish memory usage: (\[.*?\])" + match = re.search(pattern, content, re.DOTALL) + + if match: + memory_usage_data = match.group(1) + memory_usage_data = re.sub(r'\S+\.py:\d+', '', memory_usage_data) + memory_usage_data = memory_usage_data.replace('\n', '').replace(' ', '') + memory_usage_data = memory_usage_data.replace("'", '"') + memory_usage_dict = json.loads(memory_usage_data) + return memory_usage_dict + else: + log.error("Memory usage data not found in the log file") + raise Exception("Memory usage data not found in the log file") + except Exception as e: + log.error(f"An error occurred while extracting memory usage: {e}") + raise e + +def write_memory_usage_to_file(memory_usage_dict, output_file): + """ + Writes memory usage data to a file. + This function writes the specified memory usage data to the specified output file. + Args: + memory_usage_dict (dict): A dictionary containing the memory usage data. + output_file (str): The path to the output file to which to write the memory usage data. + """ + try: + with open(output_file, 'w') as file: + json.dump(memory_usage_dict, file, indent=4) + except Exception as e: + log.error(f"An error occurred while writing memory usage data to file: {e}") + raise e