add /docs/workflow_interface.rst
Signed-off-by: kta-intel <[email protected]>
kta-intel committed Feb 29, 2024
1 parent 02b2540 commit 6d6893e
Showing 2 changed files with 527 additions and 47 deletions.
193 changes: 146 additions & 47 deletions docs/about/features_index/workflowinterface.rst
@@ -9,7 +9,7 @@ Workflow Interface

**Important Note**

The OpenFL workflow interface is experimental, subject to change, and is currently limited to single node execution. To setup and launch a real federation, see :ref:`running_a_federation`
The OpenFL workflow interface is experimental, subject to change, and is currently limited to single node execution. To set up and launch a real federation, see :doc:`running_the_federation`

What is it?
===========
@@ -146,30 +146,60 @@ A :code:`Runtime` defines where the flow will be executed, who the participants

.. code-block:: python

    # Setup participants
    aggregator = Aggregator()
    aggregator.private_attributes = {}

    # Setup collaborators with private attributes
    collaborator_names = ['Portland', 'Seattle', 'Chandler', 'Bangalore']
    collaborators = [Collaborator(name=name) for name in collaborator_names]
    for idx, collaborator in enumerate(collaborators):
        local_train = deepcopy(mnist_train)
        local_test = deepcopy(mnist_test)
        local_train.data = mnist_train.data[idx::len(collaborators)]
        local_train.targets = mnist_train.targets[idx::len(collaborators)]
        local_test.data = mnist_test.data[idx::len(collaborators)]
        local_test.targets = mnist_test.targets[idx::len(collaborators)]
        collaborator.private_attributes = {
            'train_loader': torch.utils.data.DataLoader(local_train, batch_size=batch_size_train, shuffle=True),
            'test_loader': torch.utils.data.DataLoader(local_test, batch_size=batch_size_train, shuffle=True)

    # Aggregator
    aggregator_ = Aggregator()

    collaborator_names = ["Portland", "Seattle", "Chandler", "Bangalore"]

    def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, batch_size, train_dataset, test_dataset):
        train = deepcopy(train_dataset)
        test = deepcopy(test_dataset)
        train.data = train_dataset.data[index::n_collaborators]
        train.targets = train_dataset.targets[index::n_collaborators]
        test.data = test_dataset.data[index::n_collaborators]
        test.targets = test_dataset.targets[index::n_collaborators]
        return {
            "train_loader": torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True),
            "test_loader": torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=True),
        }

    # This is equivalent to:
    # local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='single_process')
    local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators)
Let's break this down, starting with the :code:`Aggregator` and :code:`Collaborator` placeholders. These placeholders represent the nodes where tasks will be executed. Each participant placeholder has its own set of :code:`private_attributes`: a dictionary where the key is the name of the attribute and the value is the object. In the above example, each of the four collaborators ('Portland', 'Seattle', 'Chandler', and 'Bangalore') has a :code:`train_loader` and :code:`test_loader` that it can access. These private attributes can be named anything, and do not necessarily need to be the same across each participant.

    # Setup collaborators private attributes via callable function
    collaborators = []
    for idx, collaborator_name in enumerate(collaborator_names):
        collaborators.append(
            Collaborator(
                name=collaborator_name,
                private_attributes_callable=callable_to_initialize_collaborator_private_attributes,
                index=idx,
                n_collaborators=len(collaborator_names),
                train_dataset=mnist_train,
                test_dataset=mnist_test,
                batch_size=64
            )
        )

    local_runtime = LocalRuntime(aggregator=aggregator_, collaborators=collaborators)
Let's break this down, starting with the :code:`Aggregator` and :code:`Collaborator` components. These components represent the *Participants* in a Federated Learning experiment. Each participant has its own set of *private attributes* that hold the information or data specific to its role or requirements. As the name suggests, these *private attributes* are accessible only to that particular participant, and are inserted into or filtered out of the current flow state when execution transfers between participants. For example, Collaborator private attributes are inserted into the :code:`flow` when transitioning from the Aggregator to a Collaborator and are filtered out when transitioning from the Collaborator back to the Aggregator.

In the above :code:`FederatedFlow`, each collaborator accesses its train and test datasets via the *private attributes* :code:`train_loader` and :code:`test_loader`. These *private attributes* are set using a (user-defined) callback function supplied while instantiating the participant. The callback function returns the participant's *private attributes* in the form of a dictionary, where the key is the name of the attribute and the value is the object.
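
For instance, inside a collaborator step the private attributes appear as ordinary attributes of :code:`self`. A minimal sketch (:code:`train_model` here is a hypothetical helper, not an OpenFL API):

.. code-block:: python

    @collaborator
    def train(self):
        # `train_loader` is injected from this collaborator's private attributes
        self.loss = train_model(self.model, self.train_loader)
        self.next(self.local_model_validation)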

In this example, the callback function :code:`callable_to_initialize_collaborator_private_attributes()` returns the collaborator private attributes :code:`train_loader` and :code:`test_loader`, which are accessed by the collaborator steps (:code:`aggregated_model_validation`, :code:`train`, and :code:`local_model_validation`). Some important points to remember when creating the callback function and private attributes:

- The callback function needs to be defined by the user and should return the *private attributes* required by the participant as key/value pairs
- In the above example, all collaborators share the same callback function. Depending on the Federated Learning requirements, the user can specify a unique callback function for each participant (see the sketch after this list)
- If no callback function is specified, the participant will not have any *private attributes*
- The callback function can take any parameters it requires as arguments. In this example, the parameters needed by the callback function are supplied as keyword arguments with the *same names* when the Collaborator is instantiated:

  * :code:`index`: Index of the particular collaborator, used to shard the dataset
  * :code:`n_collaborators`: Total number of collaborators across which the dataset is sharded
  * :code:`batch_size`: Batch size for the train and test loaders
  * :code:`train_dataset`: Train dataset to be sharded across :code:`n_collaborators`
  * :code:`test_dataset`: Test dataset to be sharded across :code:`n_collaborators`

- The callback function must be specified by the user while instantiating the participant. It is invoked by the OpenFL runtime when the participant is created, and once created these attributes cannot be modified
- Private attributes are accessible only in the participant's steps
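
For instance, nothing requires every collaborator to share one callback. A sketch with two collaborators, each given its own callable (the :code:`hospital_data` and :code:`clinic_data` datasets are hypothetical placeholders):

.. code-block:: python

    def init_hospital_attributes(batch_size):
        return {"train_loader": torch.utils.data.DataLoader(hospital_data, batch_size=batch_size, shuffle=True)}

    def init_clinic_attributes(batch_size):
        return {"train_loader": torch.utils.data.DataLoader(clinic_data, batch_size=batch_size, shuffle=True)}

    collaborators = [
        Collaborator(name="Hospital", private_attributes_callable=init_hospital_attributes, batch_size=32),
        Collaborator(name="Clinic", private_attributes_callable=init_clinic_attributes, batch_size=64),
    ]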

Now let's see how the runtime for a flow is assigned, and the flow gets run:
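
In outline, this is a two-line change on the flow object (a sketch; the constructor arguments are whatever your own flow defines, :code:`model` and :code:`optimizer` here are illustrative):

.. code-block:: python

    flow = FederatedFlow(model, optimizer)
    flow.runtime = local_runtime
    flow.run()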

@@ -184,23 +214,43 @@ And that's it! This will run an instance of the :code:`FederatedFlow` on a singl
Runtime Backends
================

The Runtime defines where code will run, but the Runtime has a :code:`Backend` - which defines the underlying implementation of *how* the flow will be executed. :code:`'single_process'` is the default in the :code:`LocalRuntime`: it executes all code sequentially within a single Python process, and is well suited to run both on high spec and low spec hardware. For users with large servers or multiple GPUs they wish to take advantage of, we also provide a `Ray <https://github.com/ray-project/ray>`_ backend. The Ray backend enables parallel task execution for collaborators, and optionally allows users to request dedicated GPUs for collaborator tasks in the placement decorator, as follows:
The Runtime defines where code will run, but the Runtime also has a :code:`Backend`, which defines the underlying implementation of *how* the flow will be executed. :code:`single_process` is the default backend in the :code:`LocalRuntime`: it executes all code sequentially within a single Python process, and is well suited to both high-spec and low-spec hardware.
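
The default is therefore equivalent to spelling the backend out explicitly, using the participants created earlier:

.. code-block:: python

    local_runtime = LocalRuntime(aggregator=aggregator_, collaborators=collaborators, backend='single_process')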

For users with large servers or multiple GPUs they wish to take advantage of, we also provide a `Ray <https://github.com/ray-project/ray>`_ backend. The Ray backend enables parallel task execution for collaborators, and optionally allows users to request dedicated CPUs / GPUs for participants by using the :code:`num_cpus` and :code:`num_gpus` arguments while instantiating the participant, in the following manner:

.. code-block:: python

    class ExampleDedicatedGPUFlow(FLSpec):
        ...
        # We request one dedicated GPU for this task
        @collaborator(num_gpus=1)
        def training(self):
            print(f'CUDA_VISIBLE_DEVICES: {os.environ["CUDA_VISIBLE_DEVICES"]}')
            self.loss = train_func(self.model, self.train_loader)
            self.next(self.validation)
        ...

    # Aggregator
    aggregator_ = Aggregator(num_gpus=0.2)

    collaborator_names = ["Portland", "Seattle", "Chandler", "Bangalore"]

    def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, batch_size, train_dataset, test_dataset):
        ...

    # Setup collaborators private attributes via callable function
    collaborators = []
    for idx, collaborator_name in enumerate(collaborator_names):
        collaborators.append(
            Collaborator(
                name=collaborator_name,
                num_gpus=0.2,  # Fraction of a GPU allocated to this participant
                private_attributes_callable=callable_to_initialize_collaborator_private_attributes,
                index=idx,
                n_collaborators=len(collaborator_names),
                train_dataset=mnist_train,
                test_dataset=mnist_test,
                batch_size=64
            )
        )

    # The Ray Backend will now be used for local execution
    local_runtime = LocalRuntime(aggregator=aggregator_, collaborators=collaborators, backend='ray')
In the above example, we have used :code:`num_gpus=0.2` while instantiating the Aggregator and Collaborators to specify that each participant uses one fifth of a GPU; this results in a single GPU being shared by a total of 4 collaborators and 1 aggregator. Users can tune these arguments based on their Federated Learning requirements and available hardware resources. Configurations where one participant is split across multiple GPUs are not supported. For example, trying to run 5 participants on hardware with 2 GPUs and :code:`num_gpus=0.4` will not work, since 80% of each GPU is allocated to 4 participants and the fifth participant has no GPU capacity remaining.
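
A quick way to sanity-check such a configuration is the arithmetic itself; this is not an OpenFL API, just the constraint from the example above expressed in a few lines:

.. code-block:: python

    num_participants = 5             # 4 collaborators + 1 aggregator
    gpus_available = 2
    num_gpus_per_participant = 0.4

    # A participant cannot span GPUs, so count how many whole participants fit on each GPU
    participants_per_gpu = int(1 // num_gpus_per_participant)         # 2
    fits = num_participants <= participants_per_gpu * gpus_available  # False: only 4 of the 5 fit
    print(f"Allocation fits: {fits}")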

**Note:** It is not necessary for ALL participants to use GPUs. For example, only the Collaborators may be allocated GPUs. In this scenario, the user should ensure that artifacts returned by the Collaborators to the Aggregator (e.g. a locally trained model object) are moved back to the CPU before exiting the collaborator step (i.e. before the join step). Since TensorFlow manages device placement automatically, this step is only needed for PyTorch.
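
For PyTorch this typically amounts to one extra line at the end of the last collaborator step (a sketch; :code:`inference` is a hypothetical validation helper):

.. code-block:: python

    @collaborator
    def local_model_validation(self):
        self.local_validation_score = inference(self.model, self.test_loader)
        # Move GPU tensors back to the CPU before handing the model to the aggregator
        self.model.to('cpu')
        self.next(self.join)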

Debugging with the Metaflow Client
==================================

@@ -218,19 +268,38 @@ After the flow has started running, you can use the Metaflow Client to get inter

.. code-block:: python

    from metaflow import Flow, Run, Task, Step
    from metaflow import Metaflow, Flow, Step, Task

    # Initialize the Metaflow object and obtain the list of executed flows:
    m = Metaflow()
    list(m)
    > [Flow('FederatedFlow'), Flow('AggregatorValidationFlow'), Flow('FederatedFlow_MNIST_Watermarking')]

    # The name of the flow is the name of the class
    flow = Flow('FederatedFlow')
    run = flow.latest_run

    # Identify the Flow name
    flow_name = 'FederatedFlow'

    # List all instances of FederatedFlow executed under distinct run IDs
    flow = Flow(flow_name)
    list(flow)
    > [Run('FederatedFlow/1692946840822001'),
       Run('FederatedFlow/1692946796234386'),
       Run('FederatedFlow/1692902602941163'),
       Run('FederatedFlow/1692902559123920')]

    # Retrieve the latest run of FederatedFlow
    run = Flow(flow_name).latest_run
    print(run)
    > Run('FederatedFlow/1692946840822001')

    list(run)
    > [Step('FederatedFlow/1671152854447797/end'),
       Step('FederatedFlow/1671152854447797/join'),
       Step('FederatedFlow/1671152854447797/local_model_validation'),
       Step('FederatedFlow/1671152854447797/train'),
       Step('FederatedFlow/1671152854447797/aggregated_model_validation'),
       Step('FederatedFlow/1671152854447797/start')]
    step = Step('FederatedFlow/1671152854447797/aggregated_model_validation')
    > [Step('FederatedFlow/1692946840822001/end'),
       Step('FederatedFlow/1692946840822001/join'),
       Step('FederatedFlow/1692946840822001/local_model_validation'),
       Step('FederatedFlow/1692946840822001/train'),
       Step('FederatedFlow/1692946840822001/aggregated_model_validation'),
       Step('FederatedFlow/1692946840822001/start')]
    step = Step('FederatedFlow/1692946840822001/aggregated_model_validation')

    for task in step:
        if task.data.input == 'Portland':
            print(task.data)
@@ -260,6 +329,37 @@ And if we wanted to get log or error message for that task, you can just run:

    print(portland_task.stderr)
    > [No output]
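
The standard output captured for the same task is available in the same way:

.. code-block:: python

    print(portland_task.stdout)
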
Also, if we wanted to get the best model and the last model, you can just run:

.. code-block:: python

    # Choose the specific step containing the desired models (e.g., the 'join' step):
    step = Step('FederatedFlow/1692946840822001/join')
    list(step)
    > [Task('FederatedFlow/1692946840822001/join/12'),  --> Round 3
       Task('FederatedFlow/1692946840822001/join/9'),   --> Round 2
       Task('FederatedFlow/1692946840822001/join/6'),   --> Round 1
       Task('FederatedFlow/1692946840822001/join/3')]   --> Round 0

    # The sequence of tasks represents the rounds: the most recent task corresponds to the
    # final round, and the preceding tasks are the earlier rounds in chronological order.
    # To determine the best model, analyze the command line logs and the model accuracy of
    # each round, then use the task ID associated with that round.
    task = Task('FederatedFlow/1692946840822001/join/9')

    # Access the best model and its associated data
    best_model = task.data.model
    best_local_model_accuracy = task.data.local_model_accuracy
    best_aggregated_model_accuracy = task.data.aggregated_model_accuracy

    # To retrieve the last model, select the most recent task, i.e. the last round.
    task = Task('FederatedFlow/1692946840822001/join/12')
    last_model = task.data.model

    # Save the chosen models using a suitable framework (e.g., PyTorch in this example):
    import torch
    torch.save(last_model.state_dict(), PATH)
    torch.save(best_model.state_dict(), PATH)
While this information is useful for debugging, depending on your workflow it may require significant disk space. For this reason, :code:`checkpoint` is disabled by default.
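
Checkpointing is switched on when the flow object is constructed, since :code:`FLSpec` accepts a :code:`checkpoint` flag (the other constructor arguments below are illustrative):

.. code-block:: python

    flow = FederatedFlow(model, optimizer, checkpoint=True)
    flow.runtime = local_runtime
    flow.run()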

Runtimes: Future Plans
@@ -278,5 +378,4 @@ Our goal is to make it a one line change to configure where and how a flow is ex

    # A future example of how the same flow could be run on distributed infrastructure
    federated_runtime = FederatedRuntime(...)
    flow.runtime = federated_runtime
    flow.run()
