
Commit

addressing various comments, fixing typos, code snippets, etc
Signed-off-by: kta-intel <[email protected]>
kta-intel committed Feb 29, 2024
1 parent 2008a4e commit 02b2540
Showing 6 changed files with 109 additions and 70 deletions.
15 changes: 15 additions & 0 deletions docs/developer_guide/experimental_features.rst
@@ -12,3 +12,18 @@ Experimental features are *not* ready for production. These features are under a

1. *Backward compatibility is not guaranteed* - Our goal is to maintain backward compatibility whenever possible, but user feedback (and our own internal research)
may result in necessary changes to the APIs.

+**Workflow Interface**
+
+Learn how to:
+- Chain a series of tasks that run on the aggregator or collaborators.
+- Filter out information that should stay local.
+- Use Metaflow tools to analyze and debug experiments.
+
+- :doc:`../about/features_index/workflowinterface`
+
+.. toctree::
+   :maxdepth: 4
+   :hidden:
+
+   workflow_interface
2 changes: 1 addition & 1 deletion docs/developer_guide/manual.rst
@@ -16,7 +16,7 @@ Customize the federation:

Get familiar with the APIs:

-- :doc:`running_the_federation.tutorial`
+- `Open Federated Learning (OpenFL) Tutorials <https://github.com/securefederatedai/openfl/tree/develop/openfl-tutorials>`_

Explore new and experimental features:

2 changes: 1 addition & 1 deletion docs/get_started/examples.rst
@@ -40,7 +40,7 @@ See :ref:`interactive_tensorflow_mnist`
examples/interactive_tensorflow_mnist

-------------------------
-Workflow Interface
+Workflow Interface (Experimental)
-------------------------
Formulate the experiment as a series of tasks, or a flow.

150 changes: 91 additions & 59 deletions docs/get_started/examples/workflowinterface_pytorch_mnist.rst
@@ -20,7 +20,7 @@ This tutorial introduces the API to get up and running with your first horizonta
See `full notebook <https://github.com/securefederatedai/openfl/blob/f1657abe88632d542504d6d71ca961de9333913f/openfl-tutorials/experimental/Workflow_Interface_101_MNIST.ipynb>`_.

**What is it?**
-The workflow interface is a new way of composing federated learning expermients with ||productName|.
+The workflow interface is a new way of composing federated learning experiments with |productName|.
It was born out of conversations with researchers and existing users who had novel use cases that didn't quite fit the standard horizontal federated learning paradigm.

**Getting Started**
@@ -30,6 +30,8 @@ First we start by installing the necessary dependencies for the workflow interfa
pip install git+https://github.com/intel/openfl.git
pip install -r requirements_workflow_interface.txt
pip install torch
+pip install torchvision
We begin with the quintessential example of a small PyTorch CNN model trained on the MNIST dataset.
Let's start by defining our dataloaders, model, optimizer, and some helper functions like we would for any other deep learning experiment.
@@ -54,19 +56,29 @@
torch.backends.cudnn.enabled = False
torch.manual_seed(random_seed)
-mnist_train = torchvision.datasets.MNIST('files/', train=True, download=True,
-    transform=torchvision.transforms.Compose([
-        torchvision.transforms.ToTensor(),
-        torchvision.transforms.Normalize(
-            (0.1307,), (0.3081,))
-    ]))
-mnist_test = torchvision.datasets.MNIST('files/', train=False, download=True,
-    transform=torchvision.transforms.Compose([
-        torchvision.transforms.ToTensor(),
-        torchvision.transforms.Normalize(
-            (0.1307,), (0.3081,))
-    ]))
+mnist_train = torchvision.datasets.MNIST(
+    "./files/",
+    train=True,
+    download=True,
+    transform=torchvision.transforms.Compose(
+        [
+            torchvision.transforms.ToTensor(),
+            torchvision.transforms.Normalize((0.1307,), (0.3081,)),
+        ]
+    ),
+)
+mnist_test = torchvision.datasets.MNIST(
+    "./files/",
+    train=False,
+    download=True,
+    transform=torchvision.transforms.Compose(
+        [
+            torchvision.transforms.ToTensor(),
+            torchvision.transforms.Normalize((0.1307,), (0.3081,)),
+        ]
+    ),
+)

class Net(nn.Module):
    def __init__(self):
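The diff collapses the rest of the model, optimizer, and helper definitions, including the `inference` function used later. For orientation, here is a minimal sketch of the kind of CNN and `inference` helper the notebook builds on — the architecture follows the classic PyTorch MNIST example and is illustrative, not the commit's exact code:

.. code-block:: python

import torch
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)  # 20 channels * 4 * 4 after two poolings
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

def inference(network, test_loader):
    # Accuracy of the given model over the test split, without gradient tracking
    network.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            output = network(data)
            pred = output.data.max(1, keepdim=True)[1]
            correct += pred.eq(target.data.view_as(pred)).sum()
    return float(correct / len(test_loader.dataset))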
@@ -114,6 +126,7 @@ Next we import the FLSpec, LocalRuntime, and placement decorators.
.. code-block:: python
from copy import deepcopy
from openfl.experimental.interface import FLSpec, Aggregator, Collaborator
from openfl.experimental.runtime import LocalRuntime
from openfl.experimental.placement import aggregator, collaborator
Expand Down Expand Up @@ -145,15 +158,15 @@ function on the aggregator to the `aggregated_model_validation` task on the coll
class FederatedFlow(FLSpec):

-    def __init__(self, model = None, optimizer = None, rounds=3, **kwargs):
+    def __init__(self, model=None, optimizer=None, rounds=3, **kwargs):
        super().__init__(**kwargs)
        if model is not None:
            self.model = model
            self.optimizer = optimizer
        else:
            self.model = Net()
            self.optimizer = optim.SGD(self.model.parameters(), lr=learning_rate,
                                       momentum=momentum)
        self.rounds = rounds

    @aggregator
    def start(self):
        self.collaborators = self.runtime.collaborators
        self.private = 10
        self.current_round = 0
-        self.next(self.aggregated_model_validation,foreach='collaborators',exclude=['private'])
+        self.next(self.aggregated_model_validation, foreach='collaborators', exclude=['private'])

    @collaborator
    def aggregated_model_validation(self):
        print(f'Performing aggregated model validation for collaborator {self.input}')
-        self.agg_validation_score = inference(self.model,self.test_loader)
+        self.agg_validation_score = inference(self.model, self.test_loader)
        print(f'{self.input} value of {self.agg_validation_score}')
        self.next(self.train)
@@ -162,12 +175,12 @@
    @collaborator
    def train(self):
        self.optimizer = optim.SGD(self.model.parameters(), lr=learning_rate,
                                   momentum=momentum)
        train_losses = []
        for batch_idx, (data, target) in enumerate(self.train_loader):
            self.optimizer.zero_grad()
            output = self.model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            self.optimizer.step()
            if batch_idx % log_interval == 0:
                print('Train Epoch: 1 [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    batch_idx * len(data), len(self.train_loader.dataset),
                    100. * batch_idx / len(self.train_loader), loss.item()))
                self.loss = loss.item()
                torch.save(self.model.state_dict(), 'model.pth')
                torch.save(self.optimizer.state_dict(), 'optimizer.pth')
        self.training_completed = True
        self.next(self.local_model_validation)
    @collaborator
    def local_model_validation(self):
-        self.local_validation_score = inference(self.model,self.test_loader)
-        print(f'Doing local model validation for collaborator {self.input}: {self.local_validation_score}')
+        self.local_validation_score = inference(self.model, self.test_loader)
+        print(
+            f'Doing local model validation for collaborator {self.input}: {self.local_validation_score}')
        self.next(self.join, exclude=['training_completed'])

    @aggregator
-    def join(self,inputs):
-        self.average_loss = sum(input.loss for input in inputs)/len(inputs)
-        self.aggregated_model_accuracy = sum(input.agg_validation_score for input in inputs)/len(inputs)
-        self.local_model_accuracy = sum(input.local_validation_score for input in inputs)/len(inputs)
+    def join(self, inputs):
+        self.average_loss = sum(input.loss for input in inputs) / len(inputs)
+        self.aggregated_model_accuracy = sum(
+            input.agg_validation_score for input in inputs) / len(inputs)
+        self.local_model_accuracy = sum(
+            input.local_validation_score for input in inputs) / len(inputs)
        print(f'Average aggregated model validation values = {self.aggregated_model_accuracy}')
        print(f'Average training loss = {self.average_loss}')
        print(f'Average local model validation values = {self.local_model_accuracy}')
        self.model = FedAvg([input.model for input in inputs])
        self.optimizer = [input.optimizer for input in inputs][0]
        self.current_round += 1
        if self.current_round < self.rounds:
-            self.next(self.aggregated_model_validation, foreach='collaborators', exclude=['private'])
+            self.next(self.aggregated_model_validation,
+                      foreach='collaborators', exclude=['private'])
        else:
            self.next(self.end)

    @aggregator
    def end(self):
        print(f'This is the end of the flow')
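Note that `join` above calls `FedAvg`, which the notebook defines in an earlier cell untouched by this diff. A minimal sketch, assuming plain parameter-wise averaging of the collaborators' models:

.. code-block:: python

import torch

def FedAvg(models):
    # Average the collaborators' weights parameter-by-parameter into one model
    new_model = models[0]
    state_dicts = [model.state_dict() for model in models]
    avg_state = new_model.state_dict()
    for key in avg_state:
        avg_state[key] = torch.stack(
            [sd[key].float() for sd in state_dicts]).mean(dim=0)
    new_model.load_state_dict(avg_state)
    return new_model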
You'll notice in the `FederatedFlow` definition above that there were certain attributes the flow was not initialized with, namely the `train_loader` and `test_loader` for each of the collaborators. These are **private_attributes** that are exposed only through the runtime. Each participant has its own set of private attributes: a dictionary where the key is the attribute name, and the value is the object that will be made accessible through that participant's task.
@@ -227,26 +244,41 @@ Below, we segment shards of the MNIST dataset for **four collaborators**: Portland, Seattle, Chandler, and Bangalore.

.. code-block:: python
-# Setup participants
-aggregator = Aggregator()
-aggregator.private_attributes = {}
-
-# Setup collaborators with private attributes
-collaborator_names = ['Portland', 'Seattle', 'Chandler','Bangalore']
-collaborators = [Collaborator(name=name) for name in collaborator_names]
-for idx, collaborator in enumerate(collaborators):
-    local_train = deepcopy(mnist_train)
-    local_test = deepcopy(mnist_test)
-    local_train.data = mnist_train.data[idx::len(collaborators)]
-    local_train.targets = mnist_train.targets[idx::len(collaborators)]
-    local_test.data = mnist_test.data[idx::len(collaborators)]
-    local_test.targets = mnist_test.targets[idx::len(collaborators)]
-    collaborator.private_attributes = {
-        'train_loader': torch.utils.data.DataLoader(local_train,batch_size=batch_size_train, shuffle=True),
-        'test_loader': torch.utils.data.DataLoader(local_test,batch_size=batch_size_train, shuffle=True)
-    }
+# Aggregator
+aggregator_ = Aggregator()
+
+collaborator_names = ["Portland", "Seattle", "Chandler", "Bangalore"]
+
+def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, batch_size, train_dataset, test_dataset):
+    train = deepcopy(train_dataset)
+    test = deepcopy(test_dataset)
+    train.data = train_dataset.data[index::n_collaborators]
+    train.targets = train_dataset.targets[index::n_collaborators]
+    test.data = test_dataset.data[index::n_collaborators]
+    test.targets = test_dataset.targets[index::n_collaborators]
+    return {
+        "train_loader": torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True),
+        "test_loader": torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=True),
+    }
-local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='single_process')
+# Setup collaborators private attributes via callable function
+collaborators = []
+for idx, collaborator_name in enumerate(collaborator_names):
+    collaborators.append(
+        Collaborator(
+            name=collaborator_name,
+            private_attributes_callable=callable_to_initialize_collaborator_private_attributes,
+            index=idx,
+            n_collaborators=len(collaborator_names),
+            train_dataset=mnist_train,
+            test_dataset=mnist_test,
+            batch_size=64
+        )
+    )
+
+local_runtime = LocalRuntime(aggregator=aggregator_, collaborators=collaborators,
+                             backend="ray")

print(f'Local runtime collaborators = {local_runtime.collaborators}')
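If Ray is unavailable or multiprocessing is unwanted, the runtime also accepts the `single_process` backend that the removed line above used:

.. code-block:: python

# Runs all tasks sequentially in the current process; no Ray workers needed
local_runtime = LocalRuntime(aggregator=aggregator_, collaborators=collaborators,
                             backend="single_process")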
Now that we have our flow and runtime defined, let's run the experiment!
@@ -256,7 +288,7 @@
model = None
best_model = None
optimizer = None
-flflow = FederatedFlow(model,optimizer)
+flflow = FederatedFlow(model, optimizer, checkpoint=True)
flflow.runtime = local_runtime
flflow.run()
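Since `join` stores its aggregates as attributes on the flow object, they remain readable after `run()` returns — a usage sketch:

.. code-block:: python

# Attributes assigned in join() persist on the flow object
print(f'Average aggregated model accuracy = {flflow.aggregated_model_accuracy}')
print(f'Average training loss = {flflow.average_loss}')
print(f'Average local model accuracy = {flflow.local_model_accuracy}')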
@@ -275,7 +307,7 @@ Let's make a tweak to the flow object, and run the experiment one more time (we

.. code-block:: python
-flflow2 = FederatedFlow(model=flflow.model,optimizer=flflow.optimizer,checkpoint=True)
+flflow2 = FederatedFlow(model=flflow.model, optimizer=flflow.optimizer, checkpoint=True)
flflow2.runtime = local_runtime
flflow2.run()
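With `checkpoint=True`, run metadata lands in Metaflow's local datastore, so the Metaflow analysis tools mentioned in the experimental-features page can inspect the experiment. A sketch, assuming the `metaflow` package is installed:

.. code-block:: python

from metaflow import Flow

# Fetch the most recent FederatedFlow run and list its steps
run = Flow('FederatedFlow').latest_run
print(run)
for step in run:
    print(step)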
8 changes: 0 additions & 8 deletions docs/launch_page.sh

This file was deleted.

2 changes: 1 addition & 1 deletion docs/requirements-docs.txt
@@ -1,4 +1,4 @@
-# Copyright (C) 2020-2023 Intel Corporation
+# Copyright (C) 2020-2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
sphinx-rtd-theme
sphinx-prompt
