Skip to content

Commit

Permalink
Merge branch 'develop' into karansh1/callbacks_api
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterSkepticista committed Dec 10, 2024
2 parents d63ced5 + 44614b3 commit 9ce8983
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 49 deletions.
67 changes: 67 additions & 0 deletions .github/workflows/task_runner_e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,70 @@ jobs:
with:
name: tr_no_client_auth_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }}
path: result.tar

test_memory_logs:
  name: tr_tls_memory_logs
  runs-on: ubuntu-22.04
  timeout-minutes: 15
  strategy:
    matrix:
      # Memory usage logging is tested with TLS enabled, only for the
      # torch_cnn_mnist model and python 3.10.
      # If required, this can be extended to other models and python versions.
      model_name: ["torch_cnn_mnist"]
      python_version: ["3.10"]
    fail-fast: false  # do not immediately fail if one of the combinations fail

  env:
    MODEL_NAME: ${{ matrix.model_name }}
    PYTHON_VERSION: ${{ matrix.python_version }}

  steps:
    - name: Checkout OpenFL repository
      id: checkout_openfl
      # NOTE(review): confirm this pinned version matches the checkout step
      # used by the other jobs in this workflow.
      uses: actions/checkout@v4.1.1
      with:
        fetch-depth: 2  # needed for detecting changes
        submodules: "true"
        token: ${{ secrets.GITHUB_TOKEN }}

    - name: Set up Python
      id: setup_python
      uses: actions/setup-python@v3
      with:
        python-version: ${{ env.PYTHON_VERSION }}

    - name: Install dependencies
      id: install_dependencies
      run: |
        python -m pip install --upgrade pip
        pip install .
        pip install -r test-requirements.txt

    # NUM_ROUNDS and NUM_COLLABORATORS are not set in this job; presumably they
    # come from the workflow-level env block - verify before changing them.
    - name: Run Task Runner E2E memory logs tests
      id: run_tests
      run: |
        python -m pytest -s tests/end_to_end/test_suites/memory_logs_tests.py \
          --model_name ${{ env.MODEL_NAME }} --num_rounds ${{ env.NUM_ROUNDS }} \
          --num_collaborators ${{ env.NUM_COLLABORATORS }} --log_memory_usage
        echo "Task runner memory logs test run completed"

    # The remaining steps run even when the tests fail, so that the summary
    # and the result archive are always available for debugging.
    - name: Print test summary
      id: print_test_summary
      if: ${{ always() }}
      run: |
        export PYTHONPATH="$PYTHONPATH:."
        python tests/end_to_end/utils/summary_helper.py
        echo "Test summary printed"

    - name: Tar files
      id: tar_files
      if: ${{ always() }}
      run: tar -cvf result.tar results

    - name: Upload Artifacts
      id: upload_artifacts
      uses: actions/upload-artifact@v4
      if: ${{ always() }}
      with:
        name: tr_tls_memory_logs_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }}
        path: result.tar
3 changes: 1 addition & 2 deletions .github/workflows/ubuntu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ env:

jobs:
pytest-coverage: # from pytest_coverage.yml
needs: lint
runs-on: ubuntu-latest
timeout-minutes: 15

Expand All @@ -35,7 +34,7 @@ jobs:
coverage report
cli:
needs: [lint, pytest-coverage]
needs: [pytest-coverage]
runs-on: ubuntu-latest
timeout-minutes: 15

Expand Down
1 change: 1 addition & 0 deletions openfl/utilities/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

"""Logs utilities."""

import json
import logging

from rich.console import Console
Expand Down
73 changes: 37 additions & 36 deletions tests/end_to_end/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Define a named tuple to store the objects for model owner, aggregator, and collaborators
federation_fixture = collections.namedtuple(
"federation_fixture",
"model_owner, aggregator, collaborators, model_name, require_client_auth, use_tls, workspace_path, results_dir, num_rounds",
"model_owner, aggregator, collaborators, workspace_path",
)

def pytest_addoption(parser):
Expand Down Expand Up @@ -180,6 +180,24 @@ def pytest_sessionfinish(session, exitstatus):
log.debug(f"Cleared .pytest_cache directory at {cache_dir}")


def pytest_configure(config):
    """
    Attach the end-to-end run options to the pytest config object.

    The options are read from the object returned by parse_arguments() and from
    the "results_dir" ini setting, then stored as attributes on ``config`` so
    that fixtures and tests can read them via ``request.config``.

    Args:
        config: pytest config object
    """
    cli_args = parse_arguments()

    # Options copied unchanged from the parsed arguments
    for option in ("model_name", "num_collaborators", "num_rounds"):
        setattr(config, option, getattr(cli_args, option))

    # The parsed arguments carry "disable" flags; store them as positive switches
    config.require_client_auth = not cli_args.disable_client_auth
    config.use_tls = not cli_args.disable_tls

    config.log_memory_usage = cli_args.log_memory_usage
    config.results_dir = config.getini("results_dir")


@pytest.fixture(scope="function")
def fx_federation(request, pytestconfig):
"""
Expand All @@ -196,58 +214,46 @@ def fx_federation(request, pytestconfig):
"""
collaborators = []
agg_domain_name = "localhost"

# Parse the command line arguments
args = parse_arguments()
# Use the model name from the test case name if not provided as a command line argument
model_name = args.model_name if args.model_name else request.node.name.split("test_")[1]
results_dir = pytestconfig.getini("results_dir")
num_collaborators = args.num_collaborators
num_rounds = args.num_rounds
require_client_auth = not args.disable_client_auth
use_tls = not args.disable_tls
log_memory_usage = args.log_memory_usage

log.info(
f"Running federation setup using Task Runner API on single machine with below configurations:\n"
f"\tNumber of collaborators: {num_collaborators}\n"
f"\tNumber of rounds: {num_rounds}\n"
f"\tModel name: {model_name}\n"
f"\tClient authentication: {require_client_auth}\n"
f"\tTLS: {use_tls}\n"
f"\tMemory Logs: {log_memory_usage}"
f"\tNumber of collaborators: {request.config.num_collaborators}\n"
f"\tNumber of rounds: {request.config.num_rounds}\n"
f"\tModel name: {request.config.model_name}\n"
f"\tClient authentication: {request.config.require_client_auth}\n"
f"\tTLS: {request.config.use_tls}\n"
f"\tMemory Logs: {request.config.log_memory_usage}"
)

# Validate the model name and create the workspace name
if not model_name.upper() in constants.ModelName._member_names_:
raise ValueError(f"Invalid model name: {model_name}")
if not request.config.model_name.upper() in constants.ModelName._member_names_:
raise ValueError(f"Invalid model name: {request.config.model_name}")

workspace_name = f"workspace_{model_name}"
workspace_name = request.config.model_name

# Create model owner object and the workspace for the model
model_owner = participants.ModelOwner(workspace_name, model_name, log_memory_usage)
model_owner = participants.ModelOwner(workspace_name, request.config.model_name, request.config.log_memory_usage)
try:
workspace_path = model_owner.create_workspace(results_dir=results_dir)
workspace_path = model_owner.create_workspace(results_dir=request.config.results_dir)
except Exception as e:
log.error(f"Failed to create the workspace: {e}")
raise e

# Modify the plan
try:
model_owner.modify_plan(
new_rounds=num_rounds,
num_collaborators=num_collaborators,
require_client_auth=require_client_auth,
use_tls=use_tls,
new_rounds=request.config.num_rounds,
num_collaborators=request.config.num_collaborators,
require_client_auth=request.config.require_client_auth,
use_tls=request.config.use_tls,
)
except Exception as e:
log.error(f"Failed to modify the plan: {e}")
raise e

if not use_tls:
if not request.config.use_tls:
log.info("Disabling TLS for communication")
try:
model_owner.register_collaborators(num_collaborators)
model_owner.register_collaborators(request.config.num_collaborators)
except Exception as e:
log.error(f"Failed to register the collaborators: {e}")
raise e
Expand All @@ -271,7 +277,7 @@ def fx_federation(request, pytestconfig):
agg_domain_name=agg_domain_name, workspace_path=workspace_path
)

for i in range(num_collaborators):
for i in range(request.config.num_collaborators):
collaborator = participants.Collaborator(
collaborator_name=f"collaborator{i+1}",
data_directory_path=i + 1,
Expand All @@ -285,10 +291,5 @@ def fx_federation(request, pytestconfig):
model_owner=model_owner,
aggregator=aggregator,
collaborators=collaborators,
model_name=model_name,
require_client_auth=require_client_auth,
use_tls=use_tls,
workspace_path=workspace_path,
results_dir=results_dir,
num_rounds=num_rounds,
)
73 changes: 73 additions & 0 deletions tests/end_to_end/test_suites/memory_logs_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Copyright 2020-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import pytest
import logging
import os
import json

from tests.end_to_end.utils import federation_helper as fed_helper

log = logging.getLogger(__name__)


