diff --git a/.github/actions/tr_post_test_run/action.yml b/.github/actions/tr_post_test_run/action.yml new file mode 100644 index 0000000000..04444af4d9 --- /dev/null +++ b/.github/actions/tr_post_test_run/action.yml @@ -0,0 +1,36 @@ +--- +# Composite Action to run post-test functions for task runner end to end tests + +name: 'Post-Test Functions' +description: 'Run post-test functions' +inputs: + test_type: + description: 'Test type' + required: true + +runs: + using: 'composite' + steps: + - name: Print test summary + id: print_test_summary + if: ${{ always() }} + run: | + export PYTHONPATH="$PYTHONPATH:." + python tests/end_to_end/utils/summary_helper.py + echo "Test summary printed" + shell: bash + + - name: Create Tar (exclude cert and data folders) + id: tar_files + if: ${{ always() }} + run: | + tar -cvf result.tar --exclude="cert" --exclude="data" --exclude="__pycache__" $HOME/results + shell: bash + + - name: Upload Artifacts + id: upload_artifacts + uses: actions/upload-artifact@v4 + if: ${{ always() }} + with: + name: ${{ inputs.test_type }}_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }} + path: result.tar diff --git a/.github/actions/tr_pre_test_run/action.yml b/.github/actions/tr_pre_test_run/action.yml new file mode 100644 index 0000000000..2949a5e8cc --- /dev/null +++ b/.github/actions/tr_pre_test_run/action.yml @@ -0,0 +1,22 @@ +--- +# Composite Action to run pre-test functions for task runner end to end tests + +name: 'Pre-Test Functions' +description: 'Run pre-test functions' + +runs: + using: 'composite' + steps: + - name: Set up Python + id: setup_python + uses: actions/setup-python@v3 + with: + python-version: ${{ env.PYTHON_VERSION }} + + - name: Install dependencies + id: install_dependencies + run: | + python -m pip install --upgrade pip + pip install . + pip install -r test-requirements.txt + shell: bash diff --git a/.github/workflows/task_runner_e2e.yml b/.github/workflows/task_runner_basic_e2e.yml similarity index 53% rename from .github/workflows/task_runner_e2e.yml rename to .github/workflows/task_runner_basic_e2e.yml index 3581517c55..c75deb5eb3 100644 --- a/.github/workflows/task_runner_e2e.yml +++ b/.github/workflows/task_runner_basic_e2e.yml @@ -1,9 +1,7 @@ --- -#--------------------------------------------------------------------------- -# Workflow to run Task Runner end to end tests -# Authors - Noopur, Payal Chaurasiya -#--------------------------------------------------------------------------- -name: Task Runner E2E +# Task Runner E2E tests for bare metal approach + +name: Task_Runner_E2E # Please do not modify the name as it is used in the composite action on: schedule: @@ -55,48 +53,23 @@ jobs: submodules: "true" token: ${{ secrets.GITHUB_TOKEN }} - - name: Set up Python - id: setup_python - uses: actions/setup-python@v3 - with: - python-version: ${{ env.PYTHON_VERSION }} - - - name: Install dependencies - id: install_dependencies - run: | - python -m pip install --upgrade pip - pip install . 
- pip install -r test-requirements.txt + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} - name: Run Task Runner E2E tests with TLS id: run_tests run: | python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \ - -m ${{ env.MODEL_NAME }} --model_name ${{ env.MODEL_NAME }} \ + -m task_runner_basic --model_name ${{ env.MODEL_NAME }} \ --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} echo "Task runner end to end test run completed" - - name: Print test summary - id: print_test_summary - if: ${{ always() }} - run: | - export PYTHONPATH="$PYTHONPATH:." - python tests/end_to_end/utils/summary_helper.py - echo "Test summary printed" - - - name: Create Tar (exclude cert and data folders) - id: tar_files - if: ${{ always() }} - run: | - tar -cvf result.tar --exclude="cert" --exclude="data" --exclude="__pycache__" $HOME/results - - - name: Upload Artifacts - id: upload_artifacts - uses: actions/upload-artifact@v4 + - name: Post test run + uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - name: tr_tls_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }} - path: result.tar + test_type: "tr_tls" test_with_non_tls: name: tr_non_tls @@ -123,48 +96,23 @@ jobs: submodules: "true" token: ${{ secrets.GITHUB_TOKEN }} - - name: Set up Python - id: setup_python - uses: actions/setup-python@v3 - with: - python-version: ${{ env.PYTHON_VERSION }} - - - name: Install dependencies - id: install_dependencies - run: | - python -m pip install --upgrade pip - pip install . - pip install -r test-requirements.txt + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} - name: Run Task Runner E2E tests without TLS id: run_tests run: | python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \ - -m ${{ env.MODEL_NAME }} --model_name ${{ env.MODEL_NAME }} \ + -m task_runner_basic --model_name ${{ env.MODEL_NAME }} \ --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_tls echo "Task runner end to end test run completed" - - name: Print test summary - id: print_test_summary - if: ${{ always() }} - run: | - export PYTHONPATH="$PYTHONPATH:." - python tests/end_to_end/utils/summary_helper.py - echo "Test summary printed" - - - name: Create Tar (exclude cert and data folders) - id: tar_files - if: ${{ always() }} - run: | - tar -cvf result.tar --exclude="cert" --exclude="data" --exclude="__pycache__" $HOME/results - - - name: Upload Artifacts - id: upload_artifacts - uses: actions/upload-artifact@v4 + - name: Post test run + uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - name: tr_non_tls_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }} - path: result.tar + test_type: "tr_non_tls" test_with_no_client_auth: name: tr_no_client_auth @@ -191,48 +139,23 @@ jobs: submodules: "true" token: ${{ secrets.GITHUB_TOKEN }} - - name: Set up Python - id: setup_python - uses: actions/setup-python@v3 - with: - python-version: ${{ env.PYTHON_VERSION }} - - - name: Install dependencies - id: install_dependencies - run: | - python -m pip install --upgrade pip - pip install . 
- pip install -r test-requirements.txt + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} - name: Run Task Runner E2E tests without TLS id: run_tests run: | python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \ - -m ${{ env.MODEL_NAME }} --model_name ${{ env.MODEL_NAME }} \ + -m task_runner_basic --model_name ${{ env.MODEL_NAME }} \ --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_client_auth echo "Task runner end to end test run completed" - - name: Print test summary - id: print_test_summary - if: ${{ always() }} - run: | - export PYTHONPATH="$PYTHONPATH:." - python tests/end_to_end/utils/summary_helper.py - echo "Test summary printed" - - - name: Create Tar (exclude cert and data folders) - id: tar_files - if: ${{ always() }} - run: | - tar -cvf result.tar --exclude="cert" --exclude="data" --exclude="__pycache__" $HOME/results - - - name: Upload Artifacts - id: upload_artifacts - uses: actions/upload-artifact@v4 + - name: Post test run + uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - name: tr_no_client_auth_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }} - path: result.tar + test_type: "tr_no_client_auth" test_memory_logs: name: tr_tls_memory_logs @@ -259,44 +182,21 @@ jobs: submodules: "true" token: ${{ secrets.GITHUB_TOKEN }} - - name: Set up Python - id: setup_python - uses: actions/setup-python@v3 - with: - python-version: ${{ env.PYTHON_VERSION }} - - - name: Install dependencies - id: install_dependencies - run: | - python -m pip install --upgrade pip - pip install . - pip install -r test-requirements.txt + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} - name: Run Task Runner E2E tests with TLS and memory logs id: run_tests run: | python -m pytest -s tests/end_to_end/test_suites/memory_logs_tests.py \ - --model_name ${{ env.MODEL_NAME }} --num_rounds ${{ env.NUM_ROUNDS }} \ - --num_collaborators ${{ env.NUM_COLLABORATORS }} --log_memory_usage + -k test_log_memory_usage_basic --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} \ + --log_memory_usage echo "Task runner memory logs test run completed" - - name: Print test summary - id: print_test_summary - if: ${{ always() }} - run: | - export PYTHONPATH="$PYTHONPATH:." 
- python tests/end_to_end/utils/summary_helper.py - echo "Test summary printed" - - - name: Tar files - id: tar_files - if: ${{ always() }} - run: tar -cvf result.tar --exclude="cert" --exclude="data" --exclude="__pycache__" $HOME/results - - - name: Upload Artifacts - id: upload_artifacts - uses: actions/upload-artifact@v4 + - name: Post test run + uses: ./.github/actions/tr_post_test_run if: ${{ always() }} with: - name: tr_tls_memory_logs_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }} - path: result.tar + test_type: "tr_tls_memory_logs" diff --git a/.github/workflows/task_runner_docker_e2e.yml b/.github/workflows/task_runner_docker_e2e.yml deleted file mode 100644 index 023f82323f..0000000000 --- a/.github/workflows/task_runner_docker_e2e.yml +++ /dev/null @@ -1,257 +0,0 @@ ---- -#--------------------------------------------------------------------------- -# Workflow to run Task Runner E2E tests via Docker -# Authors - Noopur, Payal Chaurasiya -#--------------------------------------------------------------------------- -name: Task Runner E2E via Docker - -on: - workflow_dispatch: - inputs: - num_rounds: - description: "Number of rounds to train" - required: false - default: "5" - type: string - num_collaborators: - description: "Number of collaborators" - required: false - default: "2" - type: string - -permissions: - contents: read - -# Environment variables common for all the jobs -env: - NUM_ROUNDS: ${{ inputs.num_rounds || '5' }} - NUM_COLLABORATORS: ${{ inputs.num_collaborators || '2' }} - -jobs: - test_with_tls_docker: - name: tr_tls_docker - runs-on: ubuntu-22.04 - timeout-minutes: 15 - strategy: - matrix: - # There are open issues for some of the models, so excluding them for now: - # model_name: [ "torch_cnn_mnist", "keras_cnn_mnist", "torch_cnn_histology" ] - model_name: ["keras_cnn_mnist"] - python_version: ["3.9"] - fail-fast: false # do not immediately fail if one of the combinations fail - - env: - MODEL_NAME: ${{ matrix.model_name }} - PYTHON_VERSION: ${{ matrix.python_version }} - - steps: - - name: Checkout OpenFL repository - id: checkout_openfl - uses: actions/checkout@v4.1.1 - with: - fetch-depth: 2 # needed for detecting changes - submodules: "true" - token: ${{ secrets.GITHUB_TOKEN }} - - - name: Set up Python - id: setup_python - uses: actions/setup-python@v3 - with: - python-version: ${{ env.PYTHON_VERSION }} - - - name: Install dependencies - id: install_dependencies - run: | - python -m pip install --upgrade pip - pip install . - pip install -r test-requirements.txt - - - name: Create OpenFL image - id: create_openfl_image - run: | - echo "Creating openfl image with current repo and branch. This may take a few minutes..." - cd openfl-docker - docker build . -t openfl -f Dockerfile.base \ - --build-arg OPENFL_REVISION=https://github.com/${{ github.repository }}.git@${{ github.ref_name }} - - - name: Run Task Runner E2E tests with TLS - id: run_tests - run: | - python -m pytest -s tests/end_to_end/test_suites/docker_tests.py \ - -m docker --model_name ${{ env.MODEL_NAME }} \ - --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} - echo "Task runner end to end test run completed" - - - name: Print test summary - id: print_test_summary - if: ${{ always() }} - run: | - export PYTHONPATH="$PYTHONPATH:." 
- python tests/end_to_end/utils/summary_helper.py - echo "Test summary printed" - - - name: Create Tar (exclude cert and data folders) - id: tar_files - if: ${{ always() }} - run: | - tar -cvf result.tar --exclude="cert" --exclude="data" --exclude="__pycache__" $HOME/results - - - name: Upload Artifacts - id: upload_artifacts - uses: actions/upload-artifact@v4 - if: ${{ always() }} - with: - name: tr_tls_docker_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }} - path: result.tar - - test_with_non_tls_docker: - name: tr_non_tls_docker - runs-on: ubuntu-22.04 - timeout-minutes: 15 - strategy: - matrix: - # Testing non TLS scenario only for keras_cnn_mnist model and python 3.10 - # If required, this can be extended to other models and python versions - model_name: ["keras_cnn_mnist"] - python_version: ["3.10"] - fail-fast: false # do not immediately fail if one of the combinations fail - - env: - MODEL_NAME: ${{ matrix.model_name }} - PYTHON_VERSION: ${{ matrix.python_version }} - - steps: - - name: Checkout OpenFL repository - id: checkout_openfl - uses: actions/checkout@v4.1.1 - with: - fetch-depth: 2 # needed for detecting changes - submodules: "true" - token: ${{ secrets.GITHUB_TOKEN }} - - - name: Set up Python - id: setup_python - uses: actions/setup-python@v3 - with: - python-version: ${{ env.PYTHON_VERSION }} - - - name: Install dependencies - id: install_dependencies - run: | - python -m pip install --upgrade pip - pip install . - pip install -r test-requirements.txt - - - name: Create OpenFL image - id: create_openfl_image - run: | - echo "Creating openfl image with current repo and branch. This may take a few minutes..." - cd openfl-docker - docker build . -t openfl -f Dockerfile.base \ - --build-arg OPENFL_REVISION=https://github.com/${{ github.repository }}.git@${{ github.ref_name }} - - - name: Run Task Runner E2E tests without TLS - id: run_tests - run: | - python -m pytest -s tests/end_to_end/test_suites/docker_tests.py \ - -m docker --model_name ${{ env.MODEL_NAME }} \ - --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_tls - echo "Task runner end to end test run completed" - - - name: Print test summary - id: print_test_summary - if: ${{ always() }} - run: | - export PYTHONPATH="$PYTHONPATH:." 
- python tests/end_to_end/utils/summary_helper.py - echo "Test summary printed" - - - name: Create Tar (exclude cert and data folders) - id: tar_files - if: ${{ always() }} - run: | - tar -cvf result.tar --exclude="cert" --exclude="data" --exclude="__pycache__" $HOME/results - - - name: Upload Artifacts - id: upload_artifacts - uses: actions/upload-artifact@v4 - if: ${{ always() }} - with: - name: tr_non_tls_docker_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }} - path: result.tar - - test_with_no_client_auth_docker: - name: tr_no_client_auth_docker - runs-on: ubuntu-22.04 - timeout-minutes: 15 - strategy: - matrix: - # Testing non TLS scenario only for keras_cnn_mnist model and python 3.10 - # If required, this can be extended to other models and python versions - model_name: ["keras_cnn_mnist"] - python_version: ["3.11"] - fail-fast: false # do not immediately fail if one of the combinations fail - - env: - MODEL_NAME: ${{ matrix.model_name }} - PYTHON_VERSION: ${{ matrix.python_version }} - - steps: - - name: Checkout OpenFL repository - id: checkout_openfl - uses: actions/checkout@v4.1.1 - with: - fetch-depth: 2 # needed for detecting changes - submodules: "true" - token: ${{ secrets.GITHUB_TOKEN }} - - - name: Set up Python - id: setup_python - uses: actions/setup-python@v3 - with: - python-version: ${{ env.PYTHON_VERSION }} - - - name: Install dependencies - id: install_dependencies - run: | - python -m pip install --upgrade pip - pip install . - pip install -r test-requirements.txt - - - name: Create OpenFL image - id: create_openfl_image - run: | - echo "Creating openfl image with current repo and branch. This may take a few minutes..." - cd openfl-docker - docker build . -t openfl -f Dockerfile.base \ - --build-arg OPENFL_REVISION=https://github.com/${{ github.repository }}.git@${{ github.ref_name }} - - - name: Run Task Runner E2E tests without Client Auth - id: run_tests - run: | - python -m pytest -s tests/end_to_end/test_suites/docker_tests.py \ - -m docker --model_name ${{ env.MODEL_NAME }} \ - --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_client_auth - echo "Task runner end to end test run completed" - - - name: Print test summary - id: print_test_summary - if: ${{ always() }} - run: | - export PYTHONPATH="$PYTHONPATH:." 
- python tests/end_to_end/utils/summary_helper.py - echo "Test summary printed" - - - name: Create Tar (exclude cert and data folders) - id: tar_files - if: ${{ always() }} - run: | - tar -cvf result.tar --exclude="cert" --exclude="data" --exclude="__pycache__" $HOME/results - - - name: Upload Artifacts - id: upload_artifacts - uses: actions/upload-artifact@v4 - if: ${{ always() }} - with: - name: tr_no_client_auth_docker_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }} - path: result.tar diff --git a/.github/workflows/task_runner_dockerized_ws_e2e.yml b/.github/workflows/task_runner_dockerized_ws_e2e.yml new file mode 100644 index 0000000000..f48ae32471 --- /dev/null +++ b/.github/workflows/task_runner_dockerized_ws_e2e.yml @@ -0,0 +1,192 @@ +--- +# Task Runner E2E tests for dockerized approach + +name: Task_Runner_Dockerized_E2E # Please do not modify the name as it is used in the composite action + +on: + workflow_dispatch: + inputs: + num_rounds: + description: "Number of rounds to train" + required: false + default: "5" + type: string + num_collaborators: + description: "Number of collaborators" + required: false + default: "2" + type: string + +permissions: + contents: read + +# Environment variables common for all the jobs +env: + NUM_ROUNDS: ${{ inputs.num_rounds || '5' }} + NUM_COLLABORATORS: ${{ inputs.num_collaborators || '2' }} + +jobs: + test_with_tls_dockerized_ws: + name: tr_tls_dockerized_ws + runs-on: ubuntu-22.04 + timeout-minutes: 15 + strategy: + matrix: + model_name: ["keras_cnn_mnist"] + python_version: ["3.9", "3.10", "3.11"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests with TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \ + -m task_runner_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_tls_dockerized_ws" + + test_with_non_tls_dockerized_ws: + name: tr_non_tls_dockerized_ws + runs-on: ubuntu-22.04 + timeout-minutes: 15 + strategy: + matrix: + model_name: ["keras_cnn_mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \ + -m task_runner_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} 
--disable_tls + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_non_tls_dockerized_ws" + + test_with_no_client_auth_dockerized_ws: + name: tr_no_client_auth_dockerized_ws + runs-on: ubuntu-22.04 + timeout-minutes: 15 + strategy: + matrix: + model_name: ["keras_cnn_mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests without TLS + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \ + -m task_runner_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} --disable_client_auth + echo "Task runner end to end test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_no_client_auth_dockerized_ws" + + test_memory_logs_dockerized_ws: + name: tr_tls_memory_logs_dockerized_ws + runs-on: ubuntu-22.04 + timeout-minutes: 15 + strategy: + matrix: + model_name: ["keras_cnn_mnist"] + python_version: ["3.10"] + fail-fast: false # do not immediately fail if one of the combinations fail + + env: + MODEL_NAME: ${{ matrix.model_name }} + PYTHON_VERSION: ${{ matrix.python_version }} + + steps: + - name: Checkout OpenFL repository + id: checkout_openfl + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Pre test run + uses: ./.github/actions/tr_pre_test_run + if: ${{ always() }} + + - name: Run Task Runner E2E tests with TLS and memory logs + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/memory_logs_tests.py \ + -k test_log_memory_usage_dockerized_ws --model_name ${{ env.MODEL_NAME }} \ + --num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} \ + --log_memory_usage + echo "Task runner memory logs test run completed" + + - name: Post test run + uses: ./.github/actions/tr_post_test_run + if: ${{ always() }} + with: + test_type: "tr_tls_memory_logs_dockerized_ws" diff --git a/.github/workflows/tr_docker_native.yml b/.github/workflows/tr_docker_native.yml index f97b268636..36dfcf9107 100644 --- a/.github/workflows/tr_docker_native.yml +++ b/.github/workflows/tr_docker_native.yml @@ -44,7 +44,7 @@ jobs: fx collaborator create -d 1 -n charlie --silent fx collaborator generate-cert-request -n charlie --silent fx collaborator certify --request-pkg col_charlie_to_agg_cert_request.zip --silent - + # Pack the collaborator's private key, signed cert, and data.yaml into a tarball tarfiles="plan/data.yaml agg_to_col_charlie_signed_cert.zip" for entry in cert/client/*; do diff --git a/.github/workflows/wf_functional_e2e.yml b/.github/workflows/wf_functional_e2e.yml index f5deac471a..c51b7acc03 100644 --- a/.github/workflows/wf_functional_e2e.yml +++ b/.github/workflows/wf_functional_e2e.yml @@ -1,9 +1,7 @@ --- 
-#---------------------------------------------------------------------------
-# Workflow to run Task Runner E2E tests via Docker
-# Authors - Noopur, Payal Chaurasiya
-#---------------------------------------------------------------------------
-name: Workflow Functional E2E
+# Workflow functional E2E tests
+
+name: Workflow_Functional_E2E
 
 on:
   pull_request:
