-
Notifications
You must be signed in to change notification settings - Fork 210
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Functional workflow for LocalRuntime APIs (#1220)
* WorkFlow functional tests Signed-off-by: Chaurasiya, Payal <[email protected]> * Functional workflow Signed-off-by: Chaurasiya, Payal <[email protected]> * Workflow all functional tests Signed-off-by: Chaurasiya, Payal <[email protected]> * Functional test Signed-off-by: Chaurasiya, Payal <[email protected]> * Functional test Signed-off-by: Chaurasiya, Payal <[email protected]> * Functional test Signed-off-by: Chaurasiya, Payal <[email protected]> * fix python installs Signed-off-by: Chaurasiya, Payal <[email protected]> * fix python installs Signed-off-by: Chaurasiya, Payal <[email protected]> * fix python installs Signed-off-by: Chaurasiya, Payal <[email protected]> * keep import conditional Signed-off-by: Chaurasiya, Payal <[email protected]> * Remove extra files Signed-off-by: Chaurasiya, Payal <[email protected]> * Remove extra files Signed-off-by: Chaurasiya, Payal <[email protected]> * Add copyright message to workflow files Add copyright message to the top of each file in the "tests/end_to_end/workflow/" folder. * Add the copyright message to `exclude_flow.py` * Add the copyright message to `include_exclude_flow.py` * Add the copyright message to `include_flow.py` * Add the copyright message to `internal_loop.py` * Add the copyright message to `private_attr_both.py` * Add the copyright message to `private_attr_wo_callable.py` * Add the copyright message to `private_attributes_flow.py` * Add the copyright message to `reference_exclude.py` * Add the copyright message to `reference_flow.py` * Add the copyright message to `reference_include_flow.py` * Add the copyright message to `subset_flow.py` Signed-off-by: Chaurasiya, Payal <[email protected]> * Add copyright Signed-off-by: Chaurasiya, Payal <[email protected]> * Review comments fixes Signed-off-by: Chaurasiya, Payal <[email protected]> * docstring Signed-off-by: Chaurasiya, Payal <[email protected]> * docstring Signed-off-by: Chaurasiya, Payal <[email protected]> * Doc string Signed-off-by: Chaurasiya, Payal <[email protected]> * Add comments Signed-off-by: Chaurasiya, Payal <[email protected]> * Review comments Signed-off-by: Chaurasiya, Payal <[email protected]> * Review comments Signed-off-by: Chaurasiya, Payal <[email protected]> --------- Signed-off-by: Chaurasiya, Payal <[email protected]>
- Loading branch information
Showing
17 changed files
with
2,307 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
--- | ||
#--------------------------------------------------------------------------- | ||
# Workflow to run Task Runner E2E tests via Docker | ||
# Authors - Noopur, Payal Chaurasiya | ||
#--------------------------------------------------------------------------- | ||
name: Workflow Functional E2E | ||
|
||
on: | ||
pull_request: | ||
branches: [ develop ] | ||
types: [opened, synchronize, reopened, ready_for_review] | ||
|
||
workflow_dispatch: | ||
inputs: | ||
num_rounds: | ||
description: "Number of rounds to train" | ||
required: false | ||
default: "2" | ||
type: string | ||
num_collaborators: | ||
description: "Number of collaborators" | ||
required: false | ||
default: "2" | ||
type: string | ||
|
||
permissions: | ||
contents: read | ||
|
||
# Environment variables common for all the jobs | ||
env: | ||
NUM_ROUNDS: ${{ github.event.inputs.num_rounds || '2' }} | ||
NUM_COLLABORATORS: ${{ github.event.inputs.num_collaborators || '2' }} | ||
|
||
jobs: | ||
test_wf_func: | ||
if: github.event.pull_request.draft == false | ||
name: wf_func | ||
runs-on: ubuntu-22.04 | ||
timeout-minutes: 15 | ||
strategy: | ||
matrix: | ||
python_version: ["3.10"] | ||
fail-fast: false # do not immediately fail if one of the combinations fail | ||
|
||
steps: | ||
- name: Checkout OpenFL repository | ||
id: checkout_openfl | ||
uses: actions/[email protected] | ||
with: | ||
fetch-depth: 2 # needed for detecting changes | ||
submodules: "true" | ||
token: ${{ secrets.GITHUB_TOKEN }} | ||
|
||
- name: Set up Python | ||
id: setup_python | ||
uses: actions/setup-python@v3 | ||
with: | ||
python-version: ${{ matrix.python_version }} | ||
|
||
- name: Install dependencies | ||
id: install_dependencies | ||
run: | | ||
python -m pip install --upgrade pip | ||
pip install . | ||
pip install -r test-requirements.txt | ||
pip install -r openfl-tutorials/experimental/workflow/workflow_interface_requirements.txt | ||
- name: Run Work Flow Functional tests | ||
id: run_tests | ||
run: | | ||
python -m pytest -s tests/end_to_end/test_suites/wf_local_func_tests.py \ | ||
--num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} | ||
echo "Work Flow Functional tests run completed" | ||
- name: Print test summary | ||
id: print_test_summary | ||
if: ${{ always() }} | ||
run: | | ||
export PYTHONPATH="$PYTHONPATH:." | ||
python tests/end_to_end/utils/summary_helper.py | ||
echo "Test summary printed" | ||
- name: Create Tar (exclude cert and data folders) | ||
id: tar_files | ||
if: ${{ always() }} | ||
run: | | ||
tar -cvf result.tar --exclude="cert" --exclude="data" --exclude="__pycache__" $HOME/results | ||
- name: Upload Artifacts | ||
id: upload_artifacts | ||
uses: actions/upload-artifact@v4 | ||
if: ${{ always() }} | ||
with: | ||
name: wf_func_${{ github.event.inputs.model_name || 'default_model' }}_python${{ matrix.python_version }}_${{ github.run_id }} | ||
path: result.tar |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,4 +15,5 @@ venv/* | |
.eggs | ||
eggs/* | ||
*.pyi | ||
.metaflow/* | ||
results/* |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,251 @@ | ||
# Copyright 2020-2025 Intel Corporation | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
import logging | ||
import pytest | ||
import os | ||
import shutil | ||
import random | ||
from metaflow import Step | ||
|
||
from tests.end_to_end.utils.common_fixtures import fx_local_federated_workflow, fx_local_federated_workflow_prvt_attr | ||
from tests.end_to_end.workflow.exclude_flow import TestFlowExclude | ||
from tests.end_to_end.workflow.include_exclude_flow import TestFlowIncludeExclude | ||
from tests.end_to_end.workflow.include_flow import TestFlowInclude | ||
from tests.end_to_end.workflow.internal_loop import TestFlowInternalLoop | ||
from tests.end_to_end.workflow.reference_flow import TestFlowReference | ||
from tests.end_to_end.workflow.reference_include_flow import TestFlowReferenceWithInclude | ||
from tests.end_to_end.workflow.reference_exclude import TestFlowReferenceWithExclude | ||
from tests.end_to_end.workflow.subset_flow import TestFlowSubsetCollaborators | ||
from tests.end_to_end.workflow.private_attr_wo_callable import TestFlowPrivateAttributesWoCallable | ||
from tests.end_to_end.workflow.private_attributes_flow import TestFlowPrivateAttributes | ||
from tests.end_to_end.workflow.private_attr_both import TestFlowPrivateAttributesBoth | ||
|
||
from tests.end_to_end.utils import wf_helper as wf_helper | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
def test_exclude_flow(request, fx_local_federated_workflow): | ||
""" | ||
Test if variable is excluded, variables not show in next step | ||
and all other variables will be visible to next step | ||
""" | ||
log.info("Starting test_exclude_flow") | ||
flflow = TestFlowExclude(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
for i in range(request.config.num_rounds): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("Successfully ended test_exclude_flow") | ||
|
||
|
||
def test_include_exclude_flow(request, fx_local_federated_workflow): | ||
""" | ||
Test variables which are excluded will not show up in next step | ||
Test variables which are included will show up in next step | ||
""" | ||
log.info("Starting test_include_exclude_flow") | ||
flflow = TestFlowIncludeExclude(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
for i in range(request.config.num_rounds): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("Successfully ended test_include_exclude_flow") | ||
|
||
|
||
def test_include_flow(request, fx_local_federated_workflow): | ||
""" | ||
Test if variable is included, variables will show up in next step | ||
All other variables will not show up | ||
""" | ||
log.info("Starting test_include_flow") | ||
flflow = TestFlowInclude(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
for i in range(request.config.num_rounds): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("Successfully ended test_include_flow") | ||
|
||
|
||
def test_internal_loop(request, fx_local_federated_workflow): | ||
""" | ||
Verify that through internal loop, rounds to train is set | ||
""" | ||
log.info("Starting test_internal_loop") | ||
model = None | ||
optimizer = None | ||
|
||
flflow = TestFlowInternalLoop(model, optimizer, request.config.num_rounds, checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
flflow.run() | ||
|
||
expected_flow_steps = [ | ||
"join", | ||
"internal_loop", | ||
"agg_model_mean", | ||
"collab_model_update", | ||
"local_model_mean", | ||
"start", | ||
"end", | ||
] | ||
|
||
steps_present_in_cli, missing_steps_in_cli, extra_steps_in_cli = wf_helper.validate_flow( | ||
flflow, expected_flow_steps | ||
) | ||
|
||
assert len(steps_present_in_cli) == len(expected_flow_steps), "Number of steps fetched from Datastore through CLI do not match the Expected steps provided" | ||
assert len(missing_steps_in_cli) == 0, f"Following steps missing from Datastore: {missing_steps_in_cli}" | ||
assert len(extra_steps_in_cli) == 0, f"Following steps are extra in Datastore: {extra_steps_in_cli}" | ||
assert flflow.end_count == 1, "End function called more than one time" | ||
|
||
log.info("\n Summary of internal flow testing \n" | ||
"No issues found and below are the tests that ran successfully\n" | ||
"1. Number of training completed is equal to training rounds\n" | ||
"2. CLI steps and Expected steps are matching\n" | ||
"3. Number of tasks are aligned with number of rounds and number of collaborators\n" | ||
"4. End function executed one time") | ||
log.info("Successfully ended test_internal_loop") | ||
|
||
|
||
@pytest.mark.parametrize("fx_local_federated_workflow", [("init_collaborator_private_attr_index", "int", None )], indirect=True) | ||
def test_reference_flow(request, fx_local_federated_workflow): | ||
""" | ||
Test reference variables matched through out the flow | ||
""" | ||
log.info("Starting test_reference_flow") | ||
flflow = TestFlowReference(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
for i in range(request.config.num_rounds): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("Successfully ended test_reference_flow") | ||
|
||
|
||
def test_reference_include_flow(request, fx_local_federated_workflow): | ||
""" | ||
Test reference variables matched if included else not | ||
""" | ||
log.info("Starting test_reference_include_flow") | ||
flflow = TestFlowReferenceWithInclude(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
for i in range(request.config.num_rounds): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("Successfully ended test_reference_include_flow") | ||
|
||
|
||
def test_reference_exclude_flow(request, fx_local_federated_workflow): | ||
""" | ||
Test reference variables matched if not excluded | ||
""" | ||
log.info("Starting test_reference_exclude_flow") | ||
flflow = TestFlowReferenceWithExclude(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
for i in range(request.config.num_rounds): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("Successfully ended test_reference_exclude_flow") | ||
|
||
|
||
@pytest.mark.parametrize("fx_local_federated_workflow", [("init_collaborator_private_attr_name", "str", None )], indirect=True) | ||
def test_subset_collaborators(request, fx_local_federated_workflow): | ||
""" | ||
Test the subset of collaborators in a federated workflow. | ||
Parameters: | ||
request (FixtureRequest): The request fixture provides information about the requesting test function. | ||
fx_local_federated_workflow (Fixture): The fixture for the local federated workflow. | ||
Tests: | ||
- Ensure the test starts and ends correctly. | ||
- Verify the number of collaborators matches the expected subset. | ||
- Check that the flow runs for each subset collaborator. | ||
""" | ||
log.info("Starting test_subset_collaborators") | ||
collaborators = fx_local_federated_workflow.collaborators | ||
|
||
random_ints = random.sample(range(1, len(collaborators) + 1), len(collaborators)) | ||
|
||
collaborators = fx_local_federated_workflow.runtime.collaborators | ||
for round_num in range(len(collaborators)): | ||
log.info(f"Starting round {round_num}...") | ||
|
||
if os.path.exists(".metaflow"): | ||
shutil.rmtree(".metaflow") | ||
|
||
flflow = TestFlowSubsetCollaborators(checkpoint=True, random_ints=random_ints) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
flflow.run() | ||
subset_collaborators = flflow.subset_collaborators | ||
collaborators_ran = flflow.collaborators_ran | ||
random_ints = flflow.random_ints | ||
random_ints.remove(len(subset_collaborators)) | ||
|
||
step = Step( | ||
f"TestFlowSubsetCollaborators/{flflow._run_id}/" | ||
+ "test_valid_collaborators" | ||
) | ||
|
||
assert len(list(step)) == len(subset_collaborators), ( | ||
f"...Flow only ran for {len(list(step))} " | ||
+ f"instead of the {len(subset_collaborators)} expected " | ||
+ f"collaborators- Testcase Failed." | ||
) | ||
log.info( | ||
f"Found {len(list(step))} tasks for each of the " | ||
+ f"{len(subset_collaborators)} collaborators" | ||
) | ||
log.info(f'subset_collaborators = {subset_collaborators}') | ||
log.info(f'collaborators_ran = {collaborators_ran}') | ||
for collaborator_name in subset_collaborators: | ||
assert collaborator_name in collaborators_ran, ( | ||
f"...Flow did not execute for " | ||
+ f"collaborator {collaborator_name}" | ||
+ f" - Testcase Failed." | ||
) | ||
|
||
log.info( | ||
f"Testing FederatedFlow - Ending test for validating " | ||
+ f"the subset of collaborators.") | ||
log.info("Successfully ended test_subset_collaborators") | ||
|
||
|
||
def test_private_attr_wo_callable(request, fx_local_federated_workflow_prvt_attr): | ||
""" | ||
Set private attribute without callable function i.e through direct assignment | ||
""" | ||
log.info("Starting test_private_attr_wo_callable") | ||
flflow = TestFlowPrivateAttributesWoCallable(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow_prvt_attr.runtime | ||
for i in range(request.config.num_rounds): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("Successfully ended test_private_attr_wo_callable") | ||
|
||
|
||
@pytest.mark.parametrize("fx_local_federated_workflow", [("init_collaborate_pvt_attr_np", "int", "init_agg_pvt_attr_np" )], indirect=True) | ||
def test_private_attributes(request, fx_local_federated_workflow): | ||
""" | ||
Set private attribute through callable function | ||
""" | ||
log.info("Starting test_private_attributes") | ||
flflow = TestFlowPrivateAttributes(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
for i in range(request.config.num_rounds): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("Successfully ended test_private_attributes") | ||
|
||
|
||
@pytest.mark.parametrize("fx_local_federated_workflow_prvt_attr", [("init_collaborate_pvt_attr_np", "int", "init_agg_pvt_attr_np" )], indirect=True) | ||
def test_private_attr_both(request, fx_local_federated_workflow_prvt_attr): | ||
""" | ||
Set private attribute through callable function and direct assignment | ||
""" | ||
log.info("Starting test_private_attr_both") | ||
flflow = TestFlowPrivateAttributesBoth(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow_prvt_attr.runtime | ||
for i in range(5): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("Successfully ended test_private_attr_both") |
Oops, something went wrong.