diff --git a/.github/workflows/task_runner_basic_e2e.yml b/.github/workflows/task_runner_basic_e2e.yml index 694e671056..69b67c3c82 100644 --- a/.github/workflows/task_runner_basic_e2e.yml +++ b/.github/workflows/task_runner_basic_e2e.yml @@ -192,8 +192,9 @@ jobs: id: run_tests run: | python -m pytest -s tests/end_to_end/test_suites/memory_logs_tests.py \ - --model_name ${{ env.MODEL_NAME }} --num_rounds ${{ env.NUM_ROUNDS }} \ - --num_collaborators ${{ env.NUM_COLLABORATORS }} --log_memory_usage + -k test_log_memory_usage_basic --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} \ + --log_memory_usage echo "Task runner memory logs test run completed" - name: Post test run diff --git a/.github/workflows/task_runner_dockerized_ws_e2e.yml b/.github/workflows/task_runner_dockerized_ws_e2e.yml index e1963a584e..a1e939b962 100644 --- a/.github/workflows/task_runner_dockerized_ws_e2e.yml +++ b/.github/workflows/task_runner_dockerized_ws_e2e.yml @@ -150,3 +150,45 @@ jobs: if: ${{ always() }} with: test_type: "tr_no_client_auth_dockerized_ws" + + test_memory_logs_dockerized_ws: + name: tr_tls_memory_logs_dockerized_ws + runs-on: ubuntu-22.04 + timeout-minutes: 15 + strategy: + matrix: + model_name: ["keras_cnn_mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests with TLS and memory logs + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/memory_logs_tests.py \ + -k test_log_memory_usage_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} \ + --log_memory_usage + echo "Task runner memory logs test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_tls_memory_logs_dockerized_ws" diff --git a/.github/workflows/tr_docker_native.yml b/.github/workflows/tr_docker_native.yml index 71ee5f4a6c..36dfcf9107 100644 --- a/.github/workflows/tr_docker_native.yml +++ b/.github/workflows/tr_docker_native.yml @@ -5,7 +5,6 @@ on: pull_request: branches: [ develop ] types: [opened, synchronize, reopened, ready_for_review] - workflow_dispatch: permissions: contents: read @@ -29,7 +28,7 @@ jobs: - name: Create workspace image run: | - fx workspace create --prefix example_workspace --template torch_cnn_mnist + fx workspace create --prefix example_workspace --template keras_cnn_mnist cd example_workspace fx plan initialize -a localhost fx workspace dockerize --save --revision https://github.com/${GITHUB_REPOSITORY}.git@${{ github.event.pull_request.head.sha }} diff --git a/tests/end_to_end/test_suites/memory_logs_tests.py b/tests/end_to_end/test_suites/memory_logs_tests.py index 4c4b30e821..2bb41029ab 100644 --- a/tests/end_to_end/test_suites/memory_logs_tests.py +++ b/tests/end_to_end/test_suites/memory_logs_tests.py @@ -6,51 +6,64 @@ import os import json -from tests.end_to_end.utils.common_fixtures import fx_federation_tr +from tests.end_to_end.utils.common_fixtures import fx_federation_tr, fx_federation_tr_dws import tests.end_to_end.utils.constants as constants from tests.end_to_end.utils import federation_helper as fed_helper log = logging.getLogger(__name__) -# NOTE: This test file contains the test cases for logging memory usage in a federated learning setup. - -@pytest.mark.task_runner_basic @pytest.mark.log_memory_usage -def test_log_memory_usage(request, fx_federation_tr): +def test_log_memory_usage_basic(request, fx_federation_tr): """ - This module contains end-to-end tests for logging memory usage in a federated learning setup. - Test Suite: - - test_log_memory_usage: Tests the memory usage logging functionality for the torch_cnn_mnist model. - Functions: - - test_log_memory_usage(request, fx_federation): Test the memory usage logging functionality in a federated learning setup. - Parameters: + Args: - request: The pytest request object containing configuration options. - fx_federation_tr: The fixture representing the federated learning setup. - Steps: - 1. Skip the test if memory usage logging is disabled. - 2. Setup PKI for trusted communication if TLS is enabled. - 3. Start the federation and verify its completion. - 4. Verify the existence of memory usage logs for the aggregator. - 5. Verify the memory usage details for each round. - 6. Verify the existence and details of memory usage logs for each collaborator. - 7. Log the availability of memory usage details for all participants. """ # Skip test if fx_federation.log_memory_usage is False if not request.config.log_memory_usage: pytest.skip("Memory usage logging is disabled") + _log_memory_usage(request, fx_federation_tr) + + +@pytest.mark.log_memory_usage +def test_log_memory_usage_dockerized_ws(request, fx_federation_tr_dws): + """ + Test the memory usage logging functionality in a federated learning setup. + Args: + - request: The pytest request object containing configuration options. + - fx_federation_tr_dws: The fixture representing the federated learning setup with dockerized workspace. + """ + # Skip test if fx_federation.log_memory_usage is False + if not request.config.log_memory_usage: + pytest.skip("Memory usage logging is disabled") + + _log_memory_usage(request, fx_federation_tr_dws) + + +def _log_memory_usage(request, fed_obj): + """ + Test the memory usage logging functionality in a federated learning setup. + Steps: + 1. Setup PKI for trusted communication if TLS is enabled. + 2. Start the federation and verify its completion. + 3. Verify the existence of memory usage logs for the aggregator. + 4. Verify the memory usage details for each round. + 5. Verify the existence and details of memory usage logs for each collaborator. + 6. Log the availability of memory usage details for all participants. + """ # Start the federation - results = fed_helper.run_federation(fx_federation_tr) + results = fed_helper.run_federation(fed_obj) # Verify the completion of the federation run assert fed_helper.verify_federation_run_completion( - fx_federation_tr, results, num_rounds=request.config.num_rounds + fed_obj, results, test_env=request.config.test_env, num_rounds=request.config.num_rounds ), "Federation completion failed" # Verify the aggregator memory logs - aggregator_memory_usage_file = constants.AGG_MEM_USAGE_JSON.format(fx_federation_tr.workspace_path) + aggregator_memory_usage_file = constants.AGG_MEM_USAGE_JSON.format(fed_obj.workspace_path) assert os.path.exists( aggregator_memory_usage_file ), "Aggregator memory usage file is not available" @@ -64,9 +77,9 @@ def test_log_memory_usage(request, fx_federation_tr): ), "Memory usage details are not available for all rounds" # check memory usage entries for each collaborator - for collaborator in fx_federation_tr.collaborators: + for collaborator in fed_obj.collaborators: collaborator_memory_usage_file = constants.COL_MEM_USAGE_JSON.format( - fx_federation_tr.workspace_path, collaborator.name + fed_obj.workspace_path, collaborator.name ) assert os.path.exists( diff --git a/tests/end_to_end/test_suites/sample_tests.py b/tests/end_to_end/test_suites/sample_tests.py index 981b214e49..01d5fd9394 100644 --- a/tests/end_to_end/test_suites/sample_tests.py +++ b/tests/end_to_end/test_suites/sample_tests.py @@ -36,7 +36,10 @@ def test_federation_basic(request, fx_federation_tr): # Verify the completion of the federation run assert fed_helper.verify_federation_run_completion( - fx_federation_tr, results, num_rounds=request.config.num_rounds + fx_federation_tr, + results, + test_env=request.config.test_env, + num_rounds=request.config.num_rounds, ), "Federation completion failed" @@ -54,5 +57,8 @@ def test_federation_via_dockerized_workspace(request, fx_federation_tr_dws): # Verify the completion of the federation run assert fed_helper.verify_federation_run_completion( - fx_federation_tr_dws, results, request.config.num_rounds + fx_federation_tr_dws, + results, + test_env=request.config.test_env, + num_rounds=request.config.num_rounds, ), "Federation completion failed" diff --git a/tests/end_to_end/test_suites/task_runner_tests.py b/tests/end_to_end/test_suites/task_runner_tests.py index fb4200bdec..eb9c344da8 100644 --- a/tests/end_to_end/test_suites/task_runner_tests.py +++ b/tests/end_to_end/test_suites/task_runner_tests.py @@ -4,7 +4,10 @@ import pytest import logging -from tests.end_to_end.utils.common_fixtures import fx_federation_tr, fx_federation_tr_dws +from tests.end_to_end.utils.common_fixtures import ( + fx_federation_tr, + fx_federation_tr_dws, +) from tests.end_to_end.utils import federation_helper as fed_helper log = logging.getLogger(__name__) @@ -23,7 +26,10 @@ def test_federation_via_native(request, fx_federation_tr): # Verify the completion of the federation run assert fed_helper.verify_federation_run_completion( - fx_federation_tr, results, num_rounds=request.config.num_rounds + fx_federation_tr, + results, + test_env=request.config.test_env, + num_rounds=request.config.num_rounds, ), "Federation completion failed" @@ -36,7 +42,14 @@ def test_federation_via_dockerized_workspace(request, fx_federation_tr_dws): fx_federation_tr_dws (Fixture): Pytest fixture for dockerized workspace """ # Start the federation - results = fed_helper.run_federation_for_dws(fx_federation_tr_dws, use_tls=request.config.use_tls) + results = fed_helper.run_federation_for_dws( + fx_federation_tr_dws, use_tls=request.config.use_tls + ) # Verify the completion of the federation run - assert fed_helper.verify_federation_run_completion(fx_federation_tr_dws, results, request.config.num_rounds), "Federation completion failed" + assert fed_helper.verify_federation_run_completion( + fx_federation_tr_dws, + results, + test_env=request.config.test_env, + num_rounds=request.config.num_rounds, + ), "Federation completion failed" diff --git a/tests/end_to_end/utils/common_fixtures.py b/tests/end_to_end/utils/common_fixtures.py index 25aba1c579..0c96eafc91 100644 --- a/tests/end_to_end/utils/common_fixtures.py +++ b/tests/end_to_end/utils/common_fixtures.py @@ -8,12 +8,12 @@ import numpy as np import tests.end_to_end.utils.constants as constants -# from tests.end_to_end.utils.wf_helper import ( -# init_collaborator_private_attr_index, -# init_collaborator_private_attr_name, -# init_collaborate_pvt_attr_np, -# init_agg_pvt_attr_np -# ) +from tests.end_to_end.utils.wf_helper import ( + init_collaborator_private_attr_index, + init_collaborator_private_attr_name, + init_collaborate_pvt_attr_np, + init_agg_pvt_attr_np +) import tests.end_to_end.utils.federation_helper as fh import tests.end_to_end.utils.ssh_helper as ssh from tests.end_to_end.models import aggregator as agg_model, model_owner as mo_model @@ -45,12 +45,7 @@ def fx_federation_tr(request): Note: As this is a function level fixture, thus no import is required at test level. """ - test_env = fh.get_test_env_from_markers(request) - - if test_env != "task_runner_basic": - raise ValueError( - "Fixture fx_federation_tr is only supported for task_runner_basic marker" - ) + request.config.test_env = "task_runner_basic" collaborators = [] executor = concurrent.futures.ThreadPoolExecutor() @@ -143,10 +138,7 @@ def fx_federation_tr_dws(request): Note: As this is a function level fixture, thus no import is required at test level. """ - if fh.get_test_env_from_markers(request) != "task_runner_dockerized_ws": - raise ValueError( - "Fixture fx_federation_tr_dws is only supported for task_runner_dockerized_ws marker" - ) + request.config.test_env = "task_runner_dockerized_ws" collaborators = [] executor = concurrent.futures.ThreadPoolExecutor() diff --git a/tests/end_to_end/utils/federation_helper.py b/tests/end_to_end/utils/federation_helper.py index 93645b30d4..bd94be8fa9 100644 --- a/tests/end_to_end/utils/federation_helper.py +++ b/tests/end_to_end/utils/federation_helper.py @@ -7,7 +7,6 @@ import os import json import re -from functools import lru_cache import tests.end_to_end.utils.constants as constants import tests.end_to_end.utils.docker_helper as dh @@ -330,12 +329,13 @@ def install_dependencies_on_collaborators(fed_obj): raise Exception("Failed to install dependencies on one or more collaborators") -def verify_federation_run_completion(fed_obj, results, num_rounds): +def verify_federation_run_completion(fed_obj, results, test_env, num_rounds): """ Verify the completion of the process for all the participants Args: fed_obj (object): Federation fixture object results (list): List of results + test_env (str): Test environment num_rounds (int): Number of rounds Returns: list: List of response (True or False) for all the participants @@ -350,6 +350,7 @@ def verify_federation_run_completion(fed_obj, results, num_rounds): participant, num_rounds, results[i], + test_env, local_bind_path=fed_obj.local_bind_path, ) for i, participant in enumerate(fed_obj.collaborators + [fed_obj.aggregator]) @@ -365,7 +366,7 @@ def verify_federation_run_completion(fed_obj, results, num_rounds): def _verify_completion_for_participant( - participant, num_rounds, result_file, time_for_each_round=100, local_bind_path=None + participant, num_rounds, result_file, test_env, time_for_each_round=100, local_bind_path=None ): """ Verify the completion of the process for the participant @@ -382,7 +383,6 @@ def _verify_completion_for_participant( # Set timeout based on the number of rounds and time for each round timeout = 600 + (time_for_each_round * num_rounds) # in seconds - test_env = get_test_env_from_markers() # In case of docker environment, get the logs from local path which is mounted to the container if test_env == "task_runner_dockerized_ws": result_file = constants.AGG_COL_RESULT_FILE.format( @@ -433,31 +433,6 @@ def _verify_completion_for_participant( return True -@lru_cache(maxsize=50) -def get_test_env_from_markers(request=None): - """ - Get test environment based on test case markers - Keeping request argument as optional so that it can be used in other functions as well - """ - if os.getenv("TEST_ENV"): - return os.getenv("TEST_ENV") - - # Determine the test type based on the markers - markers = [m.name for m in request.node.iter_markers()] - if "task_runner_basic" in markers: - test_env = "task_runner_basic" - elif "task_runner_dockerized_ws" in markers: - test_env = "task_runner_dockerized_ws" - else: - raise ValueError( - "Invalid test environment. Provide one of the valid markers: " - "task_runner_basic and task_runner_dockerized_ws" - ) - - os.environ["TEST_ENV"] = test_env - return test_env - - def federation_env_setup_and_validate(request): """ Setup the federation environment and validate the configurations @@ -469,7 +444,7 @@ def federation_env_setup_and_validate(request): agg_domain_name = "localhost" # Determine the test type based on the markers - test_env = get_test_env_from_markers(request) + test_env = request.config.test_env # Validate the model name and create the workspace name if not request.config.model_name.upper() in constants.ModelName._member_names_: