From caf1148b0c4c555153f18e2d80cca8e7dd6731a9 Mon Sep 17 00:00:00 2001 From: Olga Perepelkina Date: Thu, 30 Mar 2023 11:08:26 +0200 Subject: [PATCH 1/7] Update GOVERNANCE.md --- GOVERNANCE.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/GOVERNANCE.md b/GOVERNANCE.md index f4a976aea96..981582c8199 100644 --- a/GOVERNANCE.md +++ b/GOVERNANCE.md @@ -137,5 +137,10 @@ Except as described below, all contributions to the OpenFL Project are subject t - All outbound code will be made available under the Project License. - Documentation will be received and made available by the OpenFL Project under the Creative Commons Attribution 4.0 International License (available at http://creativecommons.org/licenses/by/4.0/). +The OpenFL Project may seek to integrate and contribute back to other open source projects (“Upstream Projects”). In such cases, the OpenFL Project will conform to all license requirements of the Upstream Projects, including dependencies, leveraged by the OpenFL Project. Upstream Project code contributions not stored within the OpenFL Project’s main code repository will comply with the contribution process and license terms for the applicable Upstream Project. + +The TSC may approve the use of an alternative license or licenses for inbound or outbound contributions on an exception basis. To request an exception, please describe the contribution, the alternative open source license(s), and the justification for using an alternative open source license for the Project. License exceptions must be approved by a two-thirds vote of the entire TSC. + +Contributed files should contain license information, such as SPDX short form identifiers, indicating the open source license or licenses pertaining to the file. From 888a1d288870cc544958b97a4737196d6283dcc6 Mon Sep 17 00:00:00 2001 From: Joe Devon <138038+joedevon@users.noreply.github.com> Date: Mon, 3 Apr 2023 08:36:26 -0700 Subject: [PATCH 2/7] Update ROADMAP.md (#785) Typos --- ROADMAP.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ROADMAP.md b/ROADMAP.md index ba05f3bf1a4..d476ec07321 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -50,7 +50,7 @@ single experiment, or experiment session (with the director / envoy infrastructu ### 1.5 Component standardization and framework interoperability -Federated Learning is a [burgoening space](https://github.com/weimingwill/awesome-federated-learning#frameworks). +Federated Learning is a [burgeoning space](https://github.com/weimingwill/awesome-federated-learning#frameworks). Most core FL infrastructure (model weight extraction, network protocols, and serialization designs) must be reimplemented ad hoc by each framework. This causes community fragmentation and distracts from some of the bigger problems to be solved in federated learning. In the short term, we want to collaborate on standards for FL, first at the communication and storage layer, and make these components modular across other frameworks. Our aim is also to provide a library for FL algorithms, compression methods, @@ -63,9 +63,9 @@ This causes community fragmentation and distracts from some of the bigger proble 2. New use cases enabled by custom workflows * Standard ML Models (i.e. Tree-based algorithms) 3. Federated evaluation documentation and examples -4. Well defined aggregator / collaborator interfaces indended for building higher level projects on top of OpenFL +4. Well defined aggregator / collaborator interfaces intended for building higher level projects on top of OpenFL 5. 
Significantly improved documentation -6. New OpenFL Security Repo for that extends OpenFL to provide governance, and end-to-end security for federated learning experiments +6. New OpenFL Security Repo that extends OpenFL to provide governance, and end-to-end security for federated learning experiments ### OpenFL 2.0 (2023) 1. Interface Cohesion From 38cef2ba5b1c40aa66e0e11a262be55e492b10a8 Mon Sep 17 00:00:00 2001 From: Sarthak Pati Date: Mon, 3 Apr 2023 13:29:40 -0400 Subject: [PATCH 3/7] Updated integrations to GaNDLF (#781) * renaming loader and runner Signed-off-by: sarthakpati * updated plan to pick the new names Signed-off-by: sarthakpati * new key name Signed-off-by: sarthakpati * allow the ability to pass a file to `gandlf_config_dict` in addition to fully-fledged parameters Signed-off-by: sarthakpati * checking this differently Signed-off-by: sarthakpati * rename variable for clarity Signed-off-by: sarthakpati --------- Signed-off-by: sarthakpati --- .../fets_challenge_seg_test/plan/plan.yaml | 6 +++--- ...{loader_fets_challenge.py => loader_gandlf.py} | 4 ++-- ...{runner_fets_challenge.py => runner_gandlf.py} | 15 +++++++++++---- 3 files changed, 16 insertions(+), 9 deletions(-) rename openfl/federated/data/{loader_fets_challenge.py => loader_gandlf.py} (92%) rename openfl/federated/task/{runner_fets_challenge.py => runner_gandlf.py} (98%) diff --git a/openfl-workspace/fets_challenge_seg_test/plan/plan.yaml b/openfl-workspace/fets_challenge_seg_test/plan/plan.yaml index 33dab735edf..e804a344609 100644 --- a/openfl-workspace/fets_challenge_seg_test/plan/plan.yaml +++ b/openfl-workspace/fets_challenge_seg_test/plan/plan.yaml @@ -21,17 +21,17 @@ collaborator : data_loader : defaults : plan/defaults/data_loader.yaml - template : openfl.federated.data.loader_fets_challenge.FeTSChallengeDataLoaderWrapper + template : openfl.federated.data.loader_gandlf.GaNDLFDataLoaderWrapper settings : feature_shape : [32, 32, 32] task_runner : - template : openfl.federated.task.runner_fets_challenge.FeTSChallengeTaskRunner + template : openfl.federated.task.runner_gandlf.GaNDLFTaskRunner settings : train_csv : seg_test_train.csv val_csv : seg_test_val.csv device : cpu - fets_config_dict : + gandlf_config : batch_size: 1 clip_grad: null clip_mode: null diff --git a/openfl/federated/data/loader_fets_challenge.py b/openfl/federated/data/loader_gandlf.py similarity index 92% rename from openfl/federated/data/loader_fets_challenge.py rename to openfl/federated/data/loader_gandlf.py index 678a84769ad..911211e0411 100644 --- a/openfl/federated/data/loader_fets_challenge.py +++ b/openfl/federated/data/loader_gandlf.py @@ -5,8 +5,8 @@ from .loader import DataLoader -class FeTSChallengeDataLoaderWrapper(DataLoader): - """Federation Data Loader for FeTS Challenge Model.""" +class GaNDLFDataLoaderWrapper(DataLoader): + """Data Loader for the Generally Nuanced Deep Learning Framework (GaNDLF).""" def __init__(self, data_path, feature_shape): self.train_dataloader = None diff --git a/openfl/federated/task/runner_fets_challenge.py b/openfl/federated/task/runner_gandlf.py similarity index 98% rename from openfl/federated/task/runner_fets_challenge.py rename to openfl/federated/task/runner_gandlf.py index f041240a1c0..ca4685eb2e2 100644 --- a/openfl/federated/task/runner_fets_challenge.py +++ b/openfl/federated/task/runner_gandlf.py @@ -6,7 +6,10 @@ from copy import deepcopy import numpy as np +import os import torch as pt +from typing import Union +import yaml from openfl.utilities import 
split_tensor_dict_for_holdouts from openfl.utilities import TensorKey @@ -18,14 +21,14 @@ from GANDLF.compute.forward_pass import validate_network -class FeTSChallengeTaskRunner(TaskRunner): - """FeTSChallenge Model class for Federated Learning.""" +class GaNDLFTaskRunner(TaskRunner): + """GaNDLF Model class for Federated Learning.""" def __init__( self, train_csv: str = None, val_csv: str = None, - fets_config_dict: dict = None, + gandlf_config: Union[str, dict] = None, device: str = None, **kwargs ): @@ -36,6 +39,10 @@ def __init__( """ super().__init__(**kwargs) + # allow pass-through of a gandlf config as a file or a dict + if isinstance(gandlf_config, str) and os.path.exists(gandlf_config): + gandlf_config = yaml.safe_load(open(gandlf_config, "r")) + ( model, optimizer, @@ -44,7 +51,7 @@ def __init__( scheduler, params, ) = create_pytorch_objects( - fets_config_dict, train_csv=train_csv, val_csv=val_csv, device=device + gandlf_config, train_csv=train_csv, val_csv=val_csv, device=device ) self.model = model self.optimizer = optimizer From 4084d797d4784374b393eaea3b6a9c4d06ce5634 Mon Sep 17 00:00:00 2001 From: Prashant Shah <40899779+SprashAI@users.noreply.github.com> Date: Tue, 18 Apr 2023 09:58:08 -0700 Subject: [PATCH 4/7] Update README.md Removed references to Intel's ownship, given it's now owned by the LF AI and Data. --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 620c82ae1e3..fb5116fe012 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/intel/openfl/blob/develop/openfl-tutorials/interactive_api/numpy_linear_regression/workspace/SingleNotebook.ipynb) [![CII Best Practices](https://bestpractices.coreinfrastructure.org/projects/6599/badge)](https://bestpractices.coreinfrastructure.org/projects/6599) -OpenFL is a Python 3 framework for Federated Learning. OpenFL is designed to be a _flexible_, _extensible_ and _easily learnable_ tool for data scientists. OpenFL is hosted by Intel, aims to be community-driven, and welcomes contributions back to the project. +OpenFL is a Python 3 framework for Federated Learning. OpenFL is designed to be a _flexible_, _extensible_ and _easily learnable_ tool for data scientists. OpenFL is hosted by The Linux Foundation, aims to be community-driven, and welcomes contributions back to the project. ## Installation @@ -64,7 +64,7 @@ OpenFL builds on a collaboration between Intel and the University of Pennsylvani The grant for FeTS was awarded to the [Center for Biomedical Image Computing and Analytics (CBICA)](https://www.cbica.upenn.edu/) at UPenn (PI: S. Bakas) from the [Informatics Technology for Cancer Research (ITCR)](https://itcr.cancer.gov/) program of the National Cancer Institute (NCI) of the National Institutes of Health (NIH). FeTS is a real-world medical federated learning platform with international collaborators. The original OpenFederatedLearning project and OpenFL are designed to serve as the backend for the FeTS platform, -and OpenFL developers and researchers continue to work very closely with UPenn on the FeTS project. An example is the [FeTS-AI/Front-End](https://github.com/FETS-AI/Front-End), which integrates UPenn’s medical AI expertise with Intel’s framework to create a federated learning solution for medical imaging. +and OpenFL developers and researchers continue to work very closely with UPenn on the FeTS project. 
An example is the [FeTS-AI/Front-End](https://github.com/FETS-AI/Front-End), which integrates UPenn’s medical AI expertise with the OpenFL framework to create a federated learning solution for medical imaging.

Although initially developed for use in medical imaging, OpenFL is designed to be agnostic to the use case, the industry, and the machine learning framework.

@@ -87,7 +87,7 @@ You can find more details in the following articles:

## Support
Please join us for our bi-monthly community meetings starting December 1 & 2, 2022!
-Meet with some of the Intel team members behind OpenFL.
+Meet with some of the team members behind OpenFL.
We will go over our roadmap, open the floor for Q&A, and welcome idea sharing.
Calendar and links to a Community calls are [here](https://wiki.lfaidata.foundation/pages/viewpage.action?pageId=70648254) From 60911a6ed0573665fd583296ce0b70a01eeba085 Mon Sep 17 00:00:00 2001 From: akantak Date: Thu, 20 Apr 2023 21:15:24 +0200 Subject: [PATCH 5/7] Fix Flake8 C419 for Ubuntu CI (#800) C419 Unnecessary list comprehension passed to any()/all() prevents short-circuiting - rewrite as a generator Signed-off-by: Aleksander Kantak --- openfl/component/aggregator/aggregator.py | 4 ++-- openfl/pipelines/pipeline.py | 2 +- openfl/utilities/data_splitters/numpy.py | 2 +- openfl/utilities/utils.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index 1e165a01011..3f19444d987 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -933,9 +933,9 @@ def _is_round_done(self): """Check that round is done.""" tasks_for_round = self.assigner.get_all_tasks_for_round(self.round_number) - return all([ + return all( self._is_task_done( - task_name) for task_name in tasks_for_round]) + task_name) for task_name in tasks_for_round) def _log_big_warning(self): """Warn user about single collaborator cert mode.""" diff --git a/openfl/pipelines/pipeline.py b/openfl/pipelines/pipeline.py index 03ab1ae8704..a5a6479914b 100644 --- a/openfl/pipelines/pipeline.py +++ b/openfl/pipelines/pipeline.py @@ -154,4 +154,4 @@ def backward(self, data, transformer_metadata, **kwargs): def is_lossy(self): """If any of the transformers are lossy, then the pipeline is lossy.""" - return any([transformer.lossy for transformer in self.transformers]) + return any(transformer.lossy for transformer in self.transformers) diff --git a/openfl/utilities/data_splitters/numpy.py b/openfl/utilities/data_splitters/numpy.py index 9c78fa06add..6d8cf22fc99 100644 --- a/openfl/utilities/data_splitters/numpy.py +++ b/openfl/utilities/data_splitters/numpy.py @@ -143,7 +143,7 @@ def split(self, data, num_collaborators): slice_end = slice_start + self.min_samples_per_class print(f'Assigning {slice_start}:{slice_end} of class {label} to {col} col...') idx[col] += list(label_idx[slice_start:slice_end]) - if any([len(i) != samples_per_col for i in idx]): + if any(len(i) != samples_per_col for i in idx): raise SystemError(f'''All collaborators should have {samples_per_col} elements but distribution is {[len(i) for i in idx]}''') diff --git a/openfl/utilities/utils.py b/openfl/utilities/utils.py index 23602c7882c..ab3b498e023 100644 --- a/openfl/utilities/utils.py +++ b/openfl/utilities/utils.py @@ -132,7 +132,7 @@ def split_tensor_dict_by_types(tensor_dict, keep_types): keep_dict = {} holdout_dict = {} for k, v in tensor_dict.items(): - if any([np.issubdtype(v.dtype, type_) for type_ in keep_types]): + if any(np.issubdtype(v.dtype, type_) for type_ in keep_types): keep_dict[k] = v else: holdout_dict[k] = v From 8943064e29b85200532b07abb54621af179a057f Mon Sep 17 00:00:00 2001 From: Keerti Talwar Date: Tue, 9 May 2023 15:36:39 +0530 Subject: [PATCH 6/7] fixed flake-8 errors --- ...rkflow_Interface_Mnist_Implementation_1.py | 35 +++++--- ...rkflow_Interface_Mnist_Implementation_2.py | 35 +++++--- .../experimental/Privacy_Meter/cifar10_PM.py | 33 +++---- openfl/experimental/interface/fl_spec.py | 3 +- openfl/experimental/interface/participants.py | 17 ++-- openfl/experimental/runtime/local_runtime.py | 89 ++++++++++++------- openfl/experimental/utilities/__init__.py | 8 +- 
openfl/experimental/utilities/exceptions.py | 1 + openfl/experimental/utilities/resources.py | 1 - .../experimental/utilities/runtime_utils.py | 7 +- .../testflow_privateattributes.py | 7 +- .../github/experimental/testflow_reference.py | 6 +- .../testflow_reference_with_exclude.py | 2 +- .../testflow_reference_with_include.py | 2 +- 14 files changed, 146 insertions(+), 100 deletions(-) diff --git a/openfl-tutorials/experimental/Global_DP/Workflow_Interface_Mnist_Implementation_1.py b/openfl-tutorials/experimental/Global_DP/Workflow_Interface_Mnist_Implementation_1.py index eaa4e1bfc7a..eb9dfd1fc00 100644 --- a/openfl-tutorials/experimental/Global_DP/Workflow_Interface_Mnist_Implementation_1.py +++ b/openfl-tutorials/experimental/Global_DP/Workflow_Interface_Mnist_Implementation_1.py @@ -39,7 +39,7 @@ # Fixing the seed for result repeatation: remove below to stop repeatable runs # ---------------------------------- random_seed = 5495300300540669060 -g_device = torch.Generator(device='cuda') +g_device = torch.Generator(device="cuda") # Uncomment the line below to use g_cpu if not using cuda # g_device = torch.Generator() # noqa: E800 # NOTE: remove below to stop repeatable runs @@ -601,7 +601,6 @@ def end(self): if __name__ == "__main__": - argparser = argparse.ArgumentParser(description=__doc__) argparser.add_argument( "--config_path", help="Absolute path to the flow configuration file" @@ -615,9 +614,7 @@ def end(self): args = argparser.parse_args() if torch.cuda.is_available(): - device = torch.device( - "cuda:0" - ) + device = torch.device("cuda:0") else: device = torch.device("cpu") @@ -638,8 +635,9 @@ def end(self): "Guadalajara", ] - def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, - batch_size, train_dataset, test_dataset): + def callable_to_initialize_collaborator_private_attributes( + index, n_collaborators, batch_size, train_dataset, test_dataset + ): train = deepcopy(train_dataset) test = deepcopy(test_dataset) train.data = train_dataset.data[index::n_collaborators] @@ -648,8 +646,12 @@ def callable_to_initialize_collaborator_private_attributes(index, n_collaborator test.targets = test_dataset.targets[index::n_collaborators] return { - "train_loader": torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True), - "test_loader": torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=True), + "train_loader": torch.utils.data.DataLoader( + train, batch_size=batch_size, shuffle=True + ), + "test_loader": torch.utils.data.DataLoader( + test, batch_size=batch_size, shuffle=True + ), } collaborators = [] @@ -659,14 +661,19 @@ def callable_to_initialize_collaborator_private_attributes(index, n_collaborator name=collaborator_name, private_attributes_callable=callable_to_initialize_collaborator_private_attributes, # Set `num_gpus=0.1` to `num_gpus=0.0` in order to run this tutorial on CPU - num_cpus=0.0, num_gpus=0.1, # Assuming GPU(s) is available in the machine - index=idx, n_collaborators=len(collaborator_names), - batch_size=batch_size_train, train_dataset=mnist_train, - test_dataset=mnist_test + num_cpus=0.0, + num_gpus=0.1, # Assuming GPU(s) is available in the machine + index=idx, + n_collaborators=len(collaborator_names), + batch_size=batch_size_train, + train_dataset=mnist_train, + test_dataset=mnist_test, ) ) - local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend="ray") + local_runtime = LocalRuntime( + aggregator=aggregator, collaborators=collaborators, backend="ray" + ) print(f"Local 
runtime collaborators = {local_runtime.collaborators}") top_model_accuracy = 0 diff --git a/openfl-tutorials/experimental/Global_DP/Workflow_Interface_Mnist_Implementation_2.py b/openfl-tutorials/experimental/Global_DP/Workflow_Interface_Mnist_Implementation_2.py index 0a73ee923fd..8d4effa142e 100644 --- a/openfl-tutorials/experimental/Global_DP/Workflow_Interface_Mnist_Implementation_2.py +++ b/openfl-tutorials/experimental/Global_DP/Workflow_Interface_Mnist_Implementation_2.py @@ -35,7 +35,7 @@ random_seed = 5495300300540669060 -g_device = torch.Generator(device='cuda') +g_device = torch.Generator(device="cuda") # Uncomment the line below to use g_cpu if not using cuda # g_device = torch.Generator() # noqa: E800 # NOTE: remove below to stop repeatable runs @@ -580,7 +580,6 @@ def end(self): if __name__ == "__main__": - argparser = argparse.ArgumentParser(description=__doc__) argparser.add_argument( "--config_path", help="Absolute path to the flow configuration file." @@ -594,9 +593,7 @@ def end(self): args = argparser.parse_args() if torch.cuda.is_available(): - device = torch.device( - "cuda:0" - ) + device = torch.device("cuda:0") else: device = torch.device("cpu") @@ -617,8 +614,9 @@ def end(self): "Guadalajara", ] - def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, - batch_size, train_dataset, test_dataset): + def callable_to_initialize_collaborator_private_attributes( + index, n_collaborators, batch_size, train_dataset, test_dataset + ): train = deepcopy(train_dataset) test = deepcopy(test_dataset) train.data = train_dataset.data[index::n_collaborators] @@ -627,8 +625,12 @@ def callable_to_initialize_collaborator_private_attributes(index, n_collaborator test.targets = test_dataset.targets[index::n_collaborators] return { - "train_loader": torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True), - "test_loader": torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=True), + "train_loader": torch.utils.data.DataLoader( + train, batch_size=batch_size, shuffle=True + ), + "test_loader": torch.utils.data.DataLoader( + test, batch_size=batch_size, shuffle=True + ), } collaborators = [] @@ -638,14 +640,19 @@ def callable_to_initialize_collaborator_private_attributes(index, n_collaborator name=collaborator_name, private_attributes_callable=callable_to_initialize_collaborator_private_attributes, # Set `num_gpus=0.1` to `num_gpus=0.0` in order to run this tutorial on CPU - num_cpus=0.0, num_gpus=0.1, # Assuming GPU(s) is available in the machine - index=idx, n_collaborators=len(collaborator_names), - batch_size=batch_size_train, train_dataset=mnist_train, - test_dataset=mnist_test + num_cpus=0.0, + num_gpus=0.1, # Assuming GPU(s) is available in the machine + index=idx, + n_collaborators=len(collaborator_names), + batch_size=batch_size_train, + train_dataset=mnist_train, + test_dataset=mnist_test, ) ) - local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend="ray") + local_runtime = LocalRuntime( + aggregator=aggregator, collaborators=collaborators, backend="ray" + ) print(f"Local runtime collaborators = {local_runtime.collaborators}") best_model = None diff --git a/openfl-tutorials/experimental/Privacy_Meter/cifar10_PM.py b/openfl-tutorials/experimental/Privacy_Meter/cifar10_PM.py index cc400182009..f9d9cf3b23a 100644 --- a/openfl-tutorials/experimental/Privacy_Meter/cifar10_PM.py +++ b/openfl-tutorials/experimental/Privacy_Meter/cifar10_PM.py @@ -146,7 +146,6 @@ def inference(network, test_loader, 
device): def optimizer_to_device(optimizer, device): - """ Sending the "torch.optim.Optimizer" object into the specified device for model training and inference @@ -581,7 +580,6 @@ def end(self): if __name__ == "__main__": - argparser = argparse.ArgumentParser(description=__doc__) argparser.add_argument( "--audit_dataset_ratio", @@ -663,9 +661,7 @@ def end(self): collaborator_names = ["Portland", "Seattle"] if torch.cuda.is_available(): - device = torch.device( - "cuda:0" - ) + device = torch.device("cuda:0") else: device = torch.device("cpu") @@ -693,9 +689,9 @@ def end(self): train_dataset.targets = Y[:train_dataset_size] test_dataset = deepcopy(cifar_test) - test_dataset.data = X[train_dataset_size:train_dataset_size + test_dataset_size] + test_dataset.data = X[train_dataset_size: train_dataset_size + test_dataset_size] test_dataset.targets = Y[ - train_dataset_size:train_dataset_size + test_dataset_size + train_dataset_size: train_dataset_size + test_dataset_size ] population_dataset = deepcopy(cifar_test) @@ -715,7 +711,8 @@ def end(self): # this function will be called before executing collaborator steps # which will return private attributes dictionary for each collaborator def callable_to_initialize_collaborator_private_attributes( - index, n_collaborators, train_ds, test_ds, population_ds, args): + index, n_collaborators, train_ds, test_ds, population_ds, args + ): # construct the training and test and population dataset local_train = deepcopy(train_ds) local_test = deepcopy(test_ds) @@ -773,7 +770,6 @@ def callable_to_initialize_collaborator_private_attributes( ), } - collaborators = [] for idx, collab_name in enumerate(collaborator_names): collaborators.append( @@ -781,16 +777,23 @@ def callable_to_initialize_collaborator_private_attributes( name=collab_name, private_attributes_callable=callable_to_initialize_collaborator_private_attributes, # If 1 GPU is available in the machine - # Set `num_gpus=0.0` to `num_gpus=0.5` to run on GPU with ray backend with 2 collaborators - num_cpus=0.0, num_gpus=0.0, - index=idx, n_collaborators=len(collaborator_names), - train_ds=train_dataset, test_ds=test_dataset, - population_ds=population_dataset, args=args + # Set `num_gpus=0.0` to `num_gpus=0.5` to run on GPU + # with ray backend with 2 collaborators + num_cpus=0.0, + num_gpus=0.0, + index=idx, + n_collaborators=len(collaborator_names), + train_ds=train_dataset, + test_ds=test_dataset, + population_ds=population_dataset, + args=args, ) ) # Set backend='ray' to use ray-backend - local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend="single_process") + local_runtime = LocalRuntime( + aggregator=aggregator, collaborators=collaborators, backend="single_process" + ) print(f"Local runtime collaborators = {local_runtime.collaborators}") diff --git a/openfl/experimental/interface/fl_spec.py b/openfl/experimental/interface/fl_spec.py index a342a2111f4..c87ce68cae3 100644 --- a/openfl/experimental/interface/fl_spec.py +++ b/openfl/experimental/interface/fl_spec.py @@ -84,7 +84,8 @@ def run(self) -> None: "\nLocalRuntime(...,backend='single_process')\n" "\n or for more information about the original error," "\nPlease see the official Ray documentation" - "\nhttps://docs.ray.io/en/releases-2.2.0/ray-core/objects/serialization.html" + "\nhttps://docs.ray.io/en/releases-2.2.0/ray-core/\ + objects/serialization.html" ) raise SerializationError(str(e) + msg) else: diff --git a/openfl/experimental/interface/participants.py b/openfl/experimental/interface/participants.py 
index bcedfb96364..5c17c7f76d8 100644
--- a/openfl/experimental/interface/participants.py
+++ b/openfl/experimental/interface/participants.py
@@ -45,20 +45,20 @@ class Collaborator(Participant):
    def __init__(self, name: str = "", private_attributes_callable: Callable = None,
                 num_cpus: int = 0, num_gpus: int = 0.0, **kwargs):
        """
-        Create collaborator object with custom resources and a callable
+        Create collaborator object with custom resources and a callable
        function to assign private attributes

        Parameters:
        name (str): Name of the collaborator. default=""

        private_attributes_callable (Callable): A function which returns collaborator
-        private attributes for each collaborator. In case private_attributes are not
+        private attributes for each collaborator. In case private_attributes are not
        required this can be omitted. default=None

        num_cpus (int): Specifies how many cores to use for the collaborator step execution.
                        This will only be used if backend is set to ray. default=0
-        num_gpus (float): Specifies how many GPUs to use to accelerate the collaborator
+        num_gpus (float): Specifies how many GPUs to use to accelerate the collaborator
                          step execution. This will only be used if backend is set to ray. default=0

        kwargs (dict): Parameters required to call private_attributes_callable function.
@@ -84,7 +84,7 @@ def get_name(self) -> str:

    def initialize_private_attributes(self) -> None:
        """
-        initialize private attributes of Collaborator object by invoking
+        initialize private attributes of Collaborator object by invoking
        the callable specified by user
        """
        if self.private_attributes_callable is not None:
@@ -92,7 +92,7 @@ def initialize_private_attributes(self) -> None:

    def __set_collaborator_attrs_to_clone(self, clone: Any) -> None:
        """
-        Set collaborator private attributes to FLSpec clone before transitioning
+        Set collaborator private attributes to FLSpec clone before transitioning
        from Aggregator step to collaborator steps
        """
        # set collaborator private attributes as
@@ -102,10 +102,10 @@ def __set_collaborator_attrs_to_clone(self, clone: Any) -> None:

    def __delete_collab_attrs_from_clone(self, clone: Any) -> None:
        """
-        Remove collaborator private attributes from FLSpec clone before
+        Remove collaborator private attributes from FLSpec clone before
        transitioning from Collaborator step to Aggregator step
        """
-        # Update collaborator private attributes by taking latest
+        # Update collaborator private attributes by taking latest
        # parameters from clone, then delete attributes from clone.
for attr_name in self.private_attributes: if hasattr(clone, attr_name): @@ -146,9 +146,8 @@ def __init__(self, name: str = "", private_attributes_callable: Callable = None, def initialize_private_attributes(self) -> None: """ - initialize private attributes of Aggregator object by invoking + initialize private attributes of Aggregator object by invoking the callable specified by user """ if self.private_attributes_callable is not None: self.private_attributes = self.private_attributes_callable(**self.kwargs) - diff --git a/openfl/experimental/runtime/local_runtime.py b/openfl/experimental/runtime/local_runtime.py index a8790942cca..d64510b9b88 100644 --- a/openfl/experimental/runtime/local_runtime.py +++ b/openfl/experimental/runtime/local_runtime.py @@ -9,9 +9,9 @@ import ray import os import gc -import numpy as np from openfl.experimental.runtime import Runtime from typing import TYPE_CHECKING + if TYPE_CHECKING: from openfl.experimental.interface import Aggregator, Collaborator, FLSpec @@ -33,10 +33,11 @@ def __init__(self): """Create RayExecutor object""" self.__remote_contexts = [] - def ray_call_put(self, collaborator: Collaborator, ctx: Any, - f_name: str, callback: Callable) -> None: + def ray_call_put( + self, collaborator: Collaborator, ctx: Any, f_name: str, callback: Callable + ) -> None: """ - Execute f_name from inside collaborator class with the context + Execute f_name from inside collaborator class with the context of clone (ctx) """ self.__remote_contexts.append( @@ -116,19 +117,27 @@ def __get_collaborator_object(self, collaborators: List) -> Any: total_available_cpus = os.cpu_count() total_available_gpus = get_number_of_gpus() - total_required_gpus = sum([collaborator.num_gpus for collaborator in collaborators]) - total_required_cpus = sum([collaborator.num_cpus for collaborator in collaborators]) + total_required_gpus = sum( + [collaborator.num_gpus for collaborator in collaborators] + ) + total_required_cpus = sum( + [collaborator.num_cpus for collaborator in collaborators] + ) if total_required_gpus > 0: - check_resource_allocation(total_available_gpus, {collab.get_name(): collab.num_gpus - for collab in collaborators}) + check_resource_allocation( + total_available_gpus, + {collab.get_name(): collab.num_gpus for collab in collaborators}, + ) if total_available_gpus < total_required_gpus: raise ResourcesNotAvailableError( - f"cannot assign more than available GPUs ({total_required_gpus} < {total_available_gpus})." - ) + f"cannot assign more than available GPUs \ + ({total_required_gpus} < {total_available_gpus})." + ) if total_available_cpus < total_required_cpus: raise ResourcesNotAvailableError( - f"cannot assign more than available CPUs ({total_required_cpus} < {total_available_cpus})." + f"cannot assign more than available CPUs \ + ({total_required_cpus} < {total_available_cpus})." 
) interface_module = importlib.import_module("openfl.experimental.interface") collaborator_class = getattr(interface_module, "Collaborator") @@ -139,13 +148,16 @@ def __get_collaborator_object(self, collaborators: List) -> Any: num_gpus = collaborator.num_gpus collaborator_actor = ray.remote(collaborator_class).options( - num_cpus=num_cpus, num_gpus=num_gpus) + num_cpus=num_cpus, num_gpus=num_gpus + ) - collaborator_ray_refs.append(collaborator_actor.remote( - name=collaborator.get_name(), - private_attributes_callable=collaborator.private_attributes_callable, - **collaborator.kwargs - )) + collaborator_ray_refs.append( + collaborator_actor.remote( + name=collaborator.get_name(), + private_attributes_callable=collaborator.private_attributes_callable, + **collaborator.kwargs, + ) + ) return collaborator_ray_refs @property @@ -169,9 +181,14 @@ def collaborators(self) -> List[str]: def collaborators(self, collaborators: List[Type[Collaborator]]): """Set LocalRuntime collaborators""" if self.backend == "single_process": - get_collab_name = lambda collab: collab.get_name() + + def get_collab_name(collab): + return collab.get_name() + else: - get_collab_name = lambda collab: ray.get(collab.get_name.remote()) + + def get_collab_name(collab): + return ray.get(collab.get_name.remote()) self.__collaborators = { get_collab_name(collaborator): collaborator @@ -185,17 +202,20 @@ def initialize_aggregator(self): def initialize_collaborators(self): """initialize collaborator private attributes""" if self.backend == "single_process": - init_private_attrs = lambda collab: collab.initialize_private_attributes() + + def init_private_attrs(collab): + return collab.initialize_private_attributes() + else: - init_private_attrs = lambda collab: collab.initialize_private_attributes.remote() + + def init_private_attrs(collab): + return collab.initialize_private_attributes.remote() for collaborator in self.__collaborators.values(): init_private_attrs(collaborator) def restore_instance_snapshot( - self, - ctx: Type[FLSpec], - instance_snapshot: List[Type[FLSpec]] + self, ctx: Type[FLSpec], instance_snapshot: List[Type[FLSpec]] ): """Restores attributes from backup (in instance snapshot) to ctx""" for backup in instance_snapshot: @@ -206,7 +226,7 @@ def restore_instance_snapshot( def execute_collaborator_steps(self, ctx: Any, f_name: str): """ - Execute collaborator steps for each + Execute collaborator steps for each collaborator until at transition point """ not_at_transition_point = True @@ -279,8 +299,11 @@ def execute_end_task(self, flspec_obj, f): list: updated arguments to be executed """ - interface_module = importlib.import_module("openfl.experimental.interface") - final_attributes = getattr(interface_module, "final_attributes") + from openfl.experimental.interface import ( + final_attributes, + ) + + global final_attributes to_exec = getattr(flspec_obj, f.__name__) to_exec() @@ -325,9 +348,7 @@ def execute_foreach_task( clone = FLSpec._clones[col] if aggregator_to_collaborator(f, parent_func): for attr in self._aggregator.private_attributes: - self._aggregator.private_attributes[attr] = getattr( - clone, attr - ) + self._aggregator.private_attributes[attr] = getattr(clone, attr) if hasattr(clone, attr): delattr(clone, attr) @@ -350,9 +371,13 @@ def execute_foreach_task( collaborator = self.__collaborators[collab_name] if self.backend == "ray": - ray_executor.ray_call_put(collaborator, clone, f.__name__, self.execute_collaborator_steps) + ray_executor.ray_call_put( + collaborator, clone, f.__name__, 
self.execute_collaborator_steps + ) else: - collaborator.execute_func(clone, f.__name__, self.execute_collaborator_steps) + collaborator.execute_func( + clone, f.__name__, self.execute_collaborator_steps + ) if self.backend == "ray": clones = ray_executor.get_remote_clones() diff --git a/openfl/experimental/utilities/__init__.py b/openfl/experimental/utilities/__init__.py index e0b4d119c80..2272d1459a4 100644 --- a/openfl/experimental/utilities/__init__.py +++ b/openfl/experimental/utilities/__init__.py @@ -10,10 +10,10 @@ collaborator_to_aggregator, ) from .exceptions import ( - SerializationError, - ResourcesNotAvailableError, - ResourcesAllocationError, - ) + SerializationError, + ResourcesNotAvailableError, + ResourcesAllocationError, +) from .stream_redirect import ( RedirectStdStreamBuffer, RedirectStdStream, diff --git a/openfl/experimental/utilities/exceptions.py b/openfl/experimental/utilities/exceptions.py index e89773e09a6..12a307d271e 100644 --- a/openfl/experimental/utilities/exceptions.py +++ b/openfl/experimental/utilities/exceptions.py @@ -12,6 +12,7 @@ def __init__(self, *args: object) -> None: super().__init__(*args) pass + class ResourcesAllocationError(Exception): def __init__(self, *args: object) -> None: super().__init__(*args) diff --git a/openfl/experimental/utilities/resources.py b/openfl/experimental/utilities/resources.py index 125a2dbd9e4..126c6b0e175 100644 --- a/openfl/experimental/utilities/resources.py +++ b/openfl/experimental/utilities/resources.py @@ -9,4 +9,3 @@ def get_number_of_gpus(): # TODO remove pytorch dependency return device_count() - diff --git a/openfl/experimental/utilities/runtime_utils.py b/openfl/experimental/utilities/runtime_utils.py index 9ac420c4a9c..fa4ae989027 100644 --- a/openfl/experimental/utilities/runtime_utils.py +++ b/openfl/experimental/utilities/runtime_utils.py @@ -96,16 +96,17 @@ def checkpoint(ctx, parent_func, chkpnt_reserved_words=["next", "runtime"]): ) print(f"Saved data artifacts for {parent_func.__name__}") + def check_resource_allocation(num_gpus, each_collab_gpu_usage): remaining_gpu_memory = {} for gpu in np.ones(num_gpus, dtype=int): for i, (collab_name, collab_gpu_usage) in enumerate(each_collab_gpu_usage.items()): if gpu == 0: break - if gpu < collab_gpu_usage: # and collab_gpu_usage > 0: + if gpu < collab_gpu_usage: # and collab_gpu_usage > 0: remaining_gpu_memory.update({collab_name: gpu}) - print (f"each_collab_gpu_usage: {each_collab_gpu_usage}") - print (f"i: {i}") + print(f"each_collab_gpu_usage: {each_collab_gpu_usage}") + print(f"i: {i}") each_collab_gpu_usage = dict(itertools.islice(each_collab_gpu_usage.items(), i)) else: gpu -= collab_gpu_usage diff --git a/tests/github/experimental/testflow_privateattributes.py b/tests/github/experimental/testflow_privateattributes.py index 3b0d653523a..ae320f56f0c 100644 --- a/tests/github/experimental/testflow_privateattributes.py +++ b/tests/github/experimental/testflow_privateattributes.py @@ -105,6 +105,7 @@ def join(self, inputs): ) for input in enumerate(inputs): + collab = input[1].input if ( hasattr(input, "train_loader") is True or hasattr(input, "test_loader") is True @@ -114,7 +115,7 @@ def join(self, inputs): "join_collaborator_attributes_found" ) print( - f"{bcolors.FAIL} ... Attribute test failed in Join - COllaborator: {collab}" + f"{bcolors.FAIL} ... 
Attribute test failed in Join - Collaborator: {collab}" + f" private attributes accessible {bcolors.ENDC}" ) @@ -239,7 +240,9 @@ def callable_to_initialize_collaborator_private_attributes(index): if len(sys.argv) > 1 and sys.argv[1] == "ray": backend = "ray" - local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend=backend) + local_runtime = LocalRuntime( + aggregator=aggregator, collaborators=collaborators, backend=backend + ) print(f"Local runtime collaborators = {local_runtime.collaborators}") flflow = TestFlowPrivateAttributes(checkpoint=True) diff --git a/tests/github/experimental/testflow_reference.py b/tests/github/experimental/testflow_reference.py index c6ff2493122..16dc38914f8 100644 --- a/tests/github/experimental/testflow_reference.py +++ b/tests/github/experimental/testflow_reference.py @@ -242,7 +242,7 @@ def find_matched_references(collab_attr_list, all_collaborators): for attr_name in collab_attr_list: for i, curr_collab in enumerate(all_collaborators): # Compare the current collaborator with the collaborator(s) that come(s) after it. - for next_collab in all_collaborators[i + 1 :]: + for next_collab in all_collaborators[i + 1:]: # Check if both collaborators have the current attribute if hasattr(curr_collab, attr_name) and hasattr(next_collab, attr_name): # Check if both collaborators are sharing same reference @@ -336,13 +336,13 @@ def validate_agg_collab_references(all_collborators, agg_obj, agg_attrs): if __name__ == "__main__": # Setup participants aggregator = Aggregator() - + # Setup collaborators private attributes via callable function collaborator_names = ["Portland", "Seattle", "Chandler", "Bangalore"] def callable_to_initialize_collaborator_private_attributes(index): return {"index": index + 1} - + collaborators = [] for idx, collaborator_name in enumerate(collaborator_names): collaborators.append( diff --git a/tests/github/experimental/testflow_reference_with_exclude.py b/tests/github/experimental/testflow_reference_with_exclude.py index 04295c27599..0b5ffa93b75 100644 --- a/tests/github/experimental/testflow_reference_with_exclude.py +++ b/tests/github/experimental/testflow_reference_with_exclude.py @@ -206,7 +206,7 @@ def find_matched_references(collab_attr_list, all_collaborators): for attr_name in collab_attr_list: for i, curr_collab in enumerate(all_collaborators): # Compare the current collaborator with the collaborator(s) that come(s) after it. - for next_collab in all_collaborators[i + 1 :]: + for next_collab in all_collaborators[i + 1:]: # Check if both collaborators have the current attribute if hasattr(curr_collab, attr_name) and hasattr(next_collab, attr_name): # Check if both collaborators are sharing same reference diff --git a/tests/github/experimental/testflow_reference_with_include.py b/tests/github/experimental/testflow_reference_with_include.py index 5003c6752fd..954b20ae5a5 100644 --- a/tests/github/experimental/testflow_reference_with_include.py +++ b/tests/github/experimental/testflow_reference_with_include.py @@ -203,7 +203,7 @@ def find_matched_references(collab_attr_list, all_collaborators): for attr_name in collab_attr_list: for i, curr_collab in enumerate(all_collaborators): # Compare the current collaborator with the collaborator(s) that come(s) after it. 
- for next_collab in all_collaborators[i + 1 :]: + for next_collab in all_collaborators[i + 1:]: # Check if both collaborators have the current attribute if hasattr(curr_collab, attr_name) and hasattr(next_collab, attr_name): # Check if both collaborators are sharing same reference From a9b838f06dfeba451a93578af39a8fed86f355dd Mon Sep 17 00:00:00 2001 From: KeertiX Date: Wed, 10 May 2023 00:35:59 -0700 Subject: [PATCH 7/7] reverted import module code --- openfl/experimental/utilities/metaflow_utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/openfl/experimental/utilities/metaflow_utils.py b/openfl/experimental/utilities/metaflow_utils.py index 126a86ef0df..ba142ddd811 100644 --- a/openfl/experimental/utilities/metaflow_utils.py +++ b/openfl/experimental/utilities/metaflow_utils.py @@ -201,7 +201,8 @@ def __init__(self, flow): self._postprocess() def _create_nodes(self, flow): - tree = ast.parse(getsource(flow)).body + module = __import__(flow.__module__) + tree = ast.parse(getsource(module)).body root = [ n for n in tree