@pytest.mark.log_memory_usage
def test_log_memory_usage(request, fx_federation):
    """
    Verify that the aggregator and every collaborator write memory usage logs.

    Args:
        request: The pytest request object; ``request.config`` carries the run
            options read here (``log_memory_usage``, ``use_tls``, ``num_rounds``).
        fx_federation: The federation fixture; ``workspace_path`` and
            ``collaborators`` are read from it.

    Steps:
        1. Skip the test if memory usage logging is disabled.
        2. Setup PKI for trusted communication if TLS is enabled.
        3. Start the federation and verify its completion.
        4. Verify that the aggregator memory usage log exists and has one entry per round.
        5. Verify the same for the memory usage log of each collaborator.
    """
    # Skip test if memory usage logging was not requested for this run
    if not request.config.log_memory_usage:
        pytest.skip("Memory usage logging is disabled")

    # Setup PKI for trusted communication within the federation
    if request.config.use_tls:
        assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI for trusted communication"

    # Start the federation
    results = fed_helper.run_federation(fx_federation)

    # Verify the completion of the federation run
    assert fed_helper.verify_federation_run_completion(
        fx_federation, results, num_rounds=request.config.num_rounds
    ), "Federation completion failed"

    logs_dir = os.path.join(fx_federation.workspace_path, "logs")

    # Verify the aggregator memory logs
    aggregator_memory_usage_file = os.path.join(logs_dir, "aggregator_memory_usage.json")
    assert os.path.exists(aggregator_memory_usage_file), "Aggregator memory usage file is not available"

    # Load the log inside a context manager so the file handle is always closed
    with open(aggregator_memory_usage_file) as memory_usage_file:
        memory_usage_dict = json.load(memory_usage_file)

    # check memory usage entries for each round
    assert len(memory_usage_dict) == request.config.num_rounds, \
        "Memory usage details are not available for all rounds"

    # check memory usage entries for each collaborator
    for collaborator in fx_federation.collaborators:
        collaborator_memory_usage_file = os.path.join(
            logs_dir, f"{collaborator.collaborator_name}_memory_usage.json"
        )

        assert os.path.exists(collaborator_memory_usage_file), f"Memory usage file for collaborator {collaborator.collaborator_name} is not available"

        with open(collaborator_memory_usage_file) as memory_usage_file:
            memory_usage_dict = json.load(memory_usage_file)

        assert len(memory_usage_dict) == request.config.num_rounds, \
            f"Memory usage details are not available for all rounds for collaborator {collaborator.collaborator_name}"

    log.info("Memory usage details are available for all participants")
21 changes: 12 additions & 9 deletions tests/end_to_end/test_suites/task_runner_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,51 +10,54 @@


@pytest.mark.torch_cnn_mnist
def test_torch_cnn_mnist(fx_federation):
def test_torch_cnn_mnist(request, fx_federation):
"""
Test for torch_cnn_mnist model.
"""
log.info("Testing torch_cnn_mnist model")

# Setup PKI for trusted communication within the federation
if fx_federation.use_tls:
if request.config.use_tls:
assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI for trusted communication"

# Start the federation
results = fed_helper.run_federation(fx_federation)

# Verify the completion of the federation run
assert fed_helper.verify_federation_run_completion(fx_federation, results), "Federation completion failed"
assert fed_helper.verify_federation_run_completion(fx_federation, results,
num_rounds=request.config.num_rounds), "Federation completion failed"


@pytest.mark.keras_cnn_mnist
def test_keras_cnn_mnist(fx_federation):
def test_keras_cnn_mnist(request, fx_federation):
log.info("Testing keras_cnn_mnist model")

# Setup PKI for trusted communication within the federation
if fx_federation.use_tls:
if request.config.use_tls:
assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI for trusted communication"

# Start the federation
results = fed_helper.run_federation(fx_federation)

# Verify the completion of the federation run
assert fed_helper.verify_federation_run_completion(fx_federation, results), "Federation completion failed"
assert fed_helper.verify_federation_run_completion(fx_federation, results,
num_rounds=request.config.num_rounds), "Federation completion failed"


@pytest.mark.torch_cnn_histology
def test_torch_cnn_histology(fx_federation):
def test_torch_cnn_histology(request, fx_federation):
"""
Test for torch_cnn_histology model
"""
log.info("Testing torch_cnn_histology model")

# Setup PKI for trusted communication within the federation
if fx_federation.use_tls:
if request.config.use_tls:
assert fed_helper.setup_pki(fx_federation), "Failed to setup PKI for trusted communication"

# Start the federation
results = fed_helper.run_federation(fx_federation)

# Verify the completion of the federation run
assert fed_helper.verify_federation_run_completion(fx_federation, results), "Federation completion failed"
assert fed_helper.verify_federation_run_completion(fx_federation, results,
num_rounds=request.config.num_rounds), "Federation completion failed"
Loading

0 comments on commit 9ce8983

Please sign in to comment.