diff --git a/openfl-workspace/torch_cnn_mnist/README.md b/openfl-workspace/torch_cnn_mnist/README.md index e83b732fef..13b60a2361 100644 --- a/openfl-workspace/torch_cnn_mnist/README.md +++ b/openfl-workspace/torch_cnn_mnist/README.md @@ -6,7 +6,7 @@ pip install virtualenv mkdir ~/openfl-quickstart virtualenv ~/openfl-quickstart/venv source ~/openfl-quickstart/venv/bin/activate -pip install openfl==1.6 +pip install openfl """ 2. Creating the Workspace Folder @@ -49,9 +49,78 @@ taskrunner - taskrunner.py: The core task runner module that manages the execution of federated learning tasks. ## Defining the Data Loader -The data loader in OpenFL is responsible for batching and iterating through the dataset that will be used for local training and validation on each collaborator node. The TemplateDataLoader class in src/dataloader.py is designed to be a starting template for creating a data loader that is tailored to the FL experiment’s data format requirements. +The data loader in OpenFL is responsible for batching and iterating through the dataset that will be used for local training and validation on each collaborator node. The PyTorchMNISTInMemory class is designed to handle the MNIST dataset, ensuring it is properly loaded and preprocessed for federated learning experiments. -To customize the TemplateDataLoader, we just need to implement the load_dataset() function to process the dataset available at data_path on the local file system. The data_path parameter comes from the data.yaml configuration file, which is populated when the collaborator’s identity is created via fx collaborator create. +To customize the PyTorchMNISTInMemory class, you need to implement the load_mnist_shard() function to process the dataset available at data_path on the local file system. The data_path parameter represents the data shard number used by the collaborator. This setup allows each collaborator to work with a specific subset of the data, facilitating distributed training. + +The load_mnist_shard() function is responsible for loading the MNIST dataset, dividing it into training and validation sets, and applying necessary transformations. The data is then batched and made ready for the training process. + +# Modify the dataloader to support "bring your own data" +You can either try to implement the placeholders by yourself, or get the solution from [dataloader.py](https://github.com/securefederatedai/openfl-contrib/blob/main/openfl_contrib_tutorials/ml_to_fl/federated/src/dataloader.py) + +``` +import numpy as np +from typing import Iterator, Tuple +from openfl.federated import PyTorchTaskRunner +from openfl.utilities import Metric +import torch.optim as optim +import torch.nn.functional as F +from src.cnn_model import DigitRecognizerCNN, train_epoch, validate + +class TemplateDataLoader(PyTorchDataLoader): + + def __init__(self, data_path, batch_size, **kwargs): + super().__init__(batch_size, **kwargs) + + # Load the dataset using the provided data_path and any additional kwargs. + X_train, y_train, X_valid, y_valid = load_dataset(data_path, **kwargs) + + # Assign the loaded data to instance variables. + self.X_train = X_train + self.y_train = y_train + + self.X_valid = X_valid + self.y_valid = y_valid + +def load_dataset(data_path, train_split_ratio=0.8, **kwargs): + dataset = MNISTDataset( + root=data_path, + transform=Compose([Grayscale(num_output_channels=1), ToTensor()]) + ) + n_train = int(train_split_ratio * len(dataset)) + n_valid = len(dataset) - n_train + + ds_train, ds_val = random_split( + dataset, lengths=[n_train, n_valid], generator=manual_seed(0)) + + X_train, y_train = list(zip(*ds_train)) + X_train, y_train = np.stack(X_train), np.array(y_train) + + X_valid, y_valid = list(zip(*ds_val)) + X_valid, y_valid = np.stack(X_valid), np.array(y_valid) + + return X_train, y_train, X_valid, y_valid + +class MNISTDataset(ImageFolder): + """Encapsulates the MNIST dataset""" + + FOLDER_NAME = "mnist_images" + DEFAULT_PATH = path.join(path.expanduser('~'), '.openfl', 'data') + + def __init__(self, root: str = DEFAULT_PATH, **kwargs) -> None: + """Initialize.""" + makedirs(root, exist_ok=True) + + super(MNISTDataset, self).__init__( + path.join(root, MNISTDataset.FOLDER_NAME), **kwargs) + + def __getitem__(self, index): + """Allow getting items by slice index.""" + if isinstance(index, Iterable): + return [super(MNISTDataset, self).__getitem__(i) for i in index] + else: + return super(MNISTDataset, self).__getitem__(index) +``` ## Defining the Task Runner The Task Runner class defines the actual computational tasks of the FL experiment (such as local training and validation). We can implement the placeholders of the TemplateTaskRunner class (src/taskrunner.py) by importing the DigitRecognizerCNN model, as well as the train_epoch() and validate() helper functions from the centralized ML script. The template also provides placeholders for providing custom optimizer and loss function objects. @@ -66,45 +135,6 @@ mkdir save fx plan initialize --input_shape [1,28,28] --aggregator_address localhost ``` -The pre-sharded dataset can be downloaded from [mnist_data_shards.tar.gz](https://github.com/securefederatedai/openfl-contrib/blob/main/openfl_contrib_tutorials/ml_to_fl/federated/mnist_data_shards.tar.gz). Copy the dataset bundle to the root of the FL workspace and unpack it: - -``` -cp mnist_data_shards.tar.gz ~/openfl/openfl-tutorials/taskrunner/ -cd ~/openfl/openfl-tutorials/taskrunner/ -tar -xvf mnist_data_shards.tar.gz -rm mnist_data_shards.tar.gz -``` - -This will populate the data folder of the FL workspace with two shards (data/1 and data/2) of labeled MNIST images of digits (the 0–9 labels being encoded in the sub-folder names). Note that in a real-world federation each of the collaborator nodes would only hold one shard, given the decentralized nature of Federated Learning. To facilitate the local testing of the FL workspace, both shards are made available under the local data/ folder: - -``` -data -├── 1 - └── mnist_images - └── 0 - └── 1 - └── 2 - └── 3 - └── 4 - └── 5 - └── 6 - └── 7 - └── 8 - └── 9 -├── 2 - └── mnist_images - └── 0 - └── 1 - └── 2 - └── 3 - └── 4 - └── 5 - └── 6 - └── 7 - └── 8 - └── 9 -``` - We can now perform a test run with the following commands for creating a local PKI setup and starting the aggregator and the collaborators on the same machine: ``` @@ -129,7 +159,7 @@ fx aggregator certify --fqdn localhost --silent # Create a collaborator named "collaborator1" that will use data path "data/1" # This command adds the collaborator1,data/1 entry in data.yaml -fx collaborator create -n collaborator1 -d data/1 +fx collaborator create -n collaborator1 -d 1 # Generate a CSR for collaborator1 fx collaborator generate-cert-request -n collaborator1 @@ -143,7 +173,7 @@ fx collaborator certify -n collaborator1 --silent # Create a collaborator named "collaborator2" that will use data path "data/2" # This command adds the collaborator2,data/2 entry in data.yaml -fx collaborator create -n collaborator2 -d data/2 +fx collaborator create -n collaborator2 -d 2 # Generate a CSR for collaborator2 fx collaborator generate-cert-request -n collaborator2 diff --git a/openfl-workspace/torch_cnn_mnist/plan/plan.yaml b/openfl-workspace/torch_cnn_mnist/plan/plan.yaml index 26b7d47adb..03c7e879e0 100644 --- a/openfl-workspace/torch_cnn_mnist/plan/plan.yaml +++ b/openfl-workspace/torch_cnn_mnist/plan/plan.yaml @@ -32,7 +32,8 @@ compression_pipeline: data_loader: settings: batch_size: 64 - template: src.dataloader.TemplateDataLoader + collaborator_count: 2 + template: src.dataloader.PyTorchMNISTInMemory network: settings: agg_addr: localhost diff --git a/openfl-workspace/torch_cnn_mnist/src/dataloader.py b/openfl-workspace/torch_cnn_mnist/src/dataloader.py index 7957688c78..e0eb7844ce 100644 --- a/openfl-workspace/torch_cnn_mnist/src/dataloader.py +++ b/openfl-workspace/torch_cnn_mnist/src/dataloader.py @@ -1,49 +1,43 @@ -# Copyright (C) 2024 Intel Corporation -# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you. +# Copyright (C) 2020-2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 -from os import path, makedirs -from collections.abc import Iterable +"""You may copy this file as the starting point of your own model.""" +from openfl.federated import PyTorchDataLoader +from torchvision import datasets +from torchvision import transforms import numpy as np -from torch import manual_seed -from torch.utils.data import random_split -from torchvision.transforms import ToTensor, Grayscale, Compose -from torchvision.datasets import ImageFolder +from logging import getLogger -from openfl.federated import PyTorchDataLoader +logger = getLogger(__name__) -class TemplateDataLoader(PyTorchDataLoader): - """Template dataloader for PyTorch. - This class should be used as a template to create a custom DataLoader for your specific dataset. - After generating this template, you should: - 1. Implement the `load_dataset` function to load your data. - 2. Modify the `plan.yaml` file to use this DataLoader. - The `plan.yaml` modifications should be done under the `/plan/plan.yaml` section: - ``` - data_loader: - defaults : plan/defaults/data_loader.yaml - template: src.dataloader.TemplateDataLoader # Modify this line appropriately if you change the class name - settings: - # Add additional arguments (such as batch_size) that you wish to pass through `def __init__():` - # You do not need to pass in data_path here. It will be set by the collaborators - ``` - `batch_size` is passed to the `super().`__init__` method to ensure that the superclass is properly initialized with the specified batch size. - After calling `super().__init__`, define `self.X_train`, `self.y_train`, `self.X_valid`, and `self.y_valid`. - """ + +class PyTorchMNISTInMemory(PyTorchDataLoader): + """PyTorch data loader for MNIST dataset.""" def __init__(self, data_path, batch_size, **kwargs): - """Initialize the data loader. + """Instantiate the data object. + Args: - data_path: The file path to the data at the respective collaborator site. - batch_size: The batch size of the data loader. - **kwargs: Additional arguments that may be defined in `plan.yaml` + data_path: The file path to the data + batch_size: The batch size of the data loader + **kwargs: Additional arguments, passed to super + init and load_mnist_shard """ super().__init__(batch_size, **kwargs) - # Load the dataset using the provided data_path and any additional kwargs. - X_train, y_train, X_valid, y_valid = load_dataset(data_path, **kwargs) - - # Assign the loaded data to instance variables. + try: + int(data_path) + except: + raise ValueError( + "Expected `%s` to be representable as `int`, as it refers to the data shard " + + "number used by the collaborator.", + data_path + ) + + num_classes, X_train, y_train, X_valid, y_valid = load_mnist_shard( + shard_num=int(data_path), **kwargs + ) self.X_train = X_train self.y_train = y_train self.train_loader = self.get_train_loader() @@ -52,58 +46,93 @@ def __init__(self, data_path, batch_size, **kwargs): self.y_valid = y_valid self.val_loader = self.get_valid_loader() -def load_dataset(data_path, train_split_ratio=0.8, **kwargs): + self.num_classes = num_classes + + +def load_mnist_shard( + shard_num, collaborator_count, categorical=False, channels_last=True, **kwargs +): """ - Load your dataset here. - This function should be implemented to load the dataset from the given `data_path`. - You can use additional arguments passed via `**kwargs` if necessary. + Load the MNIST dataset. + Args: - data_path (str): Path to the data directory. - **kwargs: Additional arguments that may be defined in `plan.yaml` + shard_num (int): The shard to use from the dataset + collaborator_count (int): The number of collaborators in the + federation + categorical (bool): True = convert the labels to one-hot encoded + vectors (Default = True) + channels_last (bool): True = The input images have the channels + last (Default = True) + **kwargs: Additional parameters to pass to the function + Returns: - Tuple containing: - - numpy.ndarray: The training data. - - numpy.ndarray: The training labels. - - numpy.ndarray: The validation data. - - numpy.ndarray: The validation labels. + list: The input shape + int: The number of classes + numpy.ndarray: The training data + numpy.ndarray: The training labels + numpy.ndarray: The validation data + numpy.ndarray: The validation labels """ - # Implement dataset loading logic here and return the appropriate data. - # Replace the following placeholders with actual data loading code. - dataset = MNISTDataset( - root=data_path, - transform=Compose([Grayscale(num_output_channels=1), ToTensor()]) + num_classes = 10 + + (X_train, y_train), (X_valid, y_valid) = _load_raw_datashards( + shard_num, collaborator_count, transform=transforms.ToTensor() ) - n_train = int(train_split_ratio * len(dataset)) - n_valid = len(dataset) - n_train - ds_train, ds_val = random_split( - dataset, lengths=[n_train, n_valid], generator=manual_seed(0)) + logger.info(f"MNIST > X_train Shape : {X_train.shape}") + logger.info(f"MNIST > y_train Shape : {y_train.shape}") + logger.info(f"MNIST > Train Samples : {X_train.shape[0]}") + logger.info(f"MNIST > Valid Samples : {X_valid.shape[0]}") - X_train, y_train = list(zip(*ds_train)) + if categorical: + # convert class vectors to binary class matrices + y_train = one_hot(y_train, num_classes) + y_valid = one_hot(y_valid, num_classes) - X_train, y_train = np.stack(X_train), np.array(y_train) + return num_classes, X_train, y_train, X_valid, y_valid - X_valid, y_valid = list(zip(*ds_val)) - X_valid, y_valid = np.stack(X_valid), np.array(y_valid) - return X_train, y_train, X_valid, y_valid +def one_hot(labels, classes): + """ + One Hot encode a vector. -class MNISTDataset(ImageFolder): - """Encapsulates the MNIST dataset""" + Args: + labels (list): List of labels to onehot encode + classes (int): Total number of categorical classes - FOLDER_NAME = "mnist_images" - DEFAULT_PATH = path.join(path.expanduser('~'), '.openfl', 'data') + Returns: + np.array: Matrix of one-hot encoded labels + """ + return np.eye(classes)[labels] + + +def _load_raw_datashards(shard_num, collaborator_count, transform=None): + """ + Load the raw data by shard. + + Returns tuples of the dataset shard divided into training and validation. + + Args: + shard_num (int): The shard number to use + collaborator_count (int): The number of collaborators in the federation + transform: torchvision.transforms.Transform to apply to images + + Returns: + 2 tuples: (image, label) of the training, validation dataset + """ + train_data, val_data = ( + datasets.MNIST("data", train=train, download=True, transform=transform) + for train in (True, False) + ) + X_train_tot, y_train_tot = train_data.train_data, train_data.train_labels + X_valid_tot, y_valid_tot = val_data.test_data, val_data.test_labels - def __init__(self, root: str = DEFAULT_PATH, **kwargs) -> None: - """Initialize.""" - makedirs(root, exist_ok=True) + # create the shards + shard_num = int(shard_num) + X_train = X_train_tot[shard_num::collaborator_count].unsqueeze(1).float() + y_train = y_train_tot[shard_num::collaborator_count] - super(MNISTDataset, self).__init__( - path.join(root, MNISTDataset.FOLDER_NAME), **kwargs) + X_valid = X_valid_tot[shard_num::collaborator_count].unsqueeze(1).float() + y_valid = y_valid_tot[shard_num::collaborator_count] - def __getitem__(self, index): - """Allow getting items by slice index.""" - if isinstance(index, Iterable): - return [super(MNISTDataset, self).__getitem__(i) for i in index] - else: - return super(MNISTDataset, self).__getitem__(index) + return (X_train, y_train), (X_valid, y_valid) \ No newline at end of file