diff --git a/.gitignore b/.gitignore
index f9de4ad409..e13041af29 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,4 +16,7 @@ venv/*
 eggs/*
 *.pyi
 .metaflow/*
-results/*
\ No newline at end of file
+results/*
+**MNIST/
+**cert/
+.history/
diff --git a/openfl-workspace/torch_cnn_histology_fedcurv/README.md b/openfl-workspace/torch_cnn_histology_fedcurv/README.md
new file mode 100644
index 0000000000..de6f18e42d
--- /dev/null
+++ b/openfl-workspace/torch_cnn_histology_fedcurv/README.md
@@ -0,0 +1,35 @@
+# PyTorch CNN Histology Dataset Training with FedCurv Aggregation
+The example code in this directory is used to train a Convolutional Neural Network using the Colorectal Histology dataset.
+It uses the PyTorch framework and OpenFL's TaskRunner API.
+The federation aggregates intermediate models using the [FedCurv](https://arxiv.org/pdf/1910.07796)
+aggregation algorithm, which performs well compared to [FedAvg](https://arxiv.org/abs/2104.11375) when the datasets are not independent and identically distributed (IID) among collaborators.
+
+Note that this example is similar to the one in the `torch_cnn_histology` directory; it is here to demonstrate the usage of a different aggregation algorithm with OpenFL's TaskRunner API.
+
+The difference between the two examples lies both in the `PyTorchCNNWithFedCurv` class, which defines a stateful training method that uses an existing `FedCurv` object,
+and in the `plan.yaml` file, in which the training task is explicitly defined with a non-default aggregation method - `FedCurvWeightedAverage`.
+
+## Running an example federation
+The following instructions can be used to run the federation:
+```
+# Copy the workspace template, create collaborators and aggregator
+fx workspace create --template torch_cnn_histology_fedcurv --prefix fedcurv
+cd fedcurv
+fx workspace certify
+fx aggregator generate-cert-request
+fx aggregator certify --silent
+fx plan initialize
+
+fx collaborator create -n collaborator1 -d 1
+fx collaborator generate-cert-request -n collaborator1
+fx collaborator certify -n collaborator1 --silent
+
+fx collaborator create -n collaborator2 -d 2
+fx collaborator generate-cert-request -n collaborator2
+fx collaborator certify -n collaborator2 --silent
+
+# Run aggregator and collaborators
+fx aggregator start &
+fx collaborator start -n collaborator1 &
+fx collaborator start -n collaborator2
+```
\ No newline at end of file
diff --git a/openfl-workspace/torch_cnn_histology_fedcurv/plan/cols.yaml b/openfl-workspace/torch_cnn_histology_fedcurv/plan/cols.yaml
new file mode 100644
index 0000000000..a6d1b1b922
--- /dev/null
+++ b/openfl-workspace/torch_cnn_histology_fedcurv/plan/cols.yaml
@@ -0,0 +1,4 @@
+# Copyright (C) 2020-2021 Intel Corporation
+# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.
+ +collaborators: \ No newline at end of file diff --git a/openfl-workspace/torch_cnn_histology_fedcurv/plan/data.yaml b/openfl-workspace/torch_cnn_histology_fedcurv/plan/data.yaml new file mode 100644 index 0000000000..8e59641e48 --- /dev/null +++ b/openfl-workspace/torch_cnn_histology_fedcurv/plan/data.yaml @@ -0,0 +1,5 @@ +# Copyright (C) 2020-2021 Intel Corporation +# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you. + +one,1 +two,2 diff --git a/openfl-workspace/torch_cnn_histology_fedcurv/plan/plan.yaml b/openfl-workspace/torch_cnn_histology_fedcurv/plan/plan.yaml new file mode 100644 index 0000000000..d4c4184bae --- /dev/null +++ b/openfl-workspace/torch_cnn_histology_fedcurv/plan/plan.yaml @@ -0,0 +1,48 @@ +# Copyright (C) 2020-2021 Intel Corporation +# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you. + +aggregator : + defaults : plan/defaults/aggregator.yaml + template : openfl.component.Aggregator + settings : + init_state_path : save/torch_cnn_histology_init.pbuf + best_state_path : save/torch_cnn_histology_best.pbuf + last_state_path : save/torch_cnn_histology_last.pbuf + rounds_to_train : 20 + +collaborator : + defaults : plan/defaults/collaborator.yaml + template : openfl.component.Collaborator + settings : + delta_updates : false + opt_treatment : RESET + +data_loader : + template : src.dataloader.PyTorchHistologyInMemory + settings : + collaborator_count : 2 + data_group_name : histology + batch_size : 32 + +task_runner: + defaults : plan/defaults/task_runner.yaml + template: src.taskrunner.PyTorchCNNWithFedCurv + +network: + defaults: plan/defaults/network.yaml + +tasks: + defaults: plan/defaults/tasks_torch.yaml + train: + function: train_task + aggregation_type: + template: openfl.interface.aggregation_functions.FedCurvWeightedAverage + kwargs: + metrics: + - loss + +assigner: + defaults: plan/defaults/assigner.yaml + +compression_pipeline : + defaults : plan/defaults/compression_pipeline.yaml diff --git a/openfl-workspace/torch_cnn_histology_fedcurv/requirements.txt b/openfl-workspace/torch_cnn_histology_fedcurv/requirements.txt new file mode 100644 index 0000000000..58b9f15677 --- /dev/null +++ b/openfl-workspace/torch_cnn_histology_fedcurv/requirements.txt @@ -0,0 +1,2 @@ +torch==2.3.1 +torchvision==0.18.1 diff --git a/openfl-workspace/torch_cnn_histology_fedcurv/src/__init__.py b/openfl-workspace/torch_cnn_histology_fedcurv/src/__init__.py new file mode 100644 index 0000000000..f1410b1298 --- /dev/null +++ b/openfl-workspace/torch_cnn_histology_fedcurv/src/__init__.py @@ -0,0 +1,3 @@ +# Copyright (C) 2020-2021 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""You may copy this file as the starting point of your own model.""" diff --git a/openfl-workspace/torch_cnn_histology_fedcurv/src/dataloader.py b/openfl-workspace/torch_cnn_histology_fedcurv/src/dataloader.py new file mode 100644 index 0000000000..dc7dd4e0c6 --- /dev/null +++ b/openfl-workspace/torch_cnn_histology_fedcurv/src/dataloader.py @@ -0,0 +1,180 @@ +# Copyright (C) 2020-2021 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""You may copy this file as the starting point of your own model.""" + +from collections.abc import Iterable +from logging import getLogger +from os import makedirs +from pathlib import Path +from urllib.request import urlretrieve +from zipfile import ZipFile + +from openfl.federated import PyTorchDataLoader +import numpy 
as np +import torch +from torch.utils.data import random_split +from torchvision.datasets import ImageFolder +from torchvision.transforms import ToTensor +from tqdm import tqdm + +from openfl.utilities import validate_file_hash + +logger = getLogger(__name__) + + +class PyTorchHistologyInMemory(PyTorchDataLoader): + """PyTorch data loader for Histology dataset.""" + + def __init__(self, data_path, batch_size, **kwargs): + """Instantiate the data object. + + Args: + data_path: The file path to the data + batch_size: The batch size of the data loader + **kwargs: Additional arguments, passed to super init + and load_mnist_shard + """ + super().__init__(batch_size, random_seed=0, **kwargs) + + _, num_classes, X_train, y_train, X_valid, y_valid = load_histology_shard( + shard_num=int(data_path), **kwargs) + + self.X_train = X_train + self.y_train = y_train + self.X_valid = X_valid + self.y_valid = y_valid + + self.num_classes = num_classes + + +class HistologyDataset(ImageFolder): + """Colorectal Histology Dataset.""" + + URL = ('https://zenodo.org/record/53169/files/Kather_' + 'texture_2016_image_tiles_5000.zip?download=1') + FILENAME = 'Kather_texture_2016_image_tiles_5000.zip' + FOLDER_NAME = 'Kather_texture_2016_image_tiles_5000' + ZIP_SHA384 = ('7d86abe1d04e68b77c055820c2a4c582a1d25d2983e38ab724e' + 'ac75affce8b7cb2cbf5ba68848dcfd9d84005d87d6790') + DEFAULT_PATH = Path.cwd().absolute() / 'data' + + def __init__(self, root: Path = DEFAULT_PATH, **kwargs) -> None: + """Initialize.""" + makedirs(root, exist_ok=True) + filepath = root / HistologyDataset.FILENAME + if not filepath.is_file(): + self.pbar = tqdm(total=None) + urlretrieve(HistologyDataset.URL, filepath, self.report_hook) # nosec + validate_file_hash(filepath, HistologyDataset.ZIP_SHA384) + with ZipFile(filepath, 'r') as f: + f.extractall(root) + + super(HistologyDataset, self).__init__(root / HistologyDataset.FOLDER_NAME, **kwargs) + + def report_hook(self, count, block_size, total_size): + """Update progressbar.""" + if self.pbar.total is None and total_size: + self.pbar.total = total_size + progress_bytes = count * block_size + self.pbar.update(progress_bytes - self.pbar.n) + + def __getitem__(self, index): + """Allow getting items by slice index.""" + if isinstance(index, Iterable): + return [super(HistologyDataset, self).__getitem__(i) for i in index] + else: + return super(HistologyDataset, self).__getitem__(index) + + +def one_hot(labels, classes): + """ + One Hot encode a vector. + + Args: + labels (list): List of labels to onehot encode + classes (int): Total number of categorical classes + + Returns: + np.array: Matrix of one-hot encoded labels + """ + return np.eye(classes)[labels] + + +def _load_raw_datashards(shard_num, collaborator_count, train_split_ratio=0.8): + """ + Load the raw data by shard. + + Returns tuples of the dataset shard divided into training and validation. 
+ + Args: + shard_num (int): The shard number to use + collaborator_count (int): The number of collaborators in the federation + + Returns: + 2 tuples: (image, label) of the training, validation dataset + """ + dataset = HistologyDataset(transform=ToTensor()) + n_train = int(train_split_ratio * len(dataset)) + n_valid = len(dataset) - n_train + ds_train, ds_val = random_split( + dataset, lengths=[n_train, n_valid], generator=torch.manual_seed(0)) + + # create the shards + X_train, y_train = list(zip(*ds_train[shard_num::collaborator_count])) + X_train, y_train = np.stack(X_train), np.array(y_train) + + X_valid, y_valid = list(zip(*ds_val[shard_num::collaborator_count])) + X_valid, y_valid = np.stack(X_valid), np.array(y_valid) + + return (X_train, y_train), (X_valid, y_valid) + + +def load_histology_shard(shard_num, collaborator_count, + categorical=False, channels_last=False, **kwargs): + """ + Load the Histology dataset. + + Args: + shard_num (int): The shard to use from the dataset + collaborator_count (int): The number of collaborators in the federation + categorical (bool): True = convert the labels to one-hot encoded + vectors (Default = True) + channels_last (bool): True = The input images have the channels + last (Default = True) + **kwargs: Additional parameters to pass to the function + + Returns: + list: The input shape + int: The number of classes + numpy.ndarray: The training data + numpy.ndarray: The training labels + numpy.ndarray: The validation data + numpy.ndarray: The validation labels + """ + img_rows, img_cols = 150, 150 + num_classes = 8 + + (X_train, y_train), (X_valid, y_valid) = _load_raw_datashards( + shard_num, collaborator_count) + + if channels_last: + X_train = X_train.reshape(X_train.shape[0], img_rows, img_cols, 3) + X_valid = X_valid.reshape(X_valid.shape[0], img_rows, img_cols, 3) + input_shape = (img_rows, img_cols, 3) + else: + X_train = X_train.reshape(X_train.shape[0], 3, img_rows, img_cols) + X_valid = X_valid.reshape(X_valid.shape[0], 3, img_rows, img_cols) + input_shape = (3, img_rows, img_cols) + + logger.info(f'Histology > X_train Shape : {X_train.shape}') + logger.info(f'Histology > y_train Shape : {y_train.shape}') + logger.info(f'Histology > Train Samples : {X_train.shape[0]}') + logger.info(f'Histology > Valid Samples : {X_valid.shape[0]}') + + if categorical: + # convert class vectors to binary class matrices + y_train = one_hot(y_train, num_classes) + y_valid = one_hot(y_valid, num_classes) + + return input_shape, num_classes, X_train, y_train, X_valid, y_valid diff --git a/openfl-workspace/torch_cnn_histology_fedcurv/src/taskrunner.py b/openfl-workspace/torch_cnn_histology_fedcurv/src/taskrunner.py new file mode 100644 index 0000000000..4047dfddfe --- /dev/null +++ b/openfl-workspace/torch_cnn_histology_fedcurv/src/taskrunner.py @@ -0,0 +1,146 @@ +# Copyright (C) 2020-2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""You may copy this file as the starting point of your own model.""" +import numpy as np +import torch +import torch.nn as nn +import torch.nn.functional as F +import torch.optim as optim +from typing import Iterator, Tuple +from openfl.utilities.fedcurv.torch.fedcurv import FedCurv + +from openfl.federated import PyTorchTaskRunner +from openfl.utilities import Metric + + +class PyTorchCNNWithFedCurv(PyTorchTaskRunner): + """ + Simple CNN for classification. 
+ + PyTorchTaskRunner inherits from nn.module, so you can define your model + in the same way that you would for PyTorch + """ + + def __init__(self, device="cpu", **kwargs): + """Initialize. + + Args: + device: The hardware device to use for training (Default = "cpu") + **kwargs: Additional arguments to pass to the function + + """ + super().__init__(device=device, **kwargs) + + # Define the model + channel = self.data_loader.get_feature_shape()[0] # (channel, dim1, dim2) + self.conv1 = nn.Conv2d(channel, 16, kernel_size=3, stride=1, padding=1) + self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1) + self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1) + self.conv4 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1) + self.conv5 = nn.Conv2d(128 + 32, 256, kernel_size=3, stride=1, padding=1) + self.conv6 = nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1) + self.conv7 = nn.Conv2d(512 + 128 + 32, 256, kernel_size=3, stride=1, padding=1) + self.conv8 = nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1) + self.fc1 = nn.Linear(1184 * 9 * 9, 128) + self.fc2 = nn.Linear(128, 8) + + # `self.optimizer` must be set for optimizer weights to be federated + self.optimizer = optim.Adam(self.parameters(), lr=1e-3) + self._fedcurv = FedCurv(self, importance=1e7) + + # Set the loss function + self.loss_fn = F.cross_entropy + + def forward(self, x): + """Forward pass of the model. + + Args: + x: Data input to the model for the forward pass + """ + x = F.relu(self.conv1(x)) + x = F.relu(self.conv2(x)) + maxpool = F.max_pool2d(x, 2, 2) + + x = F.relu(self.conv3(maxpool)) + x = F.relu(self.conv4(x)) + concat = torch.cat([maxpool, x], dim=1) + maxpool = F.max_pool2d(concat, 2, 2) + + x = F.relu(self.conv5(maxpool)) + x = F.relu(self.conv6(x)) + concat = torch.cat([maxpool, x], dim=1) + maxpool = F.max_pool2d(concat, 2, 2) + + x = F.relu(self.conv7(maxpool)) + x = F.relu(self.conv8(x)) + concat = torch.cat([maxpool, x], dim=1) + maxpool = F.max_pool2d(concat, 2, 2) + + x = maxpool.flatten(start_dim=1) + x = F.dropout(self.fc1(x), p=0.5) + x = self.fc2(x) + return x + + def train_( + self, train_dataloader: Iterator[Tuple[np.ndarray, np.ndarray]] + ) -> Metric: + """Train single epoch. + + Override this function in order to use custom training. + + Args: + batch_generator: Train dataset batch generator. Yields (samples, targets) tuples of + size = `self.data_loader.batch_size`. + Returns: + Metric: An object containing name and np.ndarray value. + """ + losses = [] + model = self + self._fedcurv.on_train_begin(model) + + for data, target in train_dataloader: + data, target = torch.tensor(data).to(self.device), torch.tensor(target).to( + self.device + ) + self.optimizer.zero_grad() + output = self(data) + loss = self.loss_fn(output, target) + self._fedcurv.get_penalty(model) + loss.backward() + self.optimizer.step() + losses.append(loss.detach().cpu().numpy()) + break + loss = np.mean(losses) + return Metric(name=self.loss_fn.__name__, value=np.array(loss)) + + def validate_( + self, validation_dataloader: Iterator[Tuple[np.ndarray, np.ndarray]] + ) -> Metric: + """ + Perform validation on PyTorch Model + + Override this function for your own custom validation function + + Args: + validation_data_loader: Validation dataset batch generator. + Yields (samples, targets) tuples. 
+ Returns: + Metric: An object containing name and np.ndarray value + """ + + total_samples = 0 + val_score = 0 + with torch.no_grad(): + for data, target in validation_dataloader: + samples = target.shape[0] + total_samples += samples + data, target = torch.tensor(data).to(self.device), torch.tensor( + target + ).to(self.device, dtype=torch.int64) + output = self(data) + # get the index of the max log-probability + pred = output.argmax(dim=1) + val_score += pred.eq(target).sum().cpu().numpy() + + accuracy = val_score / total_samples + return Metric(name="accuracy", value=np.array(accuracy)) diff --git a/openfl/federated/task/fl_model.py b/openfl/federated/task/fl_model.py index 70bc537324..5fc2f21c92 100644 --- a/openfl/federated/task/fl_model.py +++ b/openfl/federated/task/fl_model.py @@ -32,6 +32,8 @@ class FederatedModel(TaskRunner): pytorch). tensor_dict_split_fn_kwargs (dict): Keyword arguments for the tensor dict split function. + data_loader (FederatedDataSet): A dataset to distribute among the collaborators, + see TaskRunner for more details """ def __init__(self, build_model, optimizer=None, loss_fn=None, **kwargs): @@ -75,10 +77,17 @@ def __init__(self, build_model, optimizer=None, loss_fn=None, **kwargs): self.runner.validate = lambda *args, **kwargs: build_model.validate( self.runner, *args, **kwargs ) + if hasattr(self.model, "train_epoch"): self.runner.train_epoch = lambda *args, **kwargs: build_model.train_epoch( self.runner, *args, **kwargs ) + + # Used to hook the training function when debugging locally + if hasattr(self.model, "train_"): + self.runner.train_ = lambda *args, **kwargs: build_model.train_( + self.runner, *args, **kwargs + ) self.runner.model = self.model self.runner.optimizer = self.optimizer self.loss_fn = loss_fn diff --git a/openfl/interface/workspace.py b/openfl/interface/workspace.py index 52129a967e..d3cb1713c5 100644 --- a/openfl/interface/workspace.py +++ b/openfl/interface/workspace.py @@ -79,10 +79,10 @@ def create_temp(prefix, template): template: The template to use for creating the workspace. """ - echo("Creating Workspace Templates") - + src = template if os.path.isabs(template) else WORKSPACE / template + echo(f"Creating Workspace Templates from {src} in {prefix}") shutil.copytree( - src=WORKSPACE / template, + src=src, dst=prefix, dirs_exist_ok=True, ignore=shutil.ignore_patterns("__pycache__"), diff --git a/tests/end_to_end/README.md b/tests/end_to_end/README.md index 040378804e..191cfd0db4 100644 --- a/tests/end_to_end/README.md +++ b/tests/end_to_end/README.md @@ -49,12 +49,24 @@ Below parameters are available for modification: 4. --disable_tls - to disable TLS communication (by default it is enabled) 5. 
--disable_client_auth - to disable the client authentication (by default it is enabled) -For example, to run Task runner with - torch_cnn_mnist model, 3 collaborators, 5 rounds and non-TLS scenario: +For example, to run Task runner (bare metal approach) with - torch_cnn_mnist model, 3 collaborators, 5 rounds and non-TLS scenario: ```sh -python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py --num_rounds 5 --num_collaborators 3 --model_name torch_cnn_mnist --disable_tls +python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py -m task_runner_basic --num_rounds 5 --num_collaborators 3 --model_name torch_cnn_mnist --disable_tls ``` +And, to run Task runner (via dockerized workspace) with keras_cnn_mnist, 2 collaborators, 3 rounds: + +```sh +python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py -m task_runner_dockerized_ws --num_rounds 3 --num_collaborators 2 --model_name keras_cnn_mnist +``` + +### Fixture and marker mapping: + +- `fx_federation_tr` - for task_runner_basic +- `fx_federation_tr_dws` - for task_runner_dockerized_ws + + ### Output Structure ``` diff --git a/tests/end_to_end/conftest.py b/tests/end_to_end/conftest.py index 10020c5d45..6d1ad45b75 100644 --- a/tests/end_to_end/conftest.py +++ b/tests/end_to_end/conftest.py @@ -7,7 +7,7 @@ import shutil import xml.etree.ElementTree as ET import logging - +from pathlib import Path from tests.end_to_end.utils.logger import configure_logging from tests.end_to_end.utils.logger import logger as log @@ -60,7 +60,7 @@ def setup_logging(pytestconfig): tmp_results_dir = pytestconfig.getini("results_dir") log_level = pytestconfig.getini("log_level") - results_dir = os.path.join(os.getenv("HOME"), tmp_results_dir) + results_dir = os.path.join(Path().home(), tmp_results_dir) if not os.path.exists(results_dir): os.makedirs(results_dir) diff --git a/tests/end_to_end/models/aggregator.py b/tests/end_to_end/models/aggregator.py index 46dd45871a..795a15bed3 100644 --- a/tests/end_to_end/models/aggregator.py +++ b/tests/end_to_end/models/aggregator.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 import logging +import os import tests.end_to_end.utils.exceptions as ex import tests.end_to_end.utils.federation_helper as fh @@ -49,24 +50,27 @@ def generate_sign_request(self): except Exception as e: raise ex.CSRGenerationException(f"Failed to generate sign request for {self.name}: {e}") - def start(self, res_file): + def start(self, res_file, with_docker=False): """ Start the aggregator Args: res_file (str): Result file to track the logs + with_docker (bool): Flag specific to dockerized workspace scenario. Default is False. Returns: str: Path to the log file """ try: log.info(f"Starting {self.name}") + res_file = res_file if not with_docker else os.path.basename(res_file) error_msg = "Failed to start the aggregator" fh.run_command( "fx aggregator start", error_msg=error_msg, container_id=self.container_id, - workspace_path=self.workspace_path, + workspace_path=self.workspace_path if not with_docker else "", run_in_background=True, bg_file=res_file, + with_docker=with_docker ) log.info( f"Started {self.name} and tracking the logs in {res_file}." 
diff --git a/tests/end_to_end/models/collaborator.py b/tests/end_to_end/models/collaborator.py index 82a5408a4b..51cbfeba0a 100644 --- a/tests/end_to_end/models/collaborator.py +++ b/tests/end_to_end/models/collaborator.py @@ -84,11 +84,12 @@ def create_collaborator(self): log.error(f"{error_msg}: {e}") raise e - def import_pki(self, zip_name): + def import_pki(self, zip_name, with_docker=False): """ Import and certify the CSR for the collaborator Args: - agg_workspace_path (str): Workspace path of model owner or aggregator + zip_name (str): Zip file name + with_docker (bool): Flag specific to dockerized workspace scenario. Default is False. Returns: bool: True if successful, else False """ @@ -100,7 +101,8 @@ def import_pki(self, zip_name): cmd, error_msg=error_msg, container_id=self.container_id, - workspace_path=self.workspace_path, + workspace_path=self.workspace_path if not with_docker else "", + with_docker=with_docker, ) fh.verify_cmd_output( output, return_code, error, error_msg, @@ -112,24 +114,27 @@ def import_pki(self, zip_name): raise e return True - def start(self, res_file): + def start(self, res_file, with_docker=False): """ Start the collaborator Args: res_file (str): Result file to track the logs + with_docker (bool): Flag to run the collaborator inside a docker container Returns: str: Path to the log file """ try: log.info(f"Starting {self.collaborator_name}") + res_file = res_file if not with_docker else os.path.basename(res_file) error_msg = f"Failed to start {self.collaborator_name}" fh.run_command( f"fx collaborator start -n {self.collaborator_name}", error_msg=error_msg, container_id=self.container_id, - workspace_path=self.workspace_path, + workspace_path=self.workspace_path if not with_docker else "", run_in_background=True, bg_file=res_file, + with_docker=with_docker ) log.info( f"Started {self.name} and tracking the logs in {res_file}." 
diff --git a/tests/end_to_end/models/model_owner.py b/tests/end_to_end/models/model_owner.py index f0fc337b66..8e66968ea9 100644 --- a/tests/end_to_end/models/model_owner.py +++ b/tests/end_to_end/models/model_owner.py @@ -8,6 +8,7 @@ import tests.end_to_end.utils.constants as constants import tests.end_to_end.utils.exceptions as ex import tests.end_to_end.utils.federation_helper as fh +import tests.end_to_end.utils.ssh_helper as ssh log = logging.getLogger(__name__) @@ -51,14 +52,13 @@ def create_workspace(self): log.info(f"Creating workspace for model {self.model_name} at the path: {self.workspace_path}") error_msg = "Failed to create the workspace" - # Docker environment requires the path to be relative - ws_path = self.workspace_path.lstrip('/') if os.getenv("TEST_ENV") == "docker" else self.workspace_path + ws_path = self.workspace_path return_code, output, error = fh.run_command( f"fx workspace create --prefix {ws_path} --template {self.model_name}", + workspace_path="", # No workspace path required for this command error_msg=error_msg, container_id=self.container_id, - workspace_path="", # No workspace path required for this command ) fh.verify_cmd_output( output, @@ -104,9 +104,9 @@ def certify_collaborator(self, collaborator_name, zip_name): error_msg = f"Failed to sign the CSR {zip_name}" return_code, output, error = fh.run_command( cmd, + workspace_path=self.workspace_path, error_msg=error_msg, container_id=self.container_id, - workspace_path=self.workspace_path, ) fh.verify_cmd_output( @@ -122,35 +122,33 @@ def certify_collaborator(self, collaborator_name, zip_name): raise e return True - def modify_plan(self, plan_path, new_rounds=None, num_collaborators=None, disable_client_auth=False, disable_tls=False): + def modify_plan(self, param_config, plan_path): """ Modify the plan to train the model Args: + param_config (object): Config object containing various params to be modified plan_path (str): Path to the plan file - new_rounds (int): Number of rounds to train - num_collaborators (int): Number of collaborators - disable_client_auth (bool): Disable client authentication - disable_tls (bool): Disable TLS communication """ # Copy the cols.yaml file from remote machine to local machine for docker environment plan_file = os.path.join(plan_path, "plan.yaml") # Open the file and modify the entries - self.rounds_to_train = new_rounds if new_rounds else self.rounds_to_train - self.num_collaborators = num_collaborators if num_collaborators else self.num_collaborators + self.rounds_to_train = param_config.num_rounds if param_config.num_rounds else self.rounds_to_train + self.num_collaborators = param_config.num_collaborators if param_config.num_collaborators else self.num_collaborators try: with open(plan_file) as fp: data = yaml.load(fp, Loader=yaml.FullLoader) + # NOTE: If more parameters need to be modified, add them here data["aggregator"]["settings"]["rounds_to_train"] = int(self.rounds_to_train) # Memory Leak related data["aggregator"]["settings"]["log_memory_usage"] = self.log_memory_usage data["collaborator"]["settings"]["log_memory_usage"] = self.log_memory_usage data["data_loader"]["settings"]["collaborator_count"] = int(self.num_collaborators) - data["network"]["settings"]["require_client_auth"] = not disable_client_auth - data["network"]["settings"]["use_tls"] = not disable_tls + data["network"]["settings"]["require_client_auth"] = param_config.require_client_auth + data["network"]["settings"]["use_tls"] = param_config.use_tls with open(plan_file, "w+") as write_file: 
yaml.dump(data, write_file) @@ -172,9 +170,9 @@ def initialize_plan(self, agg_domain_name): error_msg="Failed to initialize the plan" return_code, output, error = fh.run_command( cmd, + workspace_path=self.workspace_path, error_msg=error_msg, container_id=self.container_id, - workspace_path=self.workspace_path ) fh.verify_cmd_output( output, @@ -185,7 +183,7 @@ def initialize_plan(self, agg_domain_name): ) except Exception as e: - raise ex.PlanInitializationException(f"Failed to initialize the plan: {e}") + raise ex.PlanInitializationException(f"{error_msg}: {e}") def certify_workspace(self): """ @@ -199,9 +197,9 @@ def certify_workspace(self): error_msg = "Failed to certify the workspace" return_code, output, error = fh.run_command( cmd, - error_msg="Failed to certify the workspace", - container_id=self.container_id, workspace_path=self.workspace_path, + error_msg=error_msg, + container_id=self.container_id, ) fh.verify_cmd_output( output, @@ -212,7 +210,45 @@ def certify_workspace(self): ) except Exception as e: - raise ex.WorkspaceCertificationException(f"Failed to certify the workspace: {e}") + raise ex.WorkspaceCertificationException(f"{error_msg}: {e}") + + def dockerize_workspace(self): + """ + Dockerize the workspace. It internally uses workspace name as the image name + """ + log.info("Dockerizing the workspace. It will take some time to complete..") + try: + if not os.getenv("GITHUB_REPOSITORY") or not os.getenv("GITHUB_BRANCH"): + repo, branch = ssh.get_git_repo_and_branch() + else: + repo = os.getenv("GITHUB_REPOSITORY") + branch = os.getenv("GITHUB_BRANCH") + + cmd = f"fx workspace dockerize --save --revision {repo}@{branch}" + error_msg = "Failed to dockerize the workspace" + return_code, output, error = fh.run_command( + cmd, + workspace_path=self.workspace_path, + error_msg=error_msg, + container_id=self.container_id, + ) + fh.verify_cmd_output(output, return_code, error, error_msg, "Workspace dockerized successfully") + + except Exception as e: + raise ex.WorkspaceDockerizationException(f"{error_msg}: {e}") + + def load_workspace(self, workspace_tar_name): + """ + Load the workspace + """ + log.info("Loading the workspace..") + try: + return_code, output, error = ssh.run_command(f"docker load -i {workspace_tar_name}", work_dir=self.workspace_path) + if return_code != 0: + raise Exception(f"Failed to load the workspace: {error}") + + except Exception as e: + raise ex.WorkspaceLoadException(f"Error loading workspace: {e}") def register_collaborators(self, plan_path, num_collaborators=None): """ @@ -262,9 +298,9 @@ def certify_aggregator(self, agg_domain_name): error_msg = "Failed to certify the aggregator request" return_code, output, error = fh.run_command( cmd, + workspace_path=self.workspace_path, error_msg=error_msg, container_id=self.container_id, - workspace_path=self.workspace_path, ) fh.verify_cmd_output(output, return_code, error, error_msg, "CA signed the request from aggregator") @@ -280,9 +316,9 @@ def export_workspace(self): error_msg = "Failed to export the workspace" return_code, output, error = fh.run_command( cmd, + workspace_path=self.workspace_path, error_msg=error_msg, container_id=self.container_id, - workspace_path=self.workspace_path, ) fh.verify_cmd_output(output, return_code, error, error_msg, "Workspace exported successfully") diff --git a/tests/end_to_end/pytest.ini b/tests/end_to_end/pytest.ini index 16923172c4..ed865c99c6 100644 --- a/tests/end_to_end/pytest.ini +++ b/tests/end_to_end/pytest.ini @@ -5,8 +5,8 @@ junit_family = xunit2 results_dir = 
results log_level = INFO markers = - torch_cnn_mnist: mark a test as a torch CNN MNIST test. - keras_cnn_mnist: mark a test as a Keras CNN MNIST test. - torch_cnn_histology: mark a test as a torch CNN histology test. + log_memory_usage: mark a test as a log memory usage test. + task_runner_basic: mark a test as a task runner basic test. + task_runner_dockerized_ws: mark a test as a task runner dockerized workspace test. asyncio_mode=auto asyncio_default_fixture_loop_scope="function" diff --git a/tests/end_to_end/test_suites/docker_tests.py b/tests/end_to_end/test_suites/docker_tests.py deleted file mode 100644 index 6ed69b976e..0000000000 --- a/tests/end_to_end/test_suites/docker_tests.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright 2020-2023 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import pytest -import logging - -from tests.end_to_end.utils.common_fixtures import fx_federation -from tests.end_to_end.utils import federation_helper as fed_helper - -log = logging.getLogger(__name__) - - -@pytest.mark.docker -def test_federation_via_docker(request, fx_federation): - """ - Test federation via docker. - Args: - request (Fixture): Pytest fixture - fx_federation (Fixture): Pytest fixture - """ - # Setup PKI for trusted communication within the federation - if request.config.use_tls: - assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI for trusted communication" - - # Start the federation - results = fed_helper.run_federation(fx_federation) - - # Verify the completion of the federation run - assert fed_helper.verify_federation_run_completion(fx_federation, results, request.config.num_rounds), "Federation completion failed" diff --git a/tests/end_to_end/test_suites/memory_logs_tests.py b/tests/end_to_end/test_suites/memory_logs_tests.py index 1287da3409..b152cd0852 100644 --- a/tests/end_to_end/test_suites/memory_logs_tests.py +++ b/tests/end_to_end/test_suites/memory_logs_tests.py @@ -6,58 +6,73 @@ import os import json -from tests.end_to_end.utils.common_fixtures import fx_federation -from tests.end_to_end.utils import federation_helper as fed_helper +from tests.end_to_end.utils.common_fixtures import fx_federation_tr, fx_federation_tr_dws +import tests.end_to_end.utils.constants as constants +from tests.end_to_end.utils import federation_helper as fed_helper, ssh_helper as ssh log = logging.getLogger(__name__) @pytest.mark.log_memory_usage -def test_log_memory_usage(request, fx_federation): +def test_log_memory_usage_basic(request, fx_federation_tr): """ - This module contains end-to-end tests for logging memory usage in a federated learning setup. - Test Suite: - - test_log_memory_usage: Tests the memory usage logging functionality for the torch_cnn_mnist model. - Functions: - - test_log_memory_usage(request, fx_federation): Test the memory usage logging functionality in a federated learning setup. - Parameters: + Args: - request: The pytest request object containing configuration options. - - fx_federation: The fixture representing the federated learning setup. - Steps: - 1. Skip the test if memory usage logging is disabled. - 2. Setup PKI for trusted communication if TLS is enabled. - 3. Start the federation and verify its completion. - 4. Verify the existence of memory usage logs for the aggregator. - 5. Verify the memory usage details for each round. - 6. Verify the existence and details of memory usage logs for each collaborator. - 7. Log the availability of memory usage details for all participants. 
+ - fx_federation_tr: The fixture representing the federated learning setup. + """ + if not request.config.log_memory_usage: + pytest.skip("Memory usage logging is disabled") + + _log_memory_usage(request, fx_federation_tr) + + +@pytest.mark.log_memory_usage +def test_log_memory_usage_dockerized_ws(request, fx_federation_tr_dws): + """ + Test the memory usage logging functionality in a federated learning setup. + Args: + - request: The pytest request object containing configuration options. + - fx_federation_tr_dws: The fixture representing the federated learning setup with dockerized workspace. """ - # Skip test if fx_federation.log_memory_usage is False if not request.config.log_memory_usage: pytest.skip("Memory usage logging is disabled") - # Setup PKI for trusted communication within the federation - if request.config.use_tls: - assert fed_helper.setup_pki( - fx_federation - ), "Failed to setup PKI for trusted communication" + _log_memory_usage(request, fx_federation_tr_dws) + +def _log_memory_usage(request, fed_obj): + """ + Test the memory usage logging functionality in a federated learning setup. + Steps: + 1. Setup PKI for trusted communication if TLS is enabled. + 2. Start the federation and verify its completion. + 3. Verify the existence of memory usage logs for the aggregator. + 4. Verify the memory usage details for each round. + 5. Verify the existence and details of memory usage logs for each collaborator. + 6. Log the availability of memory usage details for all participants. + """ # Start the federation - results = fed_helper.run_federation(fx_federation) + if request.config.test_env == "task_runner_basic": + results = fed_helper.run_federation(fed_obj) + else: + results = fed_helper.run_federation_for_dws( + fed_obj, use_tls=request.config.use_tls + ) # Verify the completion of the federation run assert fed_helper.verify_federation_run_completion( - fx_federation, results, num_rounds=request.config.num_rounds + fed_obj, results, test_env=request.config.test_env, num_rounds=request.config.num_rounds ), "Federation completion failed" + # Verify the aggregator memory logs - aggregator_memory_usage_file = os.path.join( - fx_federation.workspace_path, - "aggregator", - "workspace", - "logs", - "aggregator_memory_usage.json", - ) + aggregator_memory_usage_file = constants.AGG_MEM_USAGE_JSON.format(fed_obj.workspace_path) + + if request.config.test_env == "task_runner_dockerized_ws": + ssh.copy_file_from_docker( + "aggregator", f"/workspace/logs/aggregator_memory_usage.json", aggregator_memory_usage_file + ) + assert os.path.exists( aggregator_memory_usage_file ), "Aggregator memory usage file is not available" @@ -71,15 +86,14 @@ def test_log_memory_usage(request, fx_federation): ), "Memory usage details are not available for all rounds" # check memory usage entries for each collaborator - for collaborator in fx_federation.collaborators: - collaborator_memory_usage_file = os.path.join( - fx_federation.workspace_path, - collaborator.name, - "workspace", - "logs", - f"{collaborator.collaborator_name}_memory_usage.json", + for collaborator in fed_obj.collaborators: + collaborator_memory_usage_file = constants.COL_MEM_USAGE_JSON.format( + fed_obj.workspace_path, collaborator.name ) - + if request.config.test_env == "task_runner_dockerized_ws": + ssh.copy_file_from_docker( + collaborator.name, f"/workspace/logs/{collaborator.name}_memory_usage.json", collaborator_memory_usage_file + ) assert os.path.exists( collaborator_memory_usage_file ), f"Memory usage file for collaborator 
{collaborator.collaborator_name} is not available" diff --git a/tests/end_to_end/test_suites/sample_tests.py b/tests/end_to_end/test_suites/sample_tests.py index b5a102aa20..01d5fd9394 100644 --- a/tests/end_to_end/test_suites/sample_tests.py +++ b/tests/end_to_end/test_suites/sample_tests.py @@ -4,7 +4,10 @@ import pytest import logging -from tests.end_to_end.utils.common_fixtures import fx_federation +from tests.end_to_end.utils.common_fixtures import ( + fx_federation_tr, + fx_federation_tr_dws, +) from tests.end_to_end.utils import federation_helper as fed_helper log = logging.getLogger(__name__) @@ -13,24 +16,49 @@ # Task Runner API Test function for federation run using sample_model # 1. Create OpenFL workspace, if not present for the model and add relevant dataset and its path in plan/data.yaml # 2. Append the model name to ModelName enum in tests/end_to_end/utils/constants.py -# 3. Add the model name to tests/end_to_end/pytest.ini marker, if not present -# 4. Use fx_federation fixture in the test function - it will provide the federation object. -# 5. Fixture will contain - model_owner, aggregator, collaborators, model_name, workspace_path, results_dir -# 6. Setup PKI for trusted communication within the federation -# 7. Start the federation using aggregator and given no of collaborators. -# 8. Verify the completion of the federation run. - -@pytest.mark.sample_model_name -def test_sample_model_name(fx_federation): +# 3. a. Use fx_federation_tr fixture for task runner with bare metal or docker approach. +# 3. b. Use fx_federation_tr_dws fixture for task runner with dockerized workspace approach. +# 4. Fixture will contain - model_owner, aggregator, collaborators, workspace_path, local_bind_path +# 5. Setup PKI for trusted communication within the federation based on TLS flag. +# 6. Start the federation using aggregator and given no of collaborators. +# 7. Verify the completion of the federation run. + + +@pytest.mark.task_runner_basic +def test_federation_basic(request, fx_federation_tr): + """ + Add a proper docstring here. + """ + log.info(f"Running sample model test {fx_federation_tr}") + + # Start the federation + results = fed_helper.run_federation(fx_federation_tr) + + # Verify the completion of the federation run + assert fed_helper.verify_federation_run_completion( + fx_federation_tr, + results, + test_env=request.config.test_env, + num_rounds=request.config.num_rounds, + ), "Federation completion failed" + + +@pytest.mark.task_runner_dockerized_ws +def test_federation_via_dockerized_workspace(request, fx_federation_tr_dws): """ Add a proper docstring here. 
""" - log.info(f"Running sample model test {fx_federation.model_name}") - # Setup PKI for trusted communication within the federation - assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI" + log.info(f"Running sample model test {fx_federation_tr_dws}") # Start the federation - results = fed_helper.run_federation(fx_federation) + results = fed_helper.run_federation( + fx_federation_tr_dws, use_tls=request.config.use_tls + ) # Verify the completion of the federation run - assert fed_helper.verify_federation_run_completion(fx_federation, results), "Federation completion failed" + assert fed_helper.verify_federation_run_completion( + fx_federation_tr_dws, + results, + test_env=request.config.test_env, + num_rounds=request.config.num_rounds, + ), "Federation completion failed" diff --git a/tests/end_to_end/test_suites/task_runner_tests.py b/tests/end_to_end/test_suites/task_runner_tests.py index dd43f74275..eb9c344da8 100644 --- a/tests/end_to_end/test_suites/task_runner_tests.py +++ b/tests/end_to_end/test_suites/task_runner_tests.py @@ -4,61 +4,52 @@ import pytest import logging -from tests.end_to_end.utils.common_fixtures import fx_federation +from tests.end_to_end.utils.common_fixtures import ( + fx_federation_tr, + fx_federation_tr_dws, +) from tests.end_to_end.utils import federation_helper as fed_helper log = logging.getLogger(__name__) -@pytest.mark.torch_cnn_mnist -def test_torch_cnn_mnist(request, fx_federation): +@pytest.mark.task_runner_basic +def test_federation_via_native(request, fx_federation_tr): """ - Test for torch_cnn_mnist model. + Test federation via native task runner. + Args: + request (Fixture): Pytest fixture + fx_federation_tr (Fixture): Pytest fixture for native task runner """ - log.info("Testing torch_cnn_mnist model") - - # Setup PKI for trusted communication within the federation - if request.config.use_tls: - assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI for trusted communication" - # Start the federation - results = fed_helper.run_federation(fx_federation) + results = fed_helper.run_federation(fx_federation_tr) # Verify the completion of the federation run - assert fed_helper.verify_federation_run_completion(fx_federation, results, - num_rounds=request.config.num_rounds), "Federation completion failed" - + assert fed_helper.verify_federation_run_completion( + fx_federation_tr, + results, + test_env=request.config.test_env, + num_rounds=request.config.num_rounds, + ), "Federation completion failed" -@pytest.mark.keras_cnn_mnist -def test_keras_cnn_mnist(request, fx_federation): - log.info("Testing keras_cnn_mnist model") - - # Setup PKI for trusted communication within the federation - if request.config.use_tls: - assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI for trusted communication" - - # Start the federation - results = fed_helper.run_federation(fx_federation) - # Verify the completion of the federation run - assert fed_helper.verify_federation_run_completion(fx_federation, results, - num_rounds=request.config.num_rounds), "Federation completion failed" - - -@pytest.mark.torch_cnn_histology -def test_torch_cnn_histology(request, fx_federation): +@pytest.mark.task_runner_dockerized_ws +def test_federation_via_dockerized_workspace(request, fx_federation_tr_dws): """ - Test for torch_cnn_histology model + Test federation via dockerized workspace. 
+ Args: + request (Fixture): Pytest fixture + fx_federation_tr_dws (Fixture): Pytest fixture for dockerized workspace """ - log.info("Testing torch_cnn_histology model") - - # Setup PKI for trusted communication within the federation - if request.config.use_tls: - assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI for trusted communication" - # Start the federation - results = fed_helper.run_federation(fx_federation) + results = fed_helper.run_federation_for_dws( + fx_federation_tr_dws, use_tls=request.config.use_tls + ) # Verify the completion of the federation run - assert fed_helper.verify_federation_run_completion(fx_federation, results, - num_rounds=request.config.num_rounds), "Federation completion failed" + assert fed_helper.verify_federation_run_completion( + fx_federation_tr_dws, + results, + test_env=request.config.test_env, + num_rounds=request.config.num_rounds, + ), "Federation completion failed" diff --git a/tests/end_to_end/utils/common_fixtures.py b/tests/end_to_end/utils/common_fixtures.py index 03b62263c8..951670bc69 100644 --- a/tests/end_to_end/utils/common_fixtures.py +++ b/tests/end_to_end/utils/common_fixtures.py @@ -4,24 +4,17 @@ import pytest import collections import concurrent.futures -import os import logging import numpy as np -import tests.end_to_end.utils.docker_helper as dh -from tests.end_to_end.utils.wf_helper import ( - init_collaborator_private_attr_index, - init_collaborator_private_attr_name, - init_collaborate_pvt_attr_np, - init_agg_pvt_attr_np -) +import tests.end_to_end.utils.constants as constants import tests.end_to_end.utils.federation_helper as fh +import tests.end_to_end.utils.ssh_helper as ssh from tests.end_to_end.models import aggregator as agg_model, model_owner as mo_model - - log = logging.getLogger(__name__) + # Define a named tuple to store the objects for model owner, aggregator, and collaborators federation_fixture = collections.namedtuple( "federation_fixture", @@ -34,7 +27,7 @@ ) @pytest.fixture(scope="function") -def fx_federation(request): +def fx_federation_tr(request): """ Fixture for federation. This fixture is used to create the model owner, aggregator, and collaborators. It also creates workspace. @@ -46,40 +39,35 @@ def fx_federation(request): Note: As this is a function level fixture, thus no import is required at test level. 
""" + request.config.test_env = "task_runner_basic" + collaborators = [] executor = concurrent.futures.ThreadPoolExecutor() - test_env, model_name, workspace_path, local_bind_path, agg_domain_name = fh.federation_env_setup_and_validate(request) - agg_workspace_path = os.path.join(workspace_path, "aggregator", "workspace") + model_name, workspace_path, local_bind_path, agg_domain_name = ( + fh.federation_env_setup_and_validate(request) + ) + + agg_workspace_path = constants.AGG_WORKSPACE_PATH.format(workspace_path) # Create model owner object and the workspace for the model # Workspace name will be same as the model name - model_owner = mo_model.ModelOwner(model_name, request.config.log_memory_usage, workspace_path=agg_workspace_path) + model_owner = mo_model.ModelOwner( + model_name, request.config.log_memory_usage, workspace_path=agg_workspace_path + ) # Create workspace for given model name fh.create_persistent_store(model_owner.name, local_bind_path) - # Start the docker container for aggregator in case of docker environment - if test_env == "docker": - container = dh.start_docker_container( - container_name="aggregator", - workspace_path=workspace_path, - local_bind_path=local_bind_path, - ) - model_owner.container_id = container.id - model_owner.create_workspace() fh.add_local_workspace_permission(local_bind_path) # Modify the plan - plan_path = os.path.join(local_bind_path, "aggregator", "workspace", "plan") - model_owner.modify_plan( - plan_path=plan_path, - new_rounds=request.config.num_rounds, - num_collaborators=request.config.num_collaborators, - disable_client_auth=not request.config.require_client_auth, - disable_tls=not request.config.use_tls, - ) + plan_path = constants.AGG_PLAN_PATH.format(local_bind_path) + model_owner.modify_plan(param_config=request.config, plan_path=plan_path) + + # Initialize the plan + model_owner.initialize_plan(agg_domain_name=agg_domain_name) # Certify the workspace in case of TLS # Register the collaborators in case of non-TLS @@ -88,16 +76,13 @@ def fx_federation(request): else: model_owner.register_collaborators(plan_path, request.config.num_collaborators) - # Initialize the plan - model_owner.initialize_plan(agg_domain_name=agg_domain_name) - # Create the objects for aggregator and collaborators # Workspace path for aggregator is uniform in case of docker or task_runner # But, for collaborators, it is different aggregator = agg_model.Aggregator( agg_domain_name=agg_domain_name, workspace_path=agg_workspace_path, - container_id=model_owner.container_id, # None in case of non-docker environment + container_id=model_owner.container_id, # None in case of non-docker environment ) # Generate the sign request and certify the aggregator in case of TLS @@ -120,6 +105,128 @@ def fx_federation(request): ] collaborators = [f.result() for f in futures] + if request.config.use_tls: + fh.setup_pki_for_collaborators(collaborators, model_owner, local_bind_path) + fh.import_pki_for_collaborators(collaborators, local_bind_path) + + # Return the federation fixture + return federation_fixture( + model_owner=model_owner, + aggregator=aggregator, + collaborators=collaborators, + workspace_path=workspace_path, + local_bind_path=local_bind_path, + ) + + +@pytest.fixture(scope="function") +def fx_federation_tr_dws(request): + """ + Fixture for federation in case of dockerized workspace. This fixture is used to create the model owner, aggregator, and collaborators. + It also creates workspace. + Assumption: OpenFL workspace is present for the model being tested. 
+ Args: + request: pytest request object. Model name is passed as a parameter to the fixture from test cases. + Returns: + federation_fixture: Named tuple containing the objects for model owner, aggregator, and collaborators + + Note: As this is a function level fixture, thus no import is required at test level. + """ + request.config.test_env = "task_runner_dockerized_ws" + + collaborators = [] + executor = concurrent.futures.ThreadPoolExecutor() + + model_name, workspace_path, local_bind_path, agg_domain_name = ( + fh.federation_env_setup_and_validate(request) + ) + + agg_workspace_path = constants.AGG_WORKSPACE_PATH.format(workspace_path) + + # Create model owner object and the workspace for the model + # Workspace name will be same as the model name + model_owner = mo_model.ModelOwner( + model_name, request.config.log_memory_usage, workspace_path=agg_workspace_path + ) + + # Create workspace for given model name + fh.create_persistent_store(model_owner.name, local_bind_path) + + model_owner.create_workspace() + fh.add_local_workspace_permission(local_bind_path) + + # Modify the plan + plan_path = constants.AGG_PLAN_PATH.format(local_bind_path) + model_owner.modify_plan(param_config=request.config, plan_path=plan_path) + + # Initialize the plan + model_owner.initialize_plan(agg_domain_name=agg_domain_name) + + # Command 'fx workspace dockerize --save ..' will use the workspace name for image name + # which is 'workspace' in this case. + model_owner.dockerize_workspace() + image_name = "workspace" + + # Certify the workspace in case of TLS + # Register the collaborators in case of non-TLS + if request.config.use_tls: + model_owner.certify_workspace() + else: + model_owner.register_collaborators(plan_path, request.config.num_collaborators) + + # Create the objects for aggregator and collaborators + # Workspace path for aggregator is uniform in case of docker or task_runner + # But, for collaborators, it is different + aggregator = agg_model.Aggregator( + agg_domain_name=agg_domain_name, + workspace_path=agg_workspace_path, + container_id=model_owner.container_id, # None in case of non-docker environment + ) + + futures = [ + executor.submit( + fh.setup_collaborator, + count=i, + workspace_path=workspace_path, + local_bind_path=local_bind_path, + ) + for i in range(request.config.num_collaborators) + ] + collaborators = [f.result() for f in futures] + + if request.config.use_tls: + fh.setup_pki_for_collaborators(collaborators, model_owner, local_bind_path) + + # Note: In case of multiple machines setup, scp the created tar for collaborators to the other machine(s) + fh.create_tarball_for_collaborators( + collaborators, local_bind_path, use_tls=request.config.use_tls + ) + + # Generate the sign request and certify the aggregator in case of TLS + if request.config.use_tls: + aggregator.generate_sign_request() + model_owner.certify_aggregator(agg_domain_name) + + local_agg_ws_path = constants.AGG_WORKSPACE_PATH.format(local_bind_path) + + # Note: In case of multiple machines setup, scp this tar to the other machine(s) + return_code, output, error = ssh.run_command( + f"tar -cf cert_agg.tar plan cert save", work_dir=local_agg_ws_path + ) + if return_code != 0: + raise Exception(f"Failed to create tar for aggregator: {error}") + + # Note: In case of multiple machines setup, scp this workspace tar + # to the other machine(s) so that docker load can load the image. 
+ model_owner.load_workspace(workspace_tar_name=f"{image_name}.tar") + + fh.start_docker_containers_for_dws( + participants=[aggregator] + collaborators, + workspace_path=workspace_path, + local_bind_path=local_bind_path, + image_name=image_name, + ) + # Return the federation fixture return federation_fixture( model_owner=model_owner, @@ -129,6 +236,7 @@ def fx_federation(request): local_bind_path=local_bind_path, ) + @pytest.fixture(scope="function") def fx_local_federated_workflow(request): """ @@ -144,18 +252,22 @@ def fx_local_federated_workflow(request): LocalRuntime: An instance of `LocalRuntime` configured with the aggregator, collaborators, and backend. """ - # Import is done inline because Task Runner does not support importing below penfl packages - + # Import is done inline because Task Runner does not support importing below openfl packages from openfl.experimental.workflow.interface import Aggregator, Collaborator from openfl.experimental.workflow.runtime import LocalRuntime - + from tests.end_to_end.utils.wf_helper import ( + init_collaborator_private_attr_index, + init_collaborator_private_attr_name, + init_collaborate_pvt_attr_np, + init_agg_pvt_attr_np + ) collab_callback_func = request.param[0] if hasattr(request, 'param') and request.param else None collab_value = request.param[1] if hasattr(request, 'param') and request.param else None agg_callback_func = request.param[2] if hasattr(request, 'param') and request.param else None - # Get the callback functions from the globals using string - collab_callback_func_name = globals()[collab_callback_func] if collab_callback_func else None - agg_callback_func_name = globals()[agg_callback_func] if agg_callback_func else None + # Get the callback functions from the locals using string + collab_callback_func_name = locals()[collab_callback_func] if collab_callback_func else None + agg_callback_func_name = locals()[agg_callback_func] if agg_callback_func else None collaborators_list = [] if agg_callback_func_name: @@ -203,19 +315,24 @@ def fx_local_federated_workflow_prvt_attr(request): LocalRuntime: An instance of `LocalRuntime` configured with the aggregator, collaborators, and backend. 
""" - # Import is done inline because Task Runner does not support importing below penfl packages - + # Import is done inline because Task Runner does not support importing below openfl packages from openfl.experimental.workflow.interface import Aggregator, Collaborator from openfl.experimental.workflow.runtime import LocalRuntime - + from tests.end_to_end.utils.wf_helper import ( + init_collaborator_private_attr_index, + init_collaborator_private_attr_name, + init_collaborate_pvt_attr_np, + init_agg_pvt_attr_np + ) collab_callback_func = request.param[0] if hasattr(request, 'param') and request.param else None collab_value = request.param[1] if hasattr(request, 'param') and request.param else None agg_callback_func = request.param[2] if hasattr(request, 'param') and request.param else None - # Get the callback functions from the globals using string - collab_callback_func_name = globals()[collab_callback_func] if collab_callback_func else None - agg_callback_func_name = globals()[agg_callback_func] if agg_callback_func else None + # Get the callback functions from the locals using string + collab_callback_func_name = locals()[collab_callback_func] if collab_callback_func else None + agg_callback_func_name = locals()[agg_callback_func] if agg_callback_func else None collaborators_list = [] + # Setup aggregator if agg_callback_func_name: aggregator = Aggregator(name="agg", diff --git a/tests/end_to_end/utils/constants.py b/tests/end_to_end/utils/constants.py index 9f603d3604..a7cbb29312 100644 --- a/tests/end_to_end/utils/constants.py +++ b/tests/end_to_end/utils/constants.py @@ -25,3 +25,16 @@ class ModelName(Enum): REMOVE_OPENFL_NW = "docker network rm" DOCKER_NETWORK_NAME = "openfl" DEFAULT_OPENFL_IMAGE = "openfl:latest" + +AGG_WORKSPACE_PATH = "{}/aggregator/workspace" # example - /tmp/my_federation/aggregator/workspace +COL_WORKSPACE_PATH = "{}/{}/workspace" # example - /tmp/my_federation/collaborator1/workspace +AGG_PLAN_PATH = "{}/aggregator/workspace/plan" # example - /tmp/my_federation/aggregator/workspace/plan +COL_PLAN_PATH = "{}/{}/workspace/plan" # example - /tmp/my_federation/collaborator1/workspace/plan + +AGG_COL_RESULT_FILE = "{0}/{1}/workspace/{1}.log" # example - /tmp/my_federation/aggregator/workspace/aggregator.log + +AGG_WORKSPACE_ZIP_NAME = "workspace.zip" + +# Memory logs related +AGG_MEM_USAGE_JSON = "{}/aggregator/workspace/logs/aggregator_memory_usage.json" # example - /tmp/my_federation/aggregator/workspace/logs/aggregator_memory_usage.json +COL_MEM_USAGE_JSON = "{0}/{1}/workspace/logs/{1}_memory_usage.json" # example - /tmp/my_federation/collaborator1/workspace/logs/collaborator1_memory_usage.json diff --git a/tests/end_to_end/utils/docker_helper.py b/tests/end_to_end/utils/docker_helper.py index 7951d5b7e2..bfb8c214cb 100644 --- a/tests/end_to_end/utils/docker_helper.py +++ b/tests/end_to_end/utils/docker_helper.py @@ -60,6 +60,10 @@ def start_docker_container( workspace_path, local_bind_path, image=constants.DEFAULT_OPENFL_IMAGE, + network=constants.DOCKER_NETWORK_NAME, + env_keyval_list=None, + security_opt=None, + mount_mapping=None, ): """ Start the docker container with provided name. @@ -68,22 +72,41 @@ def start_docker_container( workspace_path: Workspace path local_bind_path: Local bind path image: Docker image to use + network: Docker network to use (default is openfl) + env_keyval_list: List of environment variables to set. + Provide in key=val format. 
For example ["KERAS_HOME=/tmp"] + security_opt: Security options for the container + mount_mapping: Mapping of local path to docker path. Format ["local_path:docker_path"] Returns: container: Docker container object """ try: client = get_docker_client() - # Local bind path - local_participant_path = os.path.join(local_bind_path, container_name, "workspace") - - # Docker container bind path - docker_participant_path = f"{workspace_path}/{container_name}/workspace" + # Set Local bind path and Docker container bind path + if mount_mapping: + local_participant_path = mount_mapping[0].split(":")[0] + docker_participant_path = mount_mapping[0].split(":")[1] + else: + local_participant_path = os.path.join(local_bind_path, container_name, "workspace") + docker_participant_path = f"{workspace_path}/{container_name}/workspace" volumes = { local_participant_path: {"bind": docker_participant_path, "mode": "rw"}, } + log.debug(f"Volumes for {container_name}: {volumes}") + + environment = { + "WORKSPACE_PATH": docker_participant_path, + "NO_PROXY": "aggregator", + "no_proxy": "aggregator" + } + if env_keyval_list: + for keyval in env_keyval_list: + key, val = keyval.split("=") + environment[key] = val + log.debug(f"Environment variables for {container_name}: {environment}") # Start a container from the image container = client.containers.run( image, @@ -92,13 +115,10 @@ def start_docker_container( auto_remove=False, tty=True, name=container_name, - network="openfl", + network=network, + security_opt=security_opt, volumes=volumes, - environment={ - "WORKSPACE_PATH": docker_participant_path, - "NO_PROXY": "aggregator", - "no_proxy": "aggregator", - }, + environment=environment, use_config_proxy=False, # Do not use proxy for docker container ) log.info(f"Container for {container_name} started with ID: {container.id}") diff --git a/tests/end_to_end/utils/exceptions.py b/tests/end_to_end/utils/exceptions.py index 0a81717203..4cccce0e5f 100644 --- a/tests/end_to_end/utils/exceptions.py +++ b/tests/end_to_end/utils/exceptions.py @@ -57,6 +57,17 @@ class CollaboratorCreationException(Exception): """Exception for aggregator creation""" pass + +class WorkspaceDockerizationException(Exception): + """Exception for workspace dockerization""" + pass + + +class WorkspaceLoadException(Exception): + """Exception for workspace load""" + pass + + class ReferenceFlowException(Exception): """Exception for reference flow""" pass diff --git a/tests/end_to_end/utils/federation_helper.py b/tests/end_to_end/utils/federation_helper.py index 4a78aecc4c..efbe92e3b2 100644 --- a/tests/end_to_end/utils/federation_helper.py +++ b/tests/end_to_end/utils/federation_helper.py @@ -7,27 +7,30 @@ import os import json import re +from pathlib import Path import tests.end_to_end.utils.constants as constants import tests.end_to_end.utils.docker_helper as dh import tests.end_to_end.utils.exceptions as ex -import tests.end_to_end.utils.ssh_helper as sh +import tests.end_to_end.utils.ssh_helper as ssh from tests.end_to_end.models import collaborator as col_model log = logging.getLogger(__name__) -def setup_pki(fed_obj): +def setup_pki_for_collaborators(collaborators, model_owner, local_bind_path): """ Setup PKI for trusted communication within the federation Args: - fed_obj (object): Federation fixture object + collaborators (list): List of collaborator objects + model_owner (object): Model owner object + local_bind_path (str): Local bind path Returns: bool: True if successful, else False """ # PKI setup for aggregator is done at fixture level - 
local_agg_ws_path = os.path.join(fed_obj.local_bind_path, "aggregator", "workspace") + local_agg_ws_path = constants.AGG_WORKSPACE_PATH.format(local_bind_path) executor = concurrent.futures.ThreadPoolExecutor() @@ -40,10 +43,12 @@ def setup_pki(fed_obj): executor.submit( collaborator.generate_sign_request, ) - for collaborator in fed_obj.collaborators + for collaborator in collaborators ] if not all([f.result() for f in results]): - raise Exception("Failed to generate sign request for one or more collaborators") + raise Exception( + "Failed to generate sign request for one or more collaborators" + ) except Exception as e: raise e @@ -53,24 +58,28 @@ def setup_pki(fed_obj): results = [ executor.submit( copy_file_between_participants, - local_src_path=os.path.join(fed_obj.local_bind_path, collaborator.name, "workspace"), + local_src_path=constants.COL_WORKSPACE_PATH.format( + local_bind_path, collaborator.name + ), local_dest_path=local_agg_ws_path, - file_name=f"col_{collaborator.name}_to_agg_cert_request.zip" + file_name=f"col_{collaborator.name}_to_agg_cert_request.zip", ) - for collaborator in fed_obj.collaborators + for collaborator in collaborators ] if not all([f.result() for f in results]): - raise Exception("Failed to copy sign request zip from one or more collaborators to aggregator") + raise Exception( + "Failed to copy sign request zip from one or more collaborators to aggregator" + ) except Exception as e: raise e # Certify the collaborator sign requests # DO NOT run this in parallel as it causes command to fail with FileNotFoundError for a different collaborator - for collaborator in fed_obj.collaborators: + for collaborator in collaborators: try: - fed_obj.model_owner.certify_collaborator( + model_owner.certify_collaborator( collaborator_name=collaborator.name, - zip_name=f"col_{collaborator.name}_to_agg_cert_request.zip" + zip_name=f"col_{collaborator.name}_to_agg_cert_request.zip", ) except Exception as e: log.error(f"Failed to certify sign request for {collaborator.name}: {e}") @@ -82,27 +91,89 @@ def setup_pki(fed_obj): executor.submit( copy_file_between_participants, local_src_path=local_agg_ws_path, - local_dest_path=os.path.join(fed_obj.local_bind_path, collaborator.name, "workspace"), - file_name=f"agg_to_col_{collaborator.name}_signed_cert.zip" + local_dest_path=constants.COL_WORKSPACE_PATH.format( + local_bind_path, collaborator.name + ), + file_name=f"agg_to_col_{collaborator.name}_signed_cert.zip", + ) + for collaborator in collaborators + ] + if not all([f.result() for f in results]): + raise Exception( + "Failed to copy signed certificates from aggregator to one or more collaborators" + ) + except Exception as e: + raise e + + return True + + +def create_tarball_for_collaborators(collaborators, local_bind_path, use_tls): + """ + Create tarball for all the collaborators + Args: + collaborators (list): List of collaborator objects + local_bind_path (str): Local bind path + use_tls (bool): Use TLS or not (default is True) + """ + executor = concurrent.futures.ThreadPoolExecutor() + try: + + def _create_tarball(collaborator_name, local_bind_path): + local_col_ws_path = constants.COL_WORKSPACE_PATH.format( + local_bind_path, collaborator_name + ) + client_cert_entries = "" + tarfiles = f"cert_col_{collaborator_name}.tar plan/data.yaml" + # If TLS is enabled, client certificates and signed certificates are also included + if use_tls: + client_cert_entries = [ + f"cert/client/{f}" for f in os.listdir(f"{local_col_ws_path}/cert/client") if f.endswith(".key") + ] + 
client_certs = " ".join(client_cert_entries) if client_cert_entries else "" + tarfiles += f" agg_to_col_{collaborator_name}_signed_cert.zip {client_certs}" + + return_code, output, error = ssh.run_command( + f"tar -cf {tarfiles}", work_dir=local_col_ws_path + ) + if return_code != 0: + raise Exception( + f"Failed to create tarball for {collaborator_name}: {error}" + ) + return True + + results = [ + executor.submit( + _create_tarball, collaborator.name, local_bind_path=local_bind_path ) - for collaborator in fed_obj.collaborators + for collaborator in collaborators ] if not all([f.result() for f in results]): - raise Exception("Failed to copy signed certificates from aggregator to one or more collaborators") + raise Exception("Failed to create tarball for one or more collaborators") except Exception as e: raise e - # Import and certify the CSR for all the collaborators + return True + + +def import_pki_for_collaborators(collaborators, local_bind_path): + """ + Import and certify the CSR for the collaborators + """ + executor = concurrent.futures.ThreadPoolExecutor() + local_agg_ws_path = constants.AGG_WORKSPACE_PATH.format(local_bind_path) try: results = [ executor.submit( collaborator.import_pki, - zip_name=f"agg_to_col_{collaborator.name}_signed_cert.zip" + zip_name=f"agg_to_col_{collaborator.name}_signed_cert.zip", ) - for collaborator in fed_obj.collaborators + for collaborator in collaborators ] if not all([f.result() for f in results]): - raise Exception("Failed to import and certify the CSR for one or more collaborators") + raise Exception( + "Failed to import and certify the CSR for one or more collaborators" + ) except Exception as e: raise e @@ -113,21 +184,27 @@ def setup_pki(fed_obj): executor.submit( copy_file_between_participants, local_src_path=os.path.join(local_agg_ws_path, "plan"), - local_dest_path=os.path.join(fed_obj.local_bind_path, collaborator.name, "workspace", "plan"), + local_dest_path=constants.COL_PLAN_PATH.format( + local_bind_path, collaborator.name + ), file_name="cols.yaml", run_with_sudo=True, ) - for collaborator in fed_obj.collaborators + for collaborator in collaborators ] if not all([f.result() for f in results]): - raise Exception("Failed to copy cols.yaml file from aggregator to one or more collaborators") + raise Exception( + "Failed to copy cols.yaml file from aggregator to one or more collaborators" + ) except Exception as e: raise e return True -def copy_file_between_participants(local_src_path, local_dest_path, file_name, run_with_sudo=False): +def copy_file_between_participants( + local_src_path, local_dest_path, file_name, run_with_sudo=False +): """ Copy file between participants Args: @@ -138,43 +215,38 @@ def copy_file_between_participants(local_src_path, local_dest_path, file_name, r """ cmd = "sudo cp" if run_with_sudo else "cp" cmd += f" {local_src_path}/{file_name} {local_dest_path}" - return_code, output, error = sh.run_command(cmd) + return_code, output, error = ssh.run_command(cmd) if return_code != 0: log.error(f"Failed to copy file: {error}") raise Exception(f"Failed to copy file: {error}") - log.info(f"File {file_name} copied successfully from {local_src_path} to {local_dest_path}") + log.info( + f"File {file_name} copied successfully from {local_src_path} to {local_dest_path}" + ) return True -def run_federation(fed_obj): +def run_federation(fed_obj, install_dependencies=True, with_docker=False): """ Start the federation Args: fed_obj (object): Federation fixture object + install_dependencies (bool): Install dependencies on 
collaborators (default is True) + with_docker (bool): Flag specific to dockerized workspace scenario. Default is False. Returns: list: List of response files for all the participants """ executor = concurrent.futures.ThreadPoolExecutor() - # Install dependencies on collaborators - # This is a time taking process, thus doing at this stage after all verification is done - log.info("Installing dependencies on collaborators. This might take some time...") - futures = [ - executor.submit( - participant.install_dependencies - ) - for participant in fed_obj.collaborators - ] - results = [f.result() for f in futures] - log.info(f"Results from all the collaborators for installation of dependencies: {results}") - - if not all(results): - raise Exception("Failed to install dependencies on one or more collaborators") + if install_dependencies: + install_dependencies_on_collaborators(fed_obj) # As the collaborators will wait for aggregator to start, we need to start them in parallel. futures = [ executor.submit( participant.start, - os.path.join(fed_obj.workspace_path, participant.name, "workspace", f"{participant.name}.log") + constants.AGG_COL_RESULT_FILE.format( + fed_obj.workspace_path, participant.name + ), + with_docker=with_docker, ) for participant in fed_obj.collaborators + [fed_obj.aggregator] ] @@ -186,12 +258,85 @@ def run_federation(fed_obj): return results -def verify_federation_run_completion(fed_obj, results, num_rounds): +def run_federation_for_dws(fed_obj, use_tls): + """ + Start the federation + Args: + fed_obj (object): Federation fixture object + use_tls (bool): Use TLS or not (default is True) + Returns: + list: List of response files for all the participants + """ + executor = concurrent.futures.ThreadPoolExecutor() + + try: + results = [ + executor.submit( + run_command, + command=f"tar -xf /workspace/certs.tar", + workspace_path="", + error_msg=f"Failed to extract certificates for {participant.name}", + container_id=participant.container_id, + with_docker=True, + ) + for participant in [fed_obj.aggregator] + fed_obj.collaborators + ] + if not all([f.result() for f in results]): + raise Exception( + "Failed to extract certificates for one or more participants" + ) + except Exception as e: + raise e + + if use_tls: + try: + results = [ + executor.submit( + collaborator.import_pki, + zip_name=f"agg_to_col_{collaborator.name}_signed_cert.zip", + with_docker=True, + ) + for collaborator in fed_obj.collaborators + ] + if not all([f.result() for f in results]): + raise Exception( + "Failed to import and certify the CSR for one or more collaborators" + ) + except Exception as e: + raise e + + # Start federation run for all the participants + return run_federation(fed_obj, with_docker=True) + + +def install_dependencies_on_collaborators(fed_obj): + """ + Install dependencies on all the collaborators + """ + executor = concurrent.futures.ThreadPoolExecutor() + # Install dependencies on collaborators + # This is a time taking process, thus doing at this stage after all verification is done + log.info("Installing dependencies on collaborators. 
This might take some time...") + futures = [ + executor.submit(participant.install_dependencies) + for participant in fed_obj.collaborators + ] + results = [f.result() for f in futures] + log.info( + f"Results from all the collaborators for installation of dependencies: {results}" + ) + + if not all(results): + raise Exception("Failed to install dependencies on one or more collaborators") + + +def verify_federation_run_completion(fed_obj, results, test_env, num_rounds): """ Verify the completion of the process for all the participants Args: fed_obj (object): Federation fixture object results (list): List of results + test_env (str): Test environment num_rounds (int): Number of rounds Returns: list: List of response (True or False) for all the participants @@ -206,7 +351,8 @@ verify_federation_run_completion(fed_obj, results, num_rounds): participant, num_rounds, results[i], - local_bind_path=fed_obj.local_bind_path + test_env, + local_bind_path=fed_obj.local_bind_path, ) for i, participant in enumerate(fed_obj.collaborators + [fed_obj.aggregator]) ] @@ -220,7 +366,9 @@ return all(results) -def _verify_completion_for_participant(participant, num_rounds, result_file, time_for_each_round=100, local_bind_path=None): +def _verify_completion_for_participant( + participant, num_rounds, result_file, test_env, time_for_each_round=100, local_bind_path=None +): """ Verify the completion of the process for the participant Args: @@ -232,15 +380,18 @@ def _verify_completion_for_participant(participant, num_rounds, result_file, tim Returns: bool: True if successful, else False """ - time.sleep(20) # Wait for some time before checking the log file + time.sleep(20)  # Wait for some time before checking the log file # Set timeout based on the number of rounds and time for each round - timeout = 600 + ( time_for_each_round * num_rounds ) # in seconds - log.info(f"Printing the last line of the log file for {participant.name} to track the progress") + timeout = 600 + (time_for_each_round * num_rounds)  # in seconds # In case of docker environment, get the logs from local path which is mounted to the container - if os.getenv("TEST_ENV") == "docker": - result_file_name = os.path.basename(result_file) - result_file = os.path.join(local_bind_path, participant.name, "workspace", result_file_name) + if test_env == "task_runner_dockerized_ws": + result_file = constants.AGG_COL_RESULT_FILE.format( + local_bind_path, participant.name + ) + ssh.copy_file_from_docker( + participant.name, f"/workspace/{participant.name}.log", result_file + ) log.info(f"Result file is: {result_file}") @@ -252,22 +403,34 @@ def _verify_completion_for_participant(participant, num_rounds, result_file, tim while ( constants.SUCCESS_MARKER not in content and time.time() - start_time < timeout ): - with open(result_file, 'r') as file: + with open(result_file, "r") as file: lines = [line.strip() for line in file.readlines()] content = list(filter(str.rstrip, lines))[-1:] # Print last line of the log file on screen to track the progress - log.info(f"{participant.name}: {content}") + log.info(f"Last line in {participant.name} log: {content}") if constants.SUCCESS_MARKER in content: break log.info(f"Process is yet to complete for {participant.name}") time.sleep(45) + # Copy the log file from docker container to local machine every time to get the latest logs + if test_env == "task_runner_dockerized_ws": + ssh.copy_file_from_docker( + participant.name,
f"/workspace/{participant.name}.log", + constants.AGG_COL_RESULT_FILE.format(local_bind_path, participant.name), + ) + if constants.SUCCESS_MARKER not in content: - log.error(f"Process failed/is incomplete for {participant.name} after timeout of {timeout} seconds") + log.error( + f"Process failed/is incomplete for {participant.name} after timeout of {timeout} seconds" + ) return False else: - log.info(f"Process completed for {participant.name} in {time.time() - start_time} seconds") + log.info( + f"Process completed for {participant.name} in {time.time() - start_time} seconds" + ) return True @@ -277,37 +440,32 @@ def federation_env_setup_and_validate(request): Args: request (object): Request object Returns: - tuple: Test environment, model name, workspace path, aggregator domain name + tuple: Model name, workspace path, local bind path, aggregator domain name """ + agg_domain_name = "localhost" + # Determine the test type based on the markers - markers = [m.name for m in request.node.iter_markers()] - os.environ["TEST_ENV"] = test_env = "docker" if "docker" in markers else "task_runner" + test_env = request.config.test_env # Validate the model name and create the workspace name if not request.config.model_name.upper() in constants.ModelName._member_names_: raise ValueError(f"Invalid model name: {request.config.model_name}") # Set the workspace path - home_dir = os.getenv("HOME") + home_dir = Path().home() + local_bind_path = os.path.join( + home_dir, request.config.results_dir, request.config.model_name + ) + workspace_path = local_bind_path - if test_env == "docker": - # First check if openfl image is available - dh.check_docker_image() + if test_env == "task_runner_dockerized_ws": + + agg_domain_name = "aggregator" # Cleanup docker containers dh.cleanup_docker_containers() dh.remove_docker_network() - - # Create docker network openfl dh.create_docker_network() - local_bind_path = os.path.join(home_dir, request.config.results_dir, request.config.model_name) - # Absolute path is required for docker - workspace_path = os.path.join("/", request.config.results_dir, request.config.model_name) - agg_domain_name = "aggregator" - else: - local_bind_path = workspace_path = os.path.join(home_dir, request.config.results_dir, request.config.model_name) - agg_domain_name = "localhost" - log.info( f"Running federation setup using {test_env} API on single machine with below configurations:\n" f"\tNumber of collaborators: {request.config.num_collaborators}\n" @@ -319,7 +477,7 @@ def federation_env_setup_and_validate(request): f"\tResults directory: {request.config.results_dir}\n" f"\tWorkspace path: {workspace_path}" ) - return test_env, request.config.model_name, workspace_path, local_bind_path, agg_domain_name + return request.config.model_name, workspace_path, local_bind_path, agg_domain_name def add_local_workspace_permission(local_bind_path): @@ -330,7 +488,7 @@ def add_local_workspace_permission(local_bind_path): agg_container_id (str): Container ID """ try: - agg_workspace_path = os.path.join(local_bind_path, "aggregator", "workspace") + agg_workspace_path = constants.AGG_WORKSPACE_PATH.format(local_bind_path) return_code, output, error = run_command( f"sudo chmod -R 777 {agg_workspace_path}", workspace_path=local_bind_path, @@ -338,7 +496,9 @@ def add_local_workspace_permission(local_bind_path): if return_code != 0: raise Exception(f"Failed to add local permission to workspace: {error}") - log.debug(f"Recursive permission added to workspace on local machine: {agg_workspace_path}") + log.debug( + 
f"Recursive permission added to workspace on local machine: {agg_workspace_path}" + ) except Exception as e: log.error(f"Failed to add local permission to workspace: {e}") raise e @@ -355,14 +515,14 @@ def create_persistent_store(participant_name, local_bind_path): # Create persistent store error_msg = f"Failed to create persistent store for {participant_name}" cmd_persistent_store = ( - f"export WORKING_DIRECTORY={local_bind_path}; " \ - f"mkdir -p $WORKING_DIRECTORY/{participant_name}/workspace; " \ + f"export WORKING_DIRECTORY={local_bind_path}; " + f"mkdir -p $WORKING_DIRECTORY/{participant_name}/workspace; " "sudo chmod -R 755 $WORKING_DIRECTORY" ) log.debug(f"Creating persistent store") return_code, output, error = run_command( cmd_persistent_store, - workspace_path=os.getenv("HOME"), + workspace_path=Path().home(), ) if error: raise ex.PersistentStoreCreationException(f"{error_msg}: {error}") @@ -373,7 +533,16 @@ def create_persistent_store(participant_name, local_bind_path): raise ex.PersistentStoreCreationException(f"{error_msg}: {e}") -def run_command(command, workspace_path, error_msg=None, container_id=None, run_in_background=False, bg_file=None, print_output=False): +def run_command( + command, + workspace_path, + error_msg=None, + container_id=None, + run_in_background=False, + bg_file=None, + print_output=False, + with_docker=False, +): """ Run the command Args: @@ -383,13 +552,14 @@ def run_command(command, workspace_path, error_msg=None, container_id=None, run_ run_in_background (bool): Run the command in background bg_file (str): Background file (with path) print_output (bool): Print the output + with_docker (bool): Flag specific to dockerized workspace scenario. Default is False. Returns: tuple: Return code, output and error """ return_code, output, error = 0, None, None error_msg = error_msg or "Failed to run the command" - is_docker = True if os.getenv("TEST_ENV") == "docker" else False - if is_docker and container_id: + + if with_docker and container_id: log.debug("Running command in docker container") if len(workspace_path): docker_command = f"docker exec -w {workspace_path} {container_id} sh -c " @@ -411,17 +581,16 @@ def run_command(command, workspace_path, error_msg=None, container_id=None, run_ if print_output: log.info(f"Running command: {command}") - log.debug("Running command on local machine") - if run_in_background and not is_docker: + if run_in_background and not with_docker: bg_file = open(bg_file, "w", buffering=1) - sh.run_command_background( + ssh.run_command_background( command, work_dir=workspace_path, redirect_to_file=bg_file, check_sleep=60, ) else: - return_code, output, error = sh.run_command(command) + return_code, output, error = ssh.run_command(command) if return_code != 0: log.error(f"{error_msg}: {error}") raise Exception(f"{error_msg}: {error}") @@ -433,7 +602,9 @@ def run_command(command, workspace_path, error_msg=None, container_id=None, run_ # This functionality is common across multiple participants, thus moved to a common function -def verify_cmd_output(output, return_code, error, error_msg, success_msg, raise_exception=True): +def verify_cmd_output( + output, return_code, error, error_msg, success_msg, raise_exception=True +): """ Verify the output of fx command run Assumption - it will have '✔️ OK' in the output if the command is successful @@ -457,15 +628,9 @@ def verify_cmd_output(output, return_code, error, error_msg, success_msg, raise_ def setup_collaborator(count, workspace_path, local_bind_path): """ Setup the collaborator - 
Args: - count (int): Count of collaborator - workspace_path (str): Workspace path - local_bind_path (str): Local bind path - Returns: - object: Collaborator object + Includes - creation of collaborator objects, starting docker container, importing workspace, creating collaborator """ - local_agg_ws_path = os.path.join(local_bind_path, "aggregator", "workspace") - collaborator = None + local_agg_ws_path = constants.AGG_WORKSPACE_PATH.format(local_bind_path) try: collaborator = col_model.Collaborator( @@ -476,25 +641,22 @@ def setup_collaborator(count, workspace_path, local_bind_path): create_persistent_store(collaborator.name, local_bind_path) except Exception as e: - raise ex.PersistentStoreCreationException(f"Failed to create persistent store for {collaborator.name}: {e}") - - try: - if os.getenv("TEST_ENV") == "docker": - container = dh.start_docker_container( - container_name=collaborator.name, - workspace_path=workspace_path, - local_bind_path=local_bind_path, - ) - collaborator.container_id = container.id - except Exception as e: - raise ex.DockerException(f"Failed to start {collaborator.name} docker environment: {e}") + raise ex.PersistentStoreCreationException( + f"Failed to create persistent store for {collaborator.name}: {e}" + ) try: - local_col_ws_path = os.path.join(local_bind_path, collaborator.name, "workspace") - copy_file_between_participants(local_agg_ws_path, local_col_ws_path, "workspace.zip") + local_col_ws_path = constants.COL_WORKSPACE_PATH.format( + local_bind_path, collaborator.name + ) + copy_file_between_participants( + local_agg_ws_path, local_col_ws_path, constants.AGG_WORKSPACE_ZIP_NAME + ) collaborator.import_workspace() except Exception as e: - raise ex.WorkspaceImportException(f"Failed to import workspace for {collaborator.name}: {e}") + raise ex.WorkspaceImportException( + f"Failed to import workspace for {collaborator.name}: {e}" + ) try: collaborator.create_collaborator() @@ -518,7 +680,7 @@ def extract_memory_usage(log_file): Exception: If memory usage data is not found in the log file. """ try: - with open(log_file, 'r') as file: + with open(log_file, "r") as file: content = file.read() pattern = r"Publish memory usage: (\[.*?\])" @@ -526,8 +688,8 @@ def extract_memory_usage(log_file): if match: memory_usage_data = match.group(1) - memory_usage_data = re.sub(r'\S+\.py:\d+', '', memory_usage_data) - memory_usage_data = memory_usage_data.replace('\n', '').replace(' ', '') + memory_usage_data = re.sub(r"\S+\.py:\d+", "", memory_usage_data) + memory_usage_data = memory_usage_data.replace("\n", "").replace(" ", "") memory_usage_data = memory_usage_data.replace("'", '"') memory_usage_dict = json.loads(memory_usage_data) return memory_usage_dict @@ -548,8 +710,45 @@ def write_memory_usage_to_file(memory_usage_dict, output_file): output_file (str): The path to the output file to which to write the memory usage data. 
""" try: - with open(output_file, 'w') as file: + with open(output_file, "w") as file: json.dump(memory_usage_dict, file, indent=4) except Exception as e: log.error(f"An error occurred while writing memory usage data to file: {e}") raise e + + +def start_docker_containers_for_dws( + participants, workspace_path, local_bind_path, image_name +): + """ + Start docker containers for the participants + Args: + participants (list): List of participant objects (collaborators and aggregator) + workspace_path (str): Workspace path + local_bind_path (str): Local bind path + image_name (str): Docker image name + """ + for participant in participants: + try: + if participant.name == "aggregator": + local_ws_path = f"{local_bind_path}/aggregator/workspace" + local_cert_tar = "cert_agg.tar" + else: + local_ws_path = f"{local_bind_path}/{participant.name}/workspace" + local_cert_tar = f"cert_col_{participant.name}.tar" + + # In case of dockerized workspace, the workspace gets created inside folder with image name + container = dh.start_docker_container( + container_name=participant.name, + workspace_path=workspace_path, + local_bind_path=local_bind_path, + image=image_name, + mount_mapping=[ + f"{local_ws_path}/{local_cert_tar}:/{image_name}/certs.tar" + ], + ) + participant.container_id = container.id + except Exception as e: + raise ex.DockerException( + f"Failed to start {participant.name} docker environment: {e}" + ) diff --git a/tests/end_to_end/utils/ssh_helper.py b/tests/end_to_end/utils/ssh_helper.py index 6b88db827e..2bd7c34ddf 100644 --- a/tests/end_to_end/utils/ssh_helper.py +++ b/tests/end_to_end/utils/ssh_helper.py @@ -129,3 +129,37 @@ def run_command( raise subprocess.CalledProcessError(returncode=result.returncode, cmd=cmd, stderr=result.stderr) return result.returncode, output, error + + +def get_git_repo_and_branch(): + """ + Get the current repository URL and branch name. + """ + try: + # Get the current repository URL + repo_url = subprocess.check_output(['git', 'config', '--get', 'remote.origin.url']).strip().decode('utf-8') + + # Get the current branch name + branch_name = subprocess.check_output(['git', 'rev-parse', '--abbrev-ref', 'HEAD']).strip().decode('utf-8') + + return repo_url, branch_name + except subprocess.CalledProcessError as e: + print(f"Error: {e}") + return None, None + + +def copy_file_from_docker(container_name, container_file_path, local_file_path): + """ + Copy a file from a Docker container to the local machine. 
+ Args: + container_name (str): Name of the Docker container + container_file_path (str): Path of the file in the Docker container + local_file_path (str): Path of the file on the local machine + """ + try: + command = ['sudo', 'docker', 'cp', f'{container_name}:{container_file_path}', local_file_path] + subprocess.check_call(command) + log.info(f"Copied {container_file_path} from {container_name} to {local_file_path}.") + except subprocess.CalledProcessError as e: + log.error(f"Error: {e}") + raise e diff --git a/tests/end_to_end/utils/summary_helper.py b/tests/end_to_end/utils/summary_helper.py index d71cdd15f4..a832a281c7 100644 --- a/tests/end_to_end/utils/summary_helper.py +++ b/tests/end_to_end/utils/summary_helper.py @@ -4,13 +4,14 @@ import xml.etree.ElementTree as ET from lxml import etree import os +from pathlib import Path import tests.end_to_end.utils.constants as constants # Initialize the XML parser -parser = etree.XMLParser(recover=True, encoding='utf-8') +parser = etree.XMLParser(recover=True, encoding="utf-8") -result_path = os.path.join(os.getenv("HOME"), "results") +result_path = os.path.join(Path().home(), "results") result_xml = os.path.join(result_path, "results.xml") if not os.path.exists(result_xml): print(f"Results XML file not found at {result_xml}. Exiting...") @@ -32,7 +33,9 @@ def get_aggregated_accuracy(agg_log_file): """ agg_accuracy = "Not Found" if not os.path.exists(agg_log_file): - print(f"Aggregator log file {agg_log_file} not found. Cannot get aggregated accuracy") + print( + f"Aggregator log file {agg_log_file} not found. Cannot get aggregated accuracy" + ) return agg_accuracy # Example line(s) containing spaces and special characters: @@ -44,11 +47,13 @@ def get_aggregated_accuracy(agg_log_file): with open(agg_log_file, 'r') as f: for line in f: if "'metric_origin': 'aggregator'" in line and "aggregated_model_validation" in line: + # In Python versions < 3.11, aggregator.py file name appears in the line + # whereas in Python version 3.11, it is utils.py line = line.split("aggregator.py:")[0].strip() + line = line.split("utils.py:")[0].strip() # If the line does not contain closing bracket "}", then concatenate the next line reqd_line = line if "}" in line else line + next(f).strip() agg_accuracy = eval(reqd_line.split("METRIC")[1].strip('"'))["metric_value"] - break except Exception as e: # Do not fail the test if the accuracy cannot be fetched print(f"Error while reading aggregator log file: {e}") @@ -118,8 +123,20 @@ def main(): """ result = get_testcase_result() - if not all([os.getenv(var) for var in ["NUM_COLLABORATORS", "NUM_ROUNDS", "MODEL_NAME", "GITHUB_STEP_SUMMARY"]]): - print("One or more environment variables not set. Skipping writing to GitHub step summary") + if not all( + [ + os.getenv(var) + for var in [ + "NUM_COLLABORATORS", + "NUM_ROUNDS", + "MODEL_NAME", + "GITHUB_STEP_SUMMARY", + ] + ] + ): + print( + "One or more environment variables not set. Skipping writing to GitHub step summary" + ) return num_cols = os.getenv("NUM_COLLABORATORS") @@ -129,21 +146,34 @@ def main(): # Validate the model name and create the workspace name if not model_name.upper() in constants.ModelName._member_names_: - print(f"Invalid model name: {model_name}. Skipping writing to GitHub step summary") + print( + f"Invalid model name: {model_name}. 
Skipping writing to GitHub step summary" + ) return # Assumption - result directory is present in the home directory - agg_log_file = os.path.join(result_path, model_name, "aggregator", "workspace", "aggregator.log") + agg_log_file = os.path.join( + result_path, model_name, "aggregator", "workspace", "aggregator.log" + ) agg_accuracy = get_aggregated_accuracy(agg_log_file) # Write the results to GitHub step summary file # This file is created at runtime by the GitHub action, thus we cannot verify its existence beforehand - with open(summary_file, 'a') as fh: + with open(summary_file, "a") as fh: # DO NOT change the print statements - print("| Name | Time (in seconds) | Result | Error (if any) | Collaborators | Rounds to train | Score (if applicable) |", file=fh) - print("| ------------- | ------------- | ------------- | ------------- | ------------- | ------------- | ------------- |", file=fh) + print( + "| Name | Time (in seconds) | Result | Error (if any) | Collaborators | Rounds to train | Score (if applicable) |", + file=fh, + ) + print( + "| ------------- | ------------- | ------------- | ------------- | ------------- | ------------- | ------------- |", + file=fh, + ) for item in result: - print(f"| {item['name']} | {item['time']} | {item['result']} | {item['err_msg']} | {num_cols} | {num_rounds} | {agg_accuracy} |", file=fh) + print( + f"| {item['name']} | {item['time']} | {item['result']} | {item['err_msg']} | {num_cols} | {num_rounds} | {agg_accuracy} |", + file=fh, + ) if __name__ == "__main__": diff --git a/tests/end_to_end/utils/wf_helper.py b/tests/end_to_end/utils/wf_helper.py index 019d906ff5..fcde1118d0 100644 --- a/tests/end_to_end/utils/wf_helper.py +++ b/tests/end_to_end/utils/wf_helper.py @@ -1,9 +1,13 @@ +# Copyright 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + from metaflow import Flow import logging import numpy as np log = logging.getLogger(__name__) + def validate_flow(flow_obj, expected_flow_steps): """ Validate: