
Commit c462f8e: Reverted docker native model change
Signed-off-by: noopur <[email protected]>
noopurintel committed Dec 18, 2024
1 parent cd37e6c commit c462f8e
Showing 8 changed files with 121 additions and 80 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/task_runner_basic_e2e.yml
@@ -192,8 +192,9 @@ jobs:
id: run_tests
run: |
python -m pytest -s tests/end_to_end/test_suites/memory_logs_tests.py \
--model_name ${{ env.MODEL_NAME }} --num_rounds ${{ env.NUM_ROUNDS }} \
--num_collaborators ${{ env.NUM_COLLABORATORS }} --log_memory_usage
-k test_log_memory_usage_basic --model_name ${{ env.MODEL_NAME }} \
--num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} \
--log_memory_usage
echo "Task runner memory logs test run completed"
- name: Post test run
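The `-k test_log_memory_usage_basic` expression selects the renamed basic test. A roughly equivalent local invocation through `pytest.main` is sketched below; it assumes the suite's conftest registers the custom `--model_name`, `--num_rounds`, `--num_collaborators` and `--log_memory_usage` options, and the values shown are placeholders rather than the workflow's actual settings.

```python
import pytest

# Sketch of an equivalent local run of the renamed test; the custom options
# only work if the suite's conftest registers them, and the values here are
# placeholders rather than the workflow's actual settings.
exit_code = pytest.main([
    "-s", "tests/end_to_end/test_suites/memory_logs_tests.py",
    "-k", "test_log_memory_usage_basic",
    "--model_name", "keras_cnn_mnist",
    "--num_rounds", "2",
    "--num_collaborators", "2",
    "--log_memory_usage",
])
```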
42 changes: 42 additions & 0 deletions .github/workflows/task_runner_dockerized_ws_e2e.yml
@@ -150,3 +150,45 @@ jobs:
if: ${{ always() }}
with:
test_type: "tr_no_client_auth_dockerized_ws"

test_memory_logs_dockerized_ws:
name: tr_tls_memory_logs_dockerized_ws
runs-on: ubuntu-22.04
timeout-minutes: 15
strategy:
matrix:
model_name: ["keras_cnn_mnist"]
python_version: ["3.10"]
fail-fast: false # do not immediately fail if one of the combinations fail

env:
MODEL_NAME: ${{ matrix.model_name }}
PYTHON_VERSION: ${{ matrix.python_version }}

steps:
- name: Checkout OpenFL repository
id: checkout_openfl
uses: actions/[email protected]
with:
fetch-depth: 2 # needed for detecting changes
submodules: "true"
token: ${{ secrets.GITHUB_TOKEN }}

- name: Pre test run
uses: ./.github/actions/tr_pre_test_run
if: ${{ always() }}

- name: Run Task Runner E2E tests with TLS and memory logs
id: run_tests
run: |
python -m pytest -s tests/end_to_end/test_suites/memory_logs_tests.py \
-k test_log_memory_usage_dockerized_ws --model_name ${{ env.MODEL_NAME }} \
--num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} \
--log_memory_usage
echo "Task runner memory logs test run completed"
- name: Post test run
uses: ./.github/actions/tr_post_test_run
if: ${{ always() }}
with:
test_type: "tr_tls_memory_logs_dockerized_ws"
3 changes: 1 addition & 2 deletions .github/workflows/tr_docker_native.yml
@@ -5,7 +5,6 @@ on:
pull_request:
branches: [ develop ]
types: [opened, synchronize, reopened, ready_for_review]
workflow_dispatch:

permissions:
contents: read
@@ -29,7 +28,7 @@ jobs:
- name: Create workspace image
run: |
fx workspace create --prefix example_workspace --template torch_cnn_mnist
fx workspace create --prefix example_workspace --template keras_cnn_mnist
cd example_workspace
fx plan initialize -a localhost
fx workspace dockerize --save --revision https://github.com/${GITHUB_REPOSITORY}.git@${{ github.event.pull_request.head.sha }}
61 changes: 37 additions & 24 deletions tests/end_to_end/test_suites/memory_logs_tests.py
@@ -6,51 +6,64 @@
import os
import json

from tests.end_to_end.utils.common_fixtures import fx_federation_tr
from tests.end_to_end.utils.common_fixtures import fx_federation_tr, fx_federation_tr_dws
import tests.end_to_end.utils.constants as constants
from tests.end_to_end.utils import federation_helper as fed_helper

log = logging.getLogger(__name__)


# NOTE: This test file contains the test cases for logging memory usage in a federated learning setup.

@pytest.mark.task_runner_basic
@pytest.mark.log_memory_usage
def test_log_memory_usage(request, fx_federation_tr):
def test_log_memory_usage_basic(request, fx_federation_tr):
"""
This module contains end-to-end tests for logging memory usage in a federated learning setup.
Test Suite:
- test_log_memory_usage: Tests the memory usage logging functionality for the torch_cnn_mnist model.
Functions:
- test_log_memory_usage(request, fx_federation):
Test the memory usage logging functionality in a federated learning setup.
Parameters:
Args:
- request: The pytest request object containing configuration options.
- fx_federation_tr: The fixture representing the federated learning setup.
Steps:
1. Skip the test if memory usage logging is disabled.
2. Setup PKI for trusted communication if TLS is enabled.
3. Start the federation and verify its completion.
4. Verify the existence of memory usage logs for the aggregator.
5. Verify the memory usage details for each round.
6. Verify the existence and details of memory usage logs for each collaborator.
7. Log the availability of memory usage details for all participants.
"""
# Skip test if fx_federation.log_memory_usage is False
if not request.config.log_memory_usage:
pytest.skip("Memory usage logging is disabled")

_log_memory_usage(request, fx_federation_tr)


@pytest.mark.log_memory_usage
def test_log_memory_usage_dockerized_ws(request, fx_federation_tr_dws):
"""
Test the memory usage logging functionality in a federated learning setup.
Args:
- request: The pytest request object containing configuration options.
- fx_federation_tr_dws: The fixture representing the federated learning setup with dockerized workspace.
"""
# Skip test if fx_federation.log_memory_usage is False
if not request.config.log_memory_usage:
pytest.skip("Memory usage logging is disabled")

_log_memory_usage(request, fx_federation_tr_dws)


def _log_memory_usage(request, fed_obj):
"""
Test the memory usage logging functionality in a federated learning setup.
Steps:
1. Setup PKI for trusted communication if TLS is enabled.
2. Start the federation and verify its completion.
3. Verify the existence of memory usage logs for the aggregator.
4. Verify the memory usage details for each round.
5. Verify the existence and details of memory usage logs for each collaborator.
6. Log the availability of memory usage details for all participants.
"""
# Start the federation
results = fed_helper.run_federation(fx_federation_tr)
results = fed_helper.run_federation(fed_obj)

# Verify the completion of the federation run
assert fed_helper.verify_federation_run_completion(
fx_federation_tr, results, num_rounds=request.config.num_rounds
fed_obj, results, test_env=request.config.test_env, num_rounds=request.config.num_rounds
), "Federation completion failed"

# Verify the aggregator memory logs
aggregator_memory_usage_file = constants.AGG_MEM_USAGE_JSON.format(fx_federation_tr.workspace_path)
aggregator_memory_usage_file = constants.AGG_MEM_USAGE_JSON.format(fed_obj.workspace_path)
assert os.path.exists(
aggregator_memory_usage_file
), "Aggregator memory usage file is not available"
@@ -64,9 +77,9 @@ def test_log_memory_usage(request, fx_federation_tr):
), "Memory usage details are not available for all rounds"

# check memory usage entries for each collaborator
for collaborator in fx_federation_tr.collaborators:
for collaborator in fed_obj.collaborators:
collaborator_memory_usage_file = constants.COL_MEM_USAGE_JSON.format(
fx_federation_tr.workspace_path, collaborator.name
fed_obj.workspace_path, collaborator.name
)

assert os.path.exists(
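The assertions above delegate the per-round checks to helper utilities. A minimal sketch of such a check is shown below, assuming each memory-usage JSON file holds a list with one record per round; the actual schema used by the suite may differ.

```python
import json


def memory_usage_covers_all_rounds(path: str, num_rounds: int) -> bool:
    """Return True if the memory-usage log holds one record per round.

    Assumes the file contains a JSON list of per-round entries; the real
    helper in federation_helper may parse a different layout.
    """
    with open(path) as f:
        records = json.load(f)
    return len(records) == num_rounds
```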
10 changes: 8 additions & 2 deletions tests/end_to_end/test_suites/sample_tests.py
@@ -36,7 +36,10 @@ def test_federation_basic(request, fx_federation_tr):

# Verify the completion of the federation run
assert fed_helper.verify_federation_run_completion(
fx_federation_tr, results, num_rounds=request.config.num_rounds
fx_federation_tr,
results,
test_env=request.config.test_env,
num_rounds=request.config.num_rounds,
), "Federation completion failed"


@@ -54,5 +57,8 @@ def test_federation_via_dockerized_workspace(request, fx_federation_tr_dws):

# Verify the completion of the federation run
assert fed_helper.verify_federation_run_completion(
fx_federation_tr_dws, results, request.config.num_rounds
fx_federation_tr_dws,
results,
test_env=request.config.test_env,
num_rounds=request.config.num_rounds,
), "Federation completion failed"
21 changes: 17 additions & 4 deletions tests/end_to_end/test_suites/task_runner_tests.py
@@ -4,7 +4,10 @@
import pytest
import logging

from tests.end_to_end.utils.common_fixtures import fx_federation_tr, fx_federation_tr_dws
from tests.end_to_end.utils.common_fixtures import (
fx_federation_tr,
fx_federation_tr_dws,
)
from tests.end_to_end.utils import federation_helper as fed_helper

log = logging.getLogger(__name__)
@@ -23,7 +26,10 @@ def test_federation_via_native(request, fx_federation_tr):

# Verify the completion of the federation run
assert fed_helper.verify_federation_run_completion(
fx_federation_tr, results, num_rounds=request.config.num_rounds
fx_federation_tr,
results,
test_env=request.config.test_env,
num_rounds=request.config.num_rounds,
), "Federation completion failed"


@@ -36,7 +42,14 @@ def test_federation_via_dockerized_workspace(request, fx_federation_tr_dws):
fx_federation_tr_dws (Fixture): Pytest fixture for dockerized workspace
"""
# Start the federation
results = fed_helper.run_federation_for_dws(fx_federation_tr_dws, use_tls=request.config.use_tls)
results = fed_helper.run_federation_for_dws(
fx_federation_tr_dws, use_tls=request.config.use_tls
)

# Verify the completion of the federation run
assert fed_helper.verify_federation_run_completion(fx_federation_tr_dws, results, request.config.num_rounds), "Federation completion failed"
assert fed_helper.verify_federation_run_completion(
fx_federation_tr_dws,
results,
test_env=request.config.test_env,
num_rounds=request.config.num_rounds,
), "Federation completion failed"
24 changes: 8 additions & 16 deletions tests/end_to_end/utils/common_fixtures.py
@@ -8,12 +8,12 @@
import numpy as np

import tests.end_to_end.utils.constants as constants
# from tests.end_to_end.utils.wf_helper import (
# init_collaborator_private_attr_index,
# init_collaborator_private_attr_name,
# init_collaborate_pvt_attr_np,
# init_agg_pvt_attr_np
# )
from tests.end_to_end.utils.wf_helper import (
init_collaborator_private_attr_index,
init_collaborator_private_attr_name,
init_collaborate_pvt_attr_np,
init_agg_pvt_attr_np
)
import tests.end_to_end.utils.federation_helper as fh
import tests.end_to_end.utils.ssh_helper as ssh
from tests.end_to_end.models import aggregator as agg_model, model_owner as mo_model
@@ -45,12 +45,7 @@ def fx_federation_tr(request):
Note: As this is a function level fixture, thus no import is required at test level.
"""
test_env = fh.get_test_env_from_markers(request)

if test_env != "task_runner_basic":
raise ValueError(
"Fixture fx_federation_tr is only supported for task_runner_basic marker"
)
request.config.test_env = "task_runner_basic"

collaborators = []
executor = concurrent.futures.ThreadPoolExecutor()
@@ -143,10 +138,7 @@ def fx_federation_tr_dws(request):
Note: As this is a function level fixture, thus no import is required at test level.
"""
if fh.get_test_env_from_markers(request) != "task_runner_dockerized_ws":
raise ValueError(
"Fixture fx_federation_tr_dws is only supported for task_runner_dockerized_ws marker"
)
request.config.test_env = "task_runner_dockerized_ws"

collaborators = []
executor = concurrent.futures.ThreadPoolExecutor()
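The fixtures now record the environment on `request.config.test_env` instead of rejecting unexpected markers. A condensed sketch of that pattern, with the federation setup elided (names simplified, not the full fixture):

```python
import pytest


@pytest.fixture(scope="function")
def fx_federation_sketch(request):
    # Condensed illustration of the pattern above (not the full fixture):
    # record the environment on request.config so downstream helpers such as
    # verify_federation_run_completion can read it without marker lookups.
    request.config.test_env = "task_runner_basic"
    # ... create the model owner, certify the workspace, and start the
    # aggregator and collaborators exactly as the real fixture does ...
    yield
```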
35 changes: 5 additions & 30 deletions tests/end_to_end/utils/federation_helper.py
@@ -7,7 +7,6 @@
import os
import json
import re
from functools import lru_cache

import tests.end_to_end.utils.constants as constants
import tests.end_to_end.utils.docker_helper as dh
@@ -330,12 +329,13 @@ def install_dependencies_on_collaborators(fed_obj):
raise Exception("Failed to install dependencies on one or more collaborators")


def verify_federation_run_completion(fed_obj, results, num_rounds):
def verify_federation_run_completion(fed_obj, results, test_env, num_rounds):
"""
Verify the completion of the process for all the participants
Args:
fed_obj (object): Federation fixture object
results (list): List of results
test_env (str): Test environment
num_rounds (int): Number of rounds
Returns:
list: List of response (True or False) for all the participants
@@ -350,6 +350,7 @@ def verify_federation_run_completion(fed_obj, results, num_rounds):
participant,
num_rounds,
results[i],
test_env,
local_bind_path=fed_obj.local_bind_path,
)
for i, participant in enumerate(fed_obj.collaborators + [fed_obj.aggregator])
@@ -365,7 +366,7 @@


def _verify_completion_for_participant(
participant, num_rounds, result_file, time_for_each_round=100, local_bind_path=None
participant, num_rounds, result_file, test_env, time_for_each_round=100, local_bind_path=None
):
"""
Verify the completion of the process for the participant
@@ -382,7 +383,6 @@ def _verify_completion_for_participant(
# Set timeout based on the number of rounds and time for each round
timeout = 600 + (time_for_each_round * num_rounds) # in seconds

test_env = get_test_env_from_markers()
# In case of docker environment, get the logs from local path which is mounted to the container
if test_env == "task_runner_dockerized_ws":
result_file = constants.AGG_COL_RESULT_FILE.format(
@@ -433,31 +433,6 @@ def _verify_completion_for_participant(
return True


@lru_cache(maxsize=50)
def get_test_env_from_markers(request=None):
"""
Get test environment based on test case markers
Keeping request argument as optional so that it can be used in other functions as well
"""
if os.getenv("TEST_ENV"):
return os.getenv("TEST_ENV")

# Determine the test type based on the markers
markers = [m.name for m in request.node.iter_markers()]
if "task_runner_basic" in markers:
test_env = "task_runner_basic"
elif "task_runner_dockerized_ws" in markers:
test_env = "task_runner_dockerized_ws"
else:
raise ValueError(
"Invalid test environment. Provide one of the valid markers: "
"task_runner_basic and task_runner_dockerized_ws"
)

os.environ["TEST_ENV"] = test_env
return test_env


def federation_env_setup_and_validate(request):
"""
Setup the federation environment and validate the configurations
@@ -469,7 +444,7 @@ def federation_env_setup_and_validate(request):
agg_domain_name = "localhost"

# Determine the test type based on the markers
test_env = get_test_env_from_markers(request)
test_env = request.config.test_env

# Validate the model name and create the workspace name
if not request.config.model_name.upper() in constants.ModelName._member_names_:
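With the cached `get_test_env_from_markers` lookup removed, callers thread the environment through explicitly. A short example of the resulting call pattern from a test, mirroring the updated call sites in this commit:

```python
# Call pattern after this change: the environment recorded by the fixture
# is passed explicitly instead of being re-derived from pytest markers.
assert fed_helper.verify_federation_run_completion(
    fed_obj,
    results,
    test_env=request.config.test_env,
    num_rounds=request.config.num_rounds,
), "Federation completion failed"
```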
