-
Notifications
You must be signed in to change notification settings - Fork 210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Functional workflow for LocalRuntime APIs #1220
Merged
rajithkrishnegowda
merged 23 commits into
securefederatedai:develop
from
payalcha:functional-workflow
Dec 18, 2024
Merged
Changes from 12 commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
6420cb5
WorkFlow functional tests
payalcha c853117
Functional workflow
payalcha 93fbed1
Workflow all functional tests
payalcha fd91a85
Merge branch 'develop' into functional-workflow
payalcha cb2950d
Functional test
payalcha 212fdbe
Merge branch 'functional-workflow' of https://github.com/payalcha/ope…
payalcha aab00f0
Functional test
payalcha 98ea164
Functional test
payalcha 5489e9c
fix python installs
payalcha f7fe465
fix python installs
payalcha 8ac2f07
fix python installs
payalcha f8b0c03
keep import conditional
payalcha a3351b3
Remove extra files
payalcha d7d5baf
Remove extra files
payalcha 5831f2f
Add copyright message to workflow files
payalcha b93927e
Add copyright
payalcha 956b137
Review comments fixes
payalcha 6965f66
docstring
payalcha 54ad590
docstring
payalcha 68b0b8c
Doc string
payalcha 362a718
Add comments
payalcha 290aa50
Review comments
payalcha 62edfca
Review comments
payalcha File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
--- | ||
#--------------------------------------------------------------------------- | ||
# Workflow to run Task Runner E2E tests via Docker | ||
# Authors - Noopur, Payal Chaurasiya | ||
#--------------------------------------------------------------------------- | ||
name: Workflow Functional E2E | ||
|
||
on: | ||
pull_request: | ||
branches: [ develop ] | ||
types: [opened, synchronize, reopened, ready_for_review] | ||
|
||
workflow_dispatch: | ||
inputs: | ||
num_rounds: | ||
description: "Number of rounds to train" | ||
required: false | ||
default: "2" | ||
type: string | ||
num_collaborators: | ||
description: "Number of collaborators" | ||
required: false | ||
default: "2" | ||
type: string | ||
|
||
permissions: | ||
contents: read | ||
|
||
# Environment variables common for all the jobs | ||
env: | ||
NUM_ROUNDS: ${{ github.event.inputs.num_rounds || '2' }} | ||
NUM_COLLABORATORS: ${{ github.event.inputs.num_collaborators || '2' }} | ||
|
||
jobs: | ||
test_wf_func: | ||
name: wf_func | ||
runs-on: ubuntu-22.04 | ||
timeout-minutes: 15 | ||
strategy: | ||
matrix: | ||
python_version: ["3.10"] | ||
fail-fast: false # do not immediately fail if one of the combinations fail | ||
|
||
steps: | ||
- name: Checkout OpenFL repository | ||
id: checkout_openfl | ||
uses: actions/[email protected] | ||
with: | ||
fetch-depth: 2 # needed for detecting changes | ||
submodules: "true" | ||
token: ${{ secrets.GITHUB_TOKEN }} | ||
|
||
- name: Set up Python | ||
id: setup_python | ||
uses: actions/setup-python@v3 | ||
with: | ||
python-version: ${{ matrix.python_version }} | ||
|
||
- name: Install dependencies | ||
id: install_dependencies | ||
run: | | ||
python -m pip install --upgrade pip | ||
pip install . | ||
pip install -r test-requirements.txt | ||
pip install -r openfl-tutorials/experimental/workflow/workflow_interface_requirements.txt | ||
payalcha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
pip install -r tests/github/experimental/workflow/LocalRuntime/requirements_experimental_localruntime_tests.txt | ||
|
||
- name: Run Work Flow Functional tests | ||
id: run_tests | ||
run: | | ||
python -m pytest -s tests/end_to_end/test_suites/wf_local_func_tests.py \ | ||
--num_rounds ${{ env.NUM_ROUNDS }} --num_collaborators ${{ env.NUM_COLLABORATORS }} | ||
echo "Work Flow Functional tests run completed" | ||
|
||
- name: Print test summary | ||
id: print_test_summary | ||
if: ${{ always() }} | ||
run: | | ||
export PYTHONPATH="$PYTHONPATH:." | ||
python tests/end_to_end/utils/summary_helper.py | ||
echo "Test summary printed" | ||
|
||
- name: Create Tar (exclude cert and data folders) | ||
id: tar_files | ||
if: ${{ always() }} | ||
run: | | ||
tar -cvf result.tar --exclude="cert" --exclude="data" --exclude="__pycache__" $HOME/results | ||
|
||
- name: Upload Artifacts | ||
id: upload_artifacts | ||
uses: actions/upload-artifact@v4 | ||
if: ${{ always() }} | ||
with: | ||
name: wf_func_${{ github.event.inputs.model_name || 'default_model' }}_python${{ matrix.python_version }}_${{ github.run_id }} | ||
path: result.tar |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,203 @@ | ||
import logging | ||
payalcha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
import pytest | ||
import os | ||
import shutil | ||
import random | ||
from metaflow import Step, Flow | ||
|
||
from tests.end_to_end.utils.common_fixtures import fx_local_federated_workflow, fx_local_federated_workflow_prvt_attr | ||
from tests.end_to_end.workflow.exclude_flow import TestFlowExclude | ||
from tests.end_to_end.workflow.include_exclude_flow import TestFlowIncludeExclude | ||
from tests.end_to_end.workflow.include_flow import TestFlowInclude | ||
from tests.end_to_end.workflow.internal_loop import TestFlowInternalLoop | ||
from tests.end_to_end.workflow.reference_flow import TestFlowReference | ||
from tests.end_to_end.workflow.reference_include_flow import TestFlowReferenceWithInclude | ||
from tests.end_to_end.workflow.reference_exclude import TestFlowReferenceWithExclude | ||
from tests.end_to_end.workflow.subset_flow import TestFlowSubsetCollaborators | ||
from tests.end_to_end.workflow.private_attr_wo_callable import TestFlowPrivateAttributesWoCallable | ||
from tests.end_to_end.workflow.private_attributes_flow import TestFlowPrivateAttributes | ||
from tests.end_to_end.workflow.private_attr_both import TestFlowPrivateAttributesBoth | ||
|
||
from tests.end_to_end.utils import wf_helper as wf_helper | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
def test_exclude_flow(request, fx_local_federated_workflow): | ||
payalcha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
log.info("***** Starting test_exclude_flow *****") | ||
payalcha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
flflow = TestFlowExclude(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
for i in range(request.config.num_rounds): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("***** Successfully ended test_exclude_flow *****") | ||
|
||
|
||
def test_include_exclude_flow(request, fx_local_federated_workflow): | ||
log.info("***** Starting test_include_exclude_flow *****") | ||
flflow = TestFlowIncludeExclude(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
for i in range(request.config.num_rounds): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("***** Successfully ended test_include_exclude_flow *****") | ||
|
||
|
||
def test_include_flow(request, fx_local_federated_workflow): | ||
log.info("***** Starting test_include_flow *****") | ||
flflow = TestFlowInclude(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
for i in range(request.config.num_rounds): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("***** Successfully ended test_include_flow *****") | ||
|
||
|
||
def test_internal_loop(request, fx_local_federated_workflow): | ||
log.info("***** Starting test_internal_loop *****") | ||
model = None | ||
optimizer = None | ||
|
||
flflow = TestFlowInternalLoop(model, optimizer, 5, checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
flflow.run() | ||
|
||
expected_flow_steps = [ | ||
"join", | ||
"internal_loop", | ||
"agg_model_mean", | ||
"collab_model_update", | ||
"local_model_mean", | ||
"start", | ||
"end", | ||
] | ||
|
||
steps_present_in_cli, missing_steps_in_cli, extra_steps_in_cli = wf_helper.validate_flow( | ||
flflow, expected_flow_steps | ||
) | ||
|
||
assert len(steps_present_in_cli) == len(expected_flow_steps), "Number of steps fetched from Datastore through CLI do not match the Expected steps provided" | ||
assert len(missing_steps_in_cli) == 0, f"Following steps missing from Datastore: {missing_steps_in_cli}" | ||
assert len(extra_steps_in_cli) == 0, f"Following steps are extra in Datastore: {extra_steps_in_cli}" | ||
assert flflow.end_count == 1, "End function called more than one time" | ||
|
||
log.info("\n **** Summary of internal flow testing ****\n" | ||
"No issues found and below are the tests that ran successfully\n" | ||
"1. Number of training completed is equal to training rounds\n" | ||
"2. Cli steps and Expected steps are matching\n" | ||
payalcha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"3. Number of tasks are aligned with number of rounds and number of collaborators\n" | ||
"4. End function executed one time") | ||
log.info("***** Successfully ended test_internal_loop *****") | ||
|
||
|
||
@pytest.mark.parametrize("fx_local_federated_workflow", [("init_collaborator_private_attr_index", "int", None )], indirect=True) | ||
def test_reference_flow(request, fx_local_federated_workflow): | ||
log.info("***** Starting test_reference_flow *****") | ||
flflow = TestFlowReference(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
for i in range(request.config.num_rounds): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("***** Successfully ended test_reference_flow *****") | ||
|
||
|
||
def test_reference_include_flow(request, fx_local_federated_workflow): | ||
log.info("***** Starting test_reference_include_flow *****") | ||
flflow = TestFlowReferenceWithInclude(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
for i in range(request.config.num_rounds): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("***** Successfully ended test_reference_include_flow *****") | ||
|
||
|
||
def test_reference_exclude_flow(request, fx_local_federated_workflow): | ||
log.info("***** Starting test_reference_exclude_flow *****") | ||
flflow = TestFlowReferenceWithExclude(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
for i in range(request.config.num_rounds): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("***** Successfully ended test_reference_exclude_flow *****") | ||
|
||
|
||
@pytest.mark.parametrize("fx_local_federated_workflow", [("init_collaborator_private_attr_name", "str", None )], indirect=True) | ||
def test_subset_collaborators(request, fx_local_federated_workflow): | ||
log.info("***** Starting test_subset_collaborators *****") | ||
collaborators = fx_local_federated_workflow.collaborators | ||
|
||
random_ints = random.sample(range(1, len(collaborators) + 1), len(collaborators)) | ||
|
||
collaborators = fx_local_federated_workflow.runtime.collaborators | ||
for round_num in range(len(collaborators)): | ||
log.info(f"Starting round {round_num}...") | ||
|
||
if os.path.exists(".metaflow"): | ||
shutil.rmtree(".metaflow") | ||
|
||
flflow = TestFlowSubsetCollaborators(checkpoint=True, random_ints=random_ints) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
flflow.run() | ||
subset_collaborators = flflow.subset_collaborators | ||
collaborators_ran = flflow.collaborators_ran | ||
random_ints = flflow.random_ints | ||
random_ints.remove(len(subset_collaborators)) | ||
|
||
step = Step( | ||
f"TestFlowSubsetCollaborators/{flflow._run_id}/" | ||
+ "test_valid_collaborators" | ||
) | ||
|
||
assert len(list(step)) == len(subset_collaborators), ( | ||
f"...Flow only ran for {len(list(step))} " | ||
+ f"instead of the {len(subset_collaborators)} expected " | ||
+ f"collaborators- Testcase Failed." | ||
) | ||
log.info( | ||
f"Found {len(list(step))} tasks for each of the " | ||
+ f"{len(subset_collaborators)} collaborators" | ||
) | ||
log.info(f'subset_collaborators = {subset_collaborators}') | ||
log.info(f'collaborators_ran = {collaborators_ran}') | ||
for collaborator_name in subset_collaborators: | ||
assert collaborator_name in collaborators_ran, ( | ||
f"...Flow did not execute for " | ||
+ f"collaborator {collaborator_name}" | ||
+ f" - Testcase Failed." | ||
) | ||
|
||
log.info( | ||
f"Testing FederatedFlow - Ending test for validating " | ||
+ f"the subset of collaborators.") | ||
log.info("***** Successfully ended test_subset_collaborators *****") | ||
|
||
|
||
def test_private_attr_wo_callable(request, fx_local_federated_workflow_prvt_attr): | ||
log.info("***** Starting test_private_attr_wo_callable *****") | ||
flflow = TestFlowPrivateAttributesWoCallable(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow_prvt_attr.runtime | ||
for i in range(request.config.num_rounds): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("***** Successfully ended test_private_attr_wo_callable *****") | ||
|
||
|
||
@pytest.mark.parametrize("fx_local_federated_workflow", [("init_collaborate_pvt_attr_np", "int", "init_agg_pvt_attr_np" )], indirect=True) | ||
def test_private_attributes(request, fx_local_federated_workflow): | ||
log.info("***** Starting test_private_attributes *****") | ||
flflow = TestFlowPrivateAttributes(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow.runtime | ||
for i in range(request.config.num_rounds): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("***** Successfully ended test_private_attributes *****") | ||
|
||
|
||
@pytest.mark.parametrize("fx_local_federated_workflow_prvt_attr", [("init_collaborate_pvt_attr_np", "int", "init_agg_pvt_attr_np" )], indirect=True) | ||
def test_private_attr_both(request, fx_local_federated_workflow_prvt_attr): | ||
log.info("***** Starting test_private_attr_both *****") | ||
flflow = TestFlowPrivateAttributesBoth(checkpoint=True) | ||
flflow.runtime = fx_local_federated_workflow_prvt_attr.runtime | ||
for i in range(5): | ||
log.info(f"Starting round {i}...") | ||
flflow.run() | ||
log.info("***** Successfully ended test_private_attr_both *****") |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of "Task Runner E2E tests" we can mention "Workflow Interface E2E Tests for LocalRuntime"