
Commit

added changes
rajithkrishnegowda committed Dec 11, 2024
1 parent d8dda08 commit 275fb37
Showing 3 changed files with 180 additions and 120 deletions.
118 changes: 74 additions & 44 deletions openfl-workspace/torch_cnn_mnist/README.md
@@ -6,7 +6,7 @@ pip install virtualenv
mkdir ~/openfl-quickstart
virtualenv ~/openfl-quickstart/venv
source ~/openfl-quickstart/venv/bin/activate
-pip install openfl==1.6
+pip install openfl
"""
2. Creating the Workspace Folder

@@ -49,9 +49,78 @@ taskrunner
- taskrunner.py: The core task runner module that manages the execution of federated learning tasks.

## Defining the Data Loader
-The data loader in OpenFL is responsible for batching and iterating through the dataset that will be used for local training and validation on each collaborator node. The TemplateDataLoader class in src/dataloader.py is designed to be a starting template for creating a data loader that is tailored to the FL experiment’s data format requirements.
+The data loader in OpenFL is responsible for batching and iterating through the dataset that will be used for local training and validation on each collaborator node. The PyTorchMNISTInMemory class is designed to handle the MNIST dataset, ensuring it is properly loaded and preprocessed for federated learning experiments.

-To customize the TemplateDataLoader, we just need to implement the load_dataset() function to process the dataset available at data_path on the local file system. The data_path parameter comes from the data.yaml configuration file, which is populated when the collaborator’s identity is created via fx collaborator create.
+To customize the PyTorchMNISTInMemory class, you need to implement the load_mnist_shard() function to process the dataset available at data_path on the local file system. The data_path parameter represents the data shard number used by the collaborator. This setup allows each collaborator to work with a specific subset of the data, facilitating distributed training.

The load_mnist_shard() function is responsible for loading the MNIST dataset, dividing it into training and validation sets, and applying necessary transformations. The data is then batched and made ready for the training process.
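Each collaborator's shard number is recorded in plan/data.yaml when its identity is created with fx collaborator create (see the commands further below). As an illustrative sketch, after creating two collaborators with shard numbers 1 and 2, the file would contain entries along these lines:

```
collaborator1,1
collaborator2,2
```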

## Modifying the Data Loader to Support "Bring Your Own Data"
You can either implement the placeholders yourself, or take the reference solution from [dataloader.py](https://github.com/securefederatedai/openfl-contrib/blob/main/openfl_contrib_tutorials/ml_to_fl/federated/src/dataloader.py):

```
import numpy as np

from collections.abc import Iterable
from os import path, makedirs

from torch import manual_seed
from torch.utils.data import random_split
from torchvision.datasets import ImageFolder
from torchvision.transforms import ToTensor, Grayscale, Compose

from openfl.federated import PyTorchDataLoader


class TemplateDataLoader(PyTorchDataLoader):
    def __init__(self, data_path, batch_size, **kwargs):
        super().__init__(batch_size, **kwargs)
        # Load the dataset using the provided data_path and any additional kwargs.
        X_train, y_train, X_valid, y_valid = load_dataset(data_path, **kwargs)
        # Assign the loaded data to instance variables.
        self.X_train = X_train
        self.y_train = y_train
        self.X_valid = X_valid
        self.y_valid = y_valid


def load_dataset(data_path, train_split_ratio=0.8, **kwargs):
    """Load the dataset at `data_path` and split it into training and validation sets."""
    dataset = MNISTDataset(
        root=data_path,
        transform=Compose([Grayscale(num_output_channels=1), ToTensor()])
    )
    n_train = int(train_split_ratio * len(dataset))
    n_valid = len(dataset) - n_train
    ds_train, ds_val = random_split(
        dataset, lengths=[n_train, n_valid], generator=manual_seed(0))
    X_train, y_train = list(zip(*ds_train))
    X_train, y_train = np.stack(X_train), np.array(y_train)
    X_valid, y_valid = list(zip(*ds_val))
    X_valid, y_valid = np.stack(X_valid), np.array(y_valid)
    return X_train, y_train, X_valid, y_valid


class MNISTDataset(ImageFolder):
    """Encapsulates the MNIST dataset."""

    FOLDER_NAME = "mnist_images"
    DEFAULT_PATH = path.join(path.expanduser('~'), '.openfl', 'data')

    def __init__(self, root: str = DEFAULT_PATH, **kwargs) -> None:
        """Initialize."""
        makedirs(root, exist_ok=True)
        super(MNISTDataset, self).__init__(
            path.join(root, MNISTDataset.FOLDER_NAME), **kwargs)

    def __getitem__(self, index):
        """Allow getting items by slice index."""
        if isinstance(index, Iterable):
            return [super(MNISTDataset, self).__getitem__(i) for i in index]
        return super(MNISTDataset, self).__getitem__(index)
```
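To wire this loader into the experiment, the plan must point at the new class. A minimal sketch of the relevant plan/plan.yaml section, following the guidance from the template's docstring (the batch_size value is illustrative):

```
data_loader:
  defaults: plan/defaults/data_loader.yaml
  template: src.dataloader.TemplateDataLoader
  settings:
    batch_size: 64
```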

## Defining the Task Runner
The Task Runner class defines the actual computational tasks of the FL experiment (such as local training and validation). We can implement the placeholders of the TemplateTaskRunner class (src/taskrunner.py) by importing the DigitRecognizerCNN model, as well as the train_epoch() and validate() helper functions from the centralized ML script. The template also provides placeholders for supplying a custom optimizer and loss function, as sketched below.
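For illustration, a filled-in task runner might look like the following sketch. It assumes the DigitRecognizerCNN, train_epoch(), and validate() helpers from src/cnn_model.py referenced above, and that the PyTorchTaskRunner base class drives the train_() and validate_() hooks; the Adam optimizer and cross-entropy loss are placeholder choices, not the only valid ones:

```
import numpy as np
import torch.nn.functional as F
import torch.optim as optim

from openfl.federated import PyTorchTaskRunner
from openfl.utilities import Metric

# Assumed helpers from the centralized ML script (src/cnn_model.py).
from src.cnn_model import DigitRecognizerCNN, train_epoch, validate


class TemplateTaskRunner(PyTorchTaskRunner):
    """Task runner built around the centralized DigitRecognizerCNN model."""

    def __init__(self, device="cpu", **kwargs):
        super().__init__(device=device, **kwargs)
        # Reuse the model from the centralized script; pick an optimizer
        # and loss function (placeholder choices).
        self.model = DigitRecognizerCNN()
        self.optimizer = optim.Adam(self.model.parameters(), lr=1e-4)
        self.loss_fn = F.cross_entropy

    def train_(self, train_dataloader) -> Metric:
        # Run one local training epoch and report the loss to the aggregator.
        loss = train_epoch(self.model, self.optimizer, self.loss_fn,
                           self.device, train_dataloader)
        return Metric(name="cross_entropy", value=np.array(loss))

    def validate_(self, validation_dataloader) -> Metric:
        # Evaluate the current model on the local validation split.
        accuracy = validate(self.model, self.device, validation_dataloader)
        return Metric(name="accuracy", value=np.array(accuracy))
```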
@@ -66,45 +135,6 @@ mkdir save
fx plan initialize --input_shape [1,28,28] --aggregator_address localhost
```

-The pre-sharded dataset can be downloaded from [mnist_data_shards.tar.gz](https://github.com/securefederatedai/openfl-contrib/blob/main/openfl_contrib_tutorials/ml_to_fl/federated/mnist_data_shards.tar.gz). Copy the dataset bundle to the root of the FL workspace and unpack it:
-
-```
-cp mnist_data_shards.tar.gz ~/openfl/openfl-tutorials/taskrunner/
-cd ~/openfl/openfl-tutorials/taskrunner/
-tar -xvf mnist_data_shards.tar.gz
-rm mnist_data_shards.tar.gz
-```
-
-This will populate the data folder of the FL workspace with two shards (data/1 and data/2) of labeled MNIST images of digits (the 0–9 labels being encoded in the sub-folder names). Note that in a real-world federation each of the collaborator nodes would only hold one shard, given the decentralized nature of Federated Learning. To facilitate the local testing of the FL workspace, both shards are made available under the local data/ folder:
-
-```
-data
-├── 1
-│   └── mnist_images
-│       ├── 0
-│       ├── 1
-│       ├── 2
-│       ├── 3
-│       ├── 4
-│       ├── 5
-│       ├── 6
-│       ├── 7
-│       ├── 8
-│       └── 9
-└── 2
-    └── mnist_images
-        ├── 0
-        ├── 1
-        ├── 2
-        ├── 3
-        ├── 4
-        ├── 5
-        ├── 6
-        ├── 7
-        ├── 8
-        └── 9
-```

We can now perform a test run with the following commands for creating a local PKI setup and starting the aggregator and the collaborators on the same machine:

```
@@ -129,7 +159,7 @@ fx aggregator certify --fqdn localhost --silent
# Create a collaborator named "collaborator1" that will use data path "data/1"
# This command adds the collaborator1,data/1 entry in data.yaml
-fx collaborator create -n collaborator1 -d data/1
+fx collaborator create -n collaborator1 -d 1
# Generate a CSR for collaborator1
fx collaborator generate-cert-request -n collaborator1
@@ -143,7 +173,7 @@ fx collaborator certify -n collaborator1 --silent
# Create a collaborator named "collaborator2" that will use data path "data/2"
# This command adds the collaborator2,data/2 entry in data.yaml
-fx collaborator create -n collaborator2 -d data/2
+fx collaborator create -n collaborator2 -d 2
# Generate a CSR for collaborator2
fx collaborator generate-cert-request -n collaborator2
3 changes: 2 additions & 1 deletion openfl-workspace/torch_cnn_mnist/plan/plan.yaml
@@ -32,7 +32,8 @@ compression_pipeline:
data_loader:
  settings:
    batch_size: 64
-  template: src.dataloader.TemplateDataLoader
+    collaborator_count: 2
+  template: src.dataloader.PyTorchMNISTInMemory
network:
  settings:
    agg_addr: localhost
179 changes: 104 additions & 75 deletions openfl-workspace/torch_cnn_mnist/src/dataloader.py
@@ -1,49 +1,43 @@
-# Copyright (C) 2024 Intel Corporation
-# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.
+# Copyright (C) 2020-2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
 
-from os import path, makedirs
-from collections.abc import Iterable
+"""You may copy this file as the starting point of your own model."""
 
+from logging import getLogger
+
 import numpy as np
-from torch import manual_seed
-from torch.utils.data import random_split
-from torchvision.transforms import ToTensor, Grayscale, Compose
-from torchvision.datasets import ImageFolder
+from torchvision import datasets
+from torchvision import transforms
 
 from openfl.federated import PyTorchDataLoader
 
+logger = getLogger(__name__)
+
 
-class TemplateDataLoader(PyTorchDataLoader):
-    """Template dataloader for PyTorch.
-    This class should be used as a template to create a custom DataLoader for your specific dataset.
-    After generating this template, you should:
-    1. Implement the `load_dataset` function to load your data.
-    2. Modify the `plan.yaml` file to use this DataLoader.
-    The `plan.yaml` modifications should be done under the `<workspace>/plan/plan.yaml` section:
-    ```
-    data_loader:
-        defaults : plan/defaults/data_loader.yaml
-        template: src.dataloader.TemplateDataLoader # Modify this line appropriately if you change the class name
-        settings:
-            # Add additional arguments (such as batch_size) that you wish to pass through `def __init__():`
-            # You do not need to pass in data_path here. It will be set by the collaborators
-    ```
-    `batch_size` is passed to the `super().__init__` method to ensure that the superclass is properly initialized with the specified batch size.
-    After calling `super().__init__`, define `self.X_train`, `self.y_train`, `self.X_valid`, and `self.y_valid`.
-    """
+class PyTorchMNISTInMemory(PyTorchDataLoader):
+    """PyTorch data loader for MNIST dataset."""
 
     def __init__(self, data_path, batch_size, **kwargs):
-        """Initialize the data loader.
+        """Instantiate the data object.
         Args:
-            data_path: The file path to the data at the respective collaborator site.
-            batch_size: The batch size of the data loader.
-            **kwargs: Additional arguments that may be defined in `plan.yaml`
+            data_path: The file path to the data
+            batch_size: The batch size of the data loader
+            **kwargs: Additional arguments, passed to super
+                init and load_mnist_shard
         """
         super().__init__(batch_size, **kwargs)
 
-        # Load the dataset using the provided data_path and any additional kwargs.
-        X_train, y_train, X_valid, y_valid = load_dataset(data_path, **kwargs)
-
-        # Assign the loaded data to instance variables.
+        try:
+            int(data_path)
+        except ValueError as exc:
+            raise ValueError(
+                f"Expected `{data_path}` to be representable as `int`, "
+                "as it refers to the data shard number used by the collaborator."
+            ) from exc
+
+        num_classes, X_train, y_train, X_valid, y_valid = load_mnist_shard(
+            shard_num=int(data_path), **kwargs
+        )
         self.X_train = X_train
         self.y_train = y_train
         self.train_loader = self.get_train_loader()
@@ -52,58 +46,93 @@ def __init__(self, data_path, batch_size, **kwargs):
         self.y_valid = y_valid
         self.val_loader = self.get_valid_loader()
 
-def load_dataset(data_path, train_split_ratio=0.8, **kwargs):
+        self.num_classes = num_classes
+
+
+def load_mnist_shard(
+    shard_num, collaborator_count, categorical=False, channels_last=True, **kwargs
+):
     """
-    Load your dataset here.
-    This function should be implemented to load the dataset from the given `data_path`.
-    You can use additional arguments passed via `**kwargs` if necessary.
+    Load the MNIST dataset.
     Args:
-        data_path (str): Path to the data directory.
-        **kwargs: Additional arguments that may be defined in `plan.yaml`
+        shard_num (int): The shard to use from the dataset
+        collaborator_count (int): The number of collaborators in the
+            federation
+        categorical (bool): True = convert the labels to one-hot encoded
+            vectors (Default = False)
+        channels_last (bool): True = The input images have the channels
+            last (Default = True)
+        **kwargs: Additional parameters to pass to the function
     Returns:
-        Tuple containing:
-        - numpy.ndarray: The training data.
-        - numpy.ndarray: The training labels.
-        - numpy.ndarray: The validation data.
-        - numpy.ndarray: The validation labels.
+        int: The number of classes
+        numpy.ndarray: The training data
+        numpy.ndarray: The training labels
+        numpy.ndarray: The validation data
+        numpy.ndarray: The validation labels
     """
-    # Implement dataset loading logic here and return the appropriate data.
-    # Replace the following placeholders with actual data loading code.
-    dataset = MNISTDataset(
-        root=data_path,
-        transform=Compose([Grayscale(num_output_channels=1), ToTensor()])
-    )
-    n_train = int(train_split_ratio * len(dataset))
-    n_valid = len(dataset) - n_train
-
-    ds_train, ds_val = random_split(
-        dataset, lengths=[n_train, n_valid], generator=manual_seed(0))
-
-    X_train, y_train = list(zip(*ds_train))
-    X_train, y_train = np.stack(X_train), np.array(y_train)
-
-    X_valid, y_valid = list(zip(*ds_val))
-    X_valid, y_valid = np.stack(X_valid), np.array(y_valid)
-
-    return X_train, y_train, X_valid, y_valid
-
-class MNISTDataset(ImageFolder):
-    """Encapsulates the MNIST dataset"""
-
-    FOLDER_NAME = "mnist_images"
-    DEFAULT_PATH = path.join(path.expanduser('~'), '.openfl', 'data')
-
-    def __init__(self, root: str = DEFAULT_PATH, **kwargs) -> None:
-        """Initialize."""
-        makedirs(root, exist_ok=True)
-
-        super(MNISTDataset, self).__init__(
-            path.join(root, MNISTDataset.FOLDER_NAME), **kwargs)
-
-    def __getitem__(self, index):
-        """Allow getting items by slice index."""
-        if isinstance(index, Iterable):
-            return [super(MNISTDataset, self).__getitem__(i) for i in index]
-        else:
-            return super(MNISTDataset, self).__getitem__(index)
+    num_classes = 10
+
+    (X_train, y_train), (X_valid, y_valid) = _load_raw_datashards(
+        shard_num, collaborator_count, transform=transforms.ToTensor()
+    )
+
+    logger.info(f"MNIST > X_train Shape : {X_train.shape}")
+    logger.info(f"MNIST > y_train Shape : {y_train.shape}")
+    logger.info(f"MNIST > Train Samples : {X_train.shape[0]}")
+    logger.info(f"MNIST > Valid Samples : {X_valid.shape[0]}")
+
+    if categorical:
+        # Convert class vectors to binary class matrices.
+        y_train = one_hot(y_train, num_classes)
+        y_valid = one_hot(y_valid, num_classes)
+
+    return num_classes, X_train, y_train, X_valid, y_valid
+
+
+def one_hot(labels, classes):
+    """
+    One-hot encode a vector.
+    Args:
+        labels (list): List of labels to one-hot encode
+        classes (int): Total number of categorical classes
+    Returns:
+        np.array: Matrix of one-hot encoded labels
+    """
+    return np.eye(classes)[labels]
+
+
+def _load_raw_datashards(shard_num, collaborator_count, transform=None):
+    """
+    Load the raw data by shard.
+    Returns tuples of the dataset shard divided into training and validation.
+    Args:
+        shard_num (int): The shard number to use
+        collaborator_count (int): The number of collaborators in the federation
+        transform: torchvision.transforms.Transform to apply to images
+    Returns:
+        2 tuples: (image, label) of the training, validation dataset
+    """
+    train_data, val_data = (
+        datasets.MNIST("data", train=train, download=True, transform=transform)
+        for train in (True, False)
+    )
+    X_train_tot, y_train_tot = train_data.train_data, train_data.train_labels
+    X_valid_tot, y_valid_tot = val_data.test_data, val_data.test_labels
+
+    # Create the shards by slicing with a stride of collaborator_count.
+    shard_num = int(shard_num)
+    X_train = X_train_tot[shard_num::collaborator_count].unsqueeze(1).float()
+    y_train = y_train_tot[shard_num::collaborator_count]
+
+    X_valid = X_valid_tot[shard_num::collaborator_count].unsqueeze(1).float()
+    y_valid = y_valid_tot[shard_num::collaborator_count]
+
+    return (X_train, y_train), (X_valid, y_valid)
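
The shard split in _load_raw_datashards() is a simple strided slice: X[shard_num::collaborator_count] deals every collaborator_count-th sample to the shard, so different shard numbers receive disjoint subsets of roughly equal size. A toy sketch of the idea (illustrative values, not part of the workspace):

```
import torch

samples = torch.arange(10)  # stand-in for the 60,000 MNIST training images
collaborator_count = 2

# Shard numbers come from the -d argument of `fx collaborator create`.
shard_1 = samples[1::collaborator_count]  # tensor([1, 3, 5, 7, 9])
shard_2 = samples[2::collaborator_count]  # tensor([2, 4, 6, 8])
```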
