From f622efaf06778e12e7126c06995d5d669075de9b Mon Sep 17 00:00:00 2001 From: rajithkrishnegowda <134698520+rajithkrishnegowda@users.noreply.github.com> Date: Wed, 11 Dec 2024 18:39:23 +0530 Subject: [PATCH] adding task runner api example (#1199) * adding task runner example * corrected typo error * adding readme * adding task runner example * corrected typo error * adding readme * update README.md * removed unwanted part * requested changes done * moved to torch_cnn_mnist * changes done * added changes * fixing code format * fixing issue * modified changes requested * update plan.yaml * updated plan.yaml --- README.md | 7 +- openfl-workspace/torch_cnn_mnist/.workspace | 2 - openfl-workspace/torch_cnn_mnist/README.md | 212 ++++++++++++++++++ .../torch_cnn_mnist/plan/cols.yaml | 5 +- .../torch_cnn_mnist/plan/data.yaml | 11 +- .../torch_cnn_mnist/plan/defaults | 3 +- .../torch_cnn_mnist/plan/plan.yaml | 110 +++++---- .../torch_cnn_mnist/requirements.txt | 2 - .../torch_cnn_mnist/src/__init__.py | 4 +- .../torch_cnn_mnist/src/cnn_model.py | 151 +++++++++++++ .../torch_cnn_mnist/src/taskrunner.py | 134 +++++------ openfl-workspace/torch_cnn_mnist/src/utils.py | 21 -- 12 files changed, 501 insertions(+), 161 deletions(-) delete mode 100644 openfl-workspace/torch_cnn_mnist/.workspace create mode 100644 openfl-workspace/torch_cnn_mnist/README.md create mode 100644 openfl-workspace/torch_cnn_mnist/src/cnn_model.py delete mode 100644 openfl-workspace/torch_cnn_mnist/src/utils.py diff --git a/README.md b/README.md index eb5a28e8d4..717fece66d 100644 --- a/README.md +++ b/README.md @@ -40,12 +40,13 @@ OpenFL supports two APIs to set up a Federated Learning experiment: - [Task Runner API](https://openfl.readthedocs.io/en/latest/about/features_index/taskrunner.html): Define an experiment and distribute it manually. 
All participants can verify model code and [FL plan](https://openfl.readthedocs.io/en/latest/about/features_index/taskrunner.html#federated-learning-plan-fl-plan-settings) prior to execution. The federation is terminated when the experiment is finished. This API is meant for enterprise-grade FL experiments, including support for mTLS-based communication channels and TEE-ready nodes (based on Intel® SGX). +The quickest way to start testing the [TaskRunner API](https://openfl.readthedocs.io/en/latest/about/features_index/taskrunner.html) for managing and automating your tasks efficiently is to follow the steps outlined in the documentation. The TaskRunner API provides a simple and flexible interface to define, execute, and monitor tasks, making it an ideal choice for users looking to quickly integrate task automation into their projects. +
+Read the [torch_cnn_mnist workspace README](https://github.com/securefederatedai/openfl/tree/develop/openfl-workspace/torch_cnn_mnist/README.md) for step-by-step instructions on training a model with OpenFL.
+ - [Workflow API](https://openfl.readthedocs.io/en/latest/about/features_index/workflowinterface.html) ([*experimental*](https://openfl.readthedocs.io/en/latest/developer_guide/experimental_features.html)): Create complex experiments that extend beyond traditional horizontal federated learning. This API enables an experiment to be simulated locally, then seamlessly scaled to a federated setting. See the [experimental tutorials](https://github.com/securefederatedai/openfl/blob/develop/openfl-tutorials/experimental/workflow/) to learn how to coordinate [aggregator validation after collaborator model training](https://github.com/securefederatedai/openfl/tree/develop/openfl-tutorials/experimental/workflow/102_Aggregator_Validation.ipynb), [perform global differentially private federated learning](https://github.com/psfoley/openfl/tree/experimental-workflow-interface/openfl-tutorials/experimental/workflow/Global_DP), measure the amount of private information embedded in a model after collaborator training with [privacy meter](https://github.com/securefederatedai/openfl/blob/develop/openfl-tutorials/experimental/workflow/Privacy_Meter/readme.md), or [add a watermark to a federated model](https://github.com/securefederatedai/openfl/blob/develop/openfl-tutorials/experimental/workflow/301_MNIST_Watermarking.ipynb). -The quickest way to test OpenFL is to follow the [online documentation](https://openfl.readthedocs.io/en/latest/index.html) to launch your first federation.
-Read the [blog post](https://medium.com/openfl/from-centralized-machine-learning-to-federated-learning-with-openfl-b3e61da52432) explaining steps to train a model with OpenFL.
- ## Requirements OpenFL supports popular NumPy-based ML frameworks like TensorFlow, PyTorch and Jax which should be installed separately.
diff --git a/openfl-workspace/torch_cnn_mnist/.workspace b/openfl-workspace/torch_cnn_mnist/.workspace deleted file mode 100644 index 3c2c5d08b4..0000000000 --- a/openfl-workspace/torch_cnn_mnist/.workspace +++ /dev/null @@ -1,2 +0,0 @@ -current_plan_name: default - diff --git a/openfl-workspace/torch_cnn_mnist/README.md b/openfl-workspace/torch_cnn_mnist/README.md new file mode 100644 index 0000000000..8c363d205b --- /dev/null +++ b/openfl-workspace/torch_cnn_mnist/README.md @@ -0,0 +1,212 @@ +## Instantiating a Workspace from the torch_cnn_mnist Template +To instantiate a workspace from the torch_cnn_mnist template, you can use the fx workspace create command. This allows you to quickly set up a new workspace based on a predefined configuration and template. + +1. Ensure the necessary dependencies are installed. +``` +pip install virtualenv +mkdir ~/openfl-quickstart +virtualenv ~/openfl-quickstart/venv +source ~/openfl-quickstart/venv/bin/activate +pip install openfl +``` +2. Creating the Workspace Folder + +``` +cd ~/openfl-quickstart +fx workspace create --template torch_cnn_mnist --prefix fl_workspace +cd ~/openfl-quickstart/fl_workspace +``` + +## Directory Structure +The taskrunner workspace has the following file structure: +``` +taskrunner +├── requirements.txt # defines the required software packages +├── plan + ├── plan.yaml # the Federated Learning plan declaration + ├── cols.yaml # holds the list of authorized collaborators + ├── data.yaml # holds the collaborator data set path + └── defaults # path to the default values for the FL plan +└── src + ├── __init__.py # treat src as a Python package + ├── cnn_model.py # centralized CNN model, ready for use in federated learning + ├── dataloader.py # data loader module + └── taskrunner.py # task runner module +``` + +## Directory Breakdown: +* requirements.txt: Lists all the Python dependencies required to run the TaskRunner API and its components. 
Ensure you install these dependencies by running `pip install -r requirements.txt`. +* plan: Contains configuration files for federated learning: + - plan.yaml: The main Federated Learning plan declaration, defining the structure of the federated learning workflow. + - cols.yaml: A list of authorized collaborators for the federated learning task. + - data.yaml: Specifies the path to the data set for each collaborator. + - defaults: Path to the default configuration values for the federated learning plan. +* src: Contains the Python modules used for federated learning: + - `__init__.py`: Marks the src directory as a Python package, allowing you to import modules within the directory. + - cnn_model.py: Defines the Convolutional Neural Network (CNN) model for federated learning. + - dataloader.py: A module responsible for loading and processing datasets for the federated learning task. + - taskrunner.py: The core task runner module that manages the execution of federated learning tasks. + +## Defining the Data Loader +The data loader in OpenFL is responsible for batching and iterating through the dataset that will be used for local training and validation on each collaborator node. The PyTorchMNISTInMemory class is responsible for batching and iterating through the MNIST data set, additionally sharded "on the fly". + +To customize the PyTorchMNISTInMemory class, you need to implement the load_mnist_shard() function to process the dataset available at data_path on the local file system. The data_path parameter represents the data shard number used by the collaborator. This setup allows each collaborator to work with a specific subset of the data, facilitating distributed training. + +The load_mnist_shard() function is responsible for loading the MNIST dataset, dividing it into training and validation sets, and applying necessary transformations. The data is then batched and made ready for the training process. 
+ +# Modify the dataloader to support "Bring Your Own Data" +You can either try to implement the placeholders by yourself, or get the solution from [dataloader.py](https://github.com/securefederatedai/openfl-contrib/blob/main/openfl_contrib_tutorials/ml_to_fl/federated/src/dataloader.py) +Also, update the data loader class name in plan.yaml accordingly. + +``` +import numpy as np +from typing import Iterator, Tuple +from openfl.federated import PyTorchTaskRunner +from openfl.utilities import Metric +import torch.optim as optim +import torch.nn.functional as F +from src.cnn_model import DigitRecognizerCNN, train_epoch, validate + +class MNISTShardDataLoader(PyTorchDataLoader): + + def __init__(self, data_path, batch_size, **kwargs): + super().__init__(batch_size, **kwargs) + + # Load the dataset using the provided data_path and any additional kwargs. + X_train, y_train, X_valid, y_valid = load_dataset(data_path, **kwargs) + + # Assign the loaded data to instance variables. + self.X_train = X_train + self.y_train = y_train + + self.X_valid = X_valid + self.y_valid = y_valid + +def load_dataset(data_path, train_split_ratio=0.8, **kwargs): + dataset = MNISTDataset( + root=data_path, + transform=Compose([Grayscale(num_output_channels=1), ToTensor()]) + ) + n_train = int(train_split_ratio * len(dataset)) + n_valid = len(dataset) - n_train + + ds_train, ds_val = random_split( + dataset, lengths=[n_train, n_valid], generator=manual_seed(0)) + + X_train, y_train = list(zip(*ds_train)) + X_train, y_train = np.stack(X_train), np.array(y_train) + + X_valid, y_valid = list(zip(*ds_val)) + X_valid, y_valid = np.stack(X_valid), np.array(y_valid) + + return X_train, y_train, X_valid, y_valid + +class MNISTDataset(ImageFolder): + """Encapsulates the MNIST dataset""" + + FOLDER_NAME = "mnist_images" + DEFAULT_PATH = path.join(path.expanduser('~'), '.openfl', 'data') + + def __init__(self, root: str = DEFAULT_PATH, **kwargs) -> None: + """Initialize.""" + makedirs(root, 
exist_ok=True) + + super(MNISTDataset, self).__init__( + path.join(root, MNISTDataset.FOLDER_NAME), **kwargs) + + def __getitem__(self, index): + """Allow getting items by slice index.""" + if isinstance(index, Iterable): + return [super(MNISTDataset, self).__getitem__(i) for i in index] + else: + return super(MNISTDataset, self).__getitem__(index) +``` + +## Defining the Task Runner +The Task Runner class defines the actual computational tasks of the FL experiment (such as local training and validation). We can implement the placeholders of the TemplateTaskRunner class (src/taskrunner.py) by importing the DigitRecognizerCNN model, as well as the train_epoch() and validate() helper functions from the centralized ML script. The template also provides placeholders for providing custom optimizer and loss function objects. + +## How to run this tutorial (local simulation): +The fx plan initialize command bootstraps the workspace by first setting the initial weights of the aggregate model. It then parses the plan, updates the aggregator address if necessary, and produces a hash of the initialized plan for integrity and auditing purposes. + +To help OpenFL calculate the initial model weights, we need to provide the shape of the input tensor as an additional parameter. For the MNIST data set of grayscale (single-channel) 28x28 pixel images, the input tensor shape is [1,28,28]. We will also use a locally deployed aggregator (localhost). 
Thus, the workspace initialization command for our local federation becomes: + +``` +mkdir save +fx plan initialize --input_shape [1,28,28] --aggregator_address localhost +``` + +We can now perform a test run with the following commands for creating a local PKI setup and starting the aggregator and the collaborators on the same machine: + +``` +cd ~/openfl-quickstart/fl_workspace + +# This will create a local certificate authority (CA), so the participants communicate over a secure TLS Channel +fx workspace certify + +################################################################# +# Step 1: Setup the Aggregator # +################################################################# + +# Generate a Certificate Signing Request (CSR) for the Aggregator +fx aggregator generate-cert-request --fqdn localhost + +# The CA signs the aggregator's request, which is now available in the workspace +fx aggregator certify --fqdn localhost --silent + +################################ +# Step 2: Setup Collaborator 1 # +################################ + +# Create a collaborator named "collaborator1" that will use data path "1" (its MNIST shard number) +# This command adds the collaborator1,1 entry in data.yaml +fx collaborator create -n collaborator1 -d 1 + +# Generate a CSR for collaborator1 +fx collaborator generate-cert-request -n collaborator1 + +# The CA signs collaborator1's certificate, adding an entry to the authorized cols.yaml +fx collaborator certify -n collaborator1 --silent + +################################ +# Step 3: Setup Collaborator 2 # +################################ + +# Create a collaborator named "collaborator2" that will use data path "2" (its MNIST shard number) +# This command adds the collaborator2,2 entry in data.yaml +fx collaborator create -n collaborator2 -d 2 + +# Generate a CSR for collaborator2 +fx collaborator generate-cert-request -n collaborator2 + +# The CA signs collaborator2's certificate, adding an entry to the authorized cols.yaml +fx collaborator certify -n 
collaborator2 --silent + +############################## +# Step 4. Run the Federation # +############################## + +fx aggregator start & fx collaborator start -n collaborator1 & fx collaborator start -n collaborator2 +``` + +A successful local simulation of the FL workspace involves the aggregator and collaborators completing a round of training, saving the best-performing model under save/best.pbuf, and exiting with a unanimous “End of Federation reached…”: + +## Sample output +``` +INFO Round: 1, Collaborators that have completed all tasks: ['collaborator2', 'collaborator1'] + METRIC {'metric_origin': 'aggregator', 'task_name': 'aggregated_model_validation', 'metric_name': 'accuracy', 'metric_value': + 0.8915090382660382, 'round': 1} + METRIC Round 1: saved the best model with score 0.891509 + METRIC {'metric_origin': 'aggregator', 'task_name': 'train', 'metric_name': 'training loss', 'metric_value': 0.2952194180338876, + 'round': 1} + METRIC {'metric_origin': 'aggregator', 'task_name': 'locally_tuned_model_validation', 'metric_name': 'accuracy', 'metric_value': + 0.9181734901767464, 'round': 1} +INFO Saving round 1 model... +INFO Experiment Completed. Cleaning up... +INFO Waiting for tasks... +INFO Sending signal to collaborator collaborator1 to shutdown... +INFO End of Federation reached. Exiting... + +INFO Waiting for tasks... +INFO Sending signal to collaborator collaborator2 to shutdown... +INFO End of Federation reached. Exiting... +``` \ No newline at end of file diff --git a/openfl-workspace/torch_cnn_mnist/plan/cols.yaml b/openfl-workspace/torch_cnn_mnist/plan/cols.yaml index 95307de3bc..b60b50e5a8 100644 --- a/openfl-workspace/torch_cnn_mnist/plan/cols.yaml +++ b/openfl-workspace/torch_cnn_mnist/plan/cols.yaml @@ -1,5 +1,6 @@ -# Copyright (C) 2020-2021 Intel Corporation +# Copyright (C) 2020-2024 Intel Corporation # Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you. 
collaborators: - \ No newline at end of file +- collaborator1 +- collaborator2 \ No newline at end of file diff --git a/openfl-workspace/torch_cnn_mnist/plan/data.yaml b/openfl-workspace/torch_cnn_mnist/plan/data.yaml index cc7e6c9595..ebd2ad65d3 100644 --- a/openfl-workspace/torch_cnn_mnist/plan/data.yaml +++ b/openfl-workspace/torch_cnn_mnist/plan/data.yaml @@ -1,9 +1,6 @@ -## Copyright (C) 2020-2021 Intel Corporation +# Copyright (C) 2020-2024 Intel Corporation # Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you. -# all keys under 'collaborators' corresponds to a specific colaborator name the corresponding dictionary has data_name, data_path pairs. -# Note that in the mnist case we do not store the data locally, and the data_path is used to pass an integer that helps the data object -# construct the shard of the mnist dataset to be use for this collaborator. - -# collaborator_name ,data_directory_path -one,1 \ No newline at end of file +# collaborator_name,data_directory_path +collaborator1,1 +collaborator2,2 diff --git a/openfl-workspace/torch_cnn_mnist/plan/defaults b/openfl-workspace/torch_cnn_mnist/plan/defaults index fb82f9c5b6..5042bedbcf 100644 --- a/openfl-workspace/torch_cnn_mnist/plan/defaults +++ b/openfl-workspace/torch_cnn_mnist/plan/defaults @@ -1,2 +1 @@ -../../workspace/plan/defaults - +../../workspace/plan/defaults \ No newline at end of file diff --git a/openfl-workspace/torch_cnn_mnist/plan/plan.yaml b/openfl-workspace/torch_cnn_mnist/plan/plan.yaml index d85442e8e4..6d38003735 100644 --- a/openfl-workspace/torch_cnn_mnist/plan/plan.yaml +++ b/openfl-workspace/torch_cnn_mnist/plan/plan.yaml @@ -1,45 +1,69 @@ -# Copyright (C) 2020-2021 Intel Corporation +# Copyright (C) 2020-2024 Intel Corporation # Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you. 
-aggregator : - defaults : plan/defaults/aggregator.yaml - template : openfl.component.Aggregator - settings : - init_state_path : save/torch_cnn_mnist_init.pbuf - best_state_path : save/torch_cnn_mnist_best.pbuf - last_state_path : save/torch_cnn_mnist_last.pbuf - rounds_to_train : 10 - log_metric_callback : - template : src.utils.write_metric - - -collaborator : - defaults : plan/defaults/collaborator.yaml - template : openfl.component.Collaborator - settings : - delta_updates : false - opt_treatment : RESET - -data_loader : - defaults : plan/defaults/data_loader.yaml - template : src.dataloader.PyTorchMNISTInMemory - settings : - collaborator_count : 2 - data_group_name : mnist - batch_size : 256 - -task_runner : - defaults : plan/defaults/task_runner.yaml - template : src.taskrunner.PyTorchCNN - -network : - defaults : plan/defaults/network.yaml - -assigner : - defaults : plan/defaults/assigner.yaml - -tasks : - defaults : plan/defaults/tasks_torch.yaml - -compression_pipeline : - defaults : plan/defaults/compression_pipeline.yaml +aggregator: + settings: + best_state_path: save/best.pbuf + db_store_rounds: 2 + init_state_path: save/init.pbuf + last_state_path: save/last.pbuf + rounds_to_train: 2 + write_logs: false + template: openfl.component.aggregator.Aggregator +assigner: + settings: + task_groups: + - name: train_and_validate + percentage: 1.0 + tasks: + - aggregated_model_validation + - train + - locally_tuned_model_validation + template: openfl.component.RandomGroupedAssigner +collaborator: + settings: + db_store_rounds: 1 + delta_updates: false + opt_treatment: RESET + template: openfl.component.collaborator.Collaborator +compression_pipeline: + settings: {} + template: openfl.pipelines.NoCompressionPipeline +data_loader: + settings: + batch_size: 64 + collaborator_count: 2 + template: src.dataloader.PyTorchMNISTInMemory +network: + settings: + agg_addr: localhost + agg_port: 59583 + cert_folder: cert + client_reconnect_interval: 5 + 
require_client_auth: true + hash_salt: auto + use_tls: true + template: openfl.federation.Network +task_runner: + settings: {} + template: src.taskrunner.TemplateTaskRunner +tasks: + aggregated_model_validation: + function: validate_task + kwargs: + apply: global + metrics: + - acc + locally_tuned_model_validation: + function: validate_task + kwargs: + apply: local + metrics: + - acc + settings: {} + train: + function: train_task + kwargs: + epochs: 1 + metrics: + - loss diff --git a/openfl-workspace/torch_cnn_mnist/requirements.txt b/openfl-workspace/torch_cnn_mnist/requirements.txt index 8b084abb8f..62c8356720 100644 --- a/openfl-workspace/torch_cnn_mnist/requirements.txt +++ b/openfl-workspace/torch_cnn_mnist/requirements.txt @@ -1,4 +1,2 @@ -tensorboard torch==2.3.1 torchvision==0.18.1 -wheel>=0.38.0 # not directly required, pinned by Snyk to avoid a vulnerability diff --git a/openfl-workspace/torch_cnn_mnist/src/__init__.py b/openfl-workspace/torch_cnn_mnist/src/__init__.py index d5df5b8668..62a08a5da8 100644 --- a/openfl-workspace/torch_cnn_mnist/src/__init__.py +++ b/openfl-workspace/torch_cnn_mnist/src/__init__.py @@ -1,3 +1,3 @@ -# Copyright (C) 2020-2021 Intel Corporation +# Copyright (C) 2020-2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 -"""You may copy this file as the starting point of your own model.""" +"""You may copy this file as the starting point of your own model.""" diff --git a/openfl-workspace/torch_cnn_mnist/src/cnn_model.py b/openfl-workspace/torch_cnn_mnist/src/cnn_model.py new file mode 100644 index 0000000000..aebad53904 --- /dev/null +++ b/openfl-workspace/torch_cnn_mnist/src/cnn_model.py @@ -0,0 +1,151 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import numpy as np +import torch +import torch.nn as nn +import torch.nn.functional as F + +class DigitRecognizerCNN(nn.Module): + """ + A Convolutional Neural Network (CNN) model for digit recognition. 
+ + This model consists of two convolutional layers followed by two fully connected layers. + It uses ReLU activations and max pooling. + + Attributes: + conv1 (nn.Conv2d): The first convolutional layer. + conv2 (nn.Conv2d): The second convolutional layer. + fc1 (nn.Linear): The first fully connected layer. + fc2 (nn.Linear): The second fully connected layer. + + Methods: + forward(x): + Defines the forward pass of the model. + Args: + x (torch.Tensor): Input tensor of shape (batch_size, 1, height, width). + Returns: + torch.Tensor: Output tensor of shape (batch_size, 10). + """ + + def __init__(self, **kwargs): + """ + Initializes the DigitRecognizerCNN model. + + Args: + **kwargs: Additional keyword arguments to pass to the parent class initializer. + + Attributes: + conv1 (nn.Conv2d): First convolutional layer with 1 input channel, 20 output channels, + kernel size of 2, and stride of 1. + conv2 (nn.Conv2d): Second convolutional layer with 20 input channels, 50 output channels, + kernel size of 5, and stride of 1. + fc1 (nn.Linear): First fully connected layer with 800 input features and 500 output features. + fc2 (nn.Linear): Second fully connected layer with 500 input features and 10 output features. + """ + super(DigitRecognizerCNN, self).__init__(**kwargs) + self.conv1 = nn.Conv2d(1, 20, 2, 1) + self.conv2 = nn.Conv2d(20, 50, 5, 1) + self.fc1 = nn.Linear(800, 500) + self.fc2 = nn.Linear(500, 10) + + def forward(self, x): + """ + Defines the forward pass of the CNN model. + + Args: + x (torch.Tensor): Input tensor of shape (N, C, H, W) where + N is the batch size, + C is the number of channels, + H is the height, and + W is the width. + + Returns: + torch.Tensor: Output tensor after passing through the CNN layers. 
+ """ + x = F.relu(self.conv1(x)) + x = F.max_pool2d(x, 2, 2) + x = F.relu(self.conv2(x)) + x = F.max_pool2d(x, 2, 2) + x = x.view(-1, 800) + x = F.relu(self.fc1(x)) + x = self.fc2(x) + + return x + +def train(model, optimizer, loss_fn, dataloader, device="cpu", epochs=1): + """ + Trains the given model using the specified optimizer and loss function over the provided dataloader. + + Args: + model (torch.nn.Module): The neural network model to be trained. + optimizer (torch.optim.Optimizer): The optimizer used to update the model parameters. + loss_fn (callable): The loss function used to compute the loss. + dataloader (torch.utils.data.DataLoader): The dataloader providing the training data. + device (str, optional): The device to run the training on ("cpu" or "cuda"). Defaults to "cpu". + epochs (int, optional): The number of epochs to train the model. Defaults to 1. + + Returns: + float: The average loss of the last epoch. + """ + for epoch in range(epochs): + print(f"Starting epoch {epoch + 1}/{epochs}") + average_loss = train_epoch(model, optimizer, loss_fn, dataloader, device) + print(f"Completed epoch {epoch + 1}/{epochs} with average loss: {average_loss}") + + return average_loss + +def validate(model, test_dataloader, device="cpu"): + """ + Validate the given model using the provided test data loader. + + Args: + model (torch.nn.Module): The model to be validated. + test_dataloader (torch.utils.data.DataLoader): DataLoader containing the test dataset. + device (str, optional): The device to run the validation on (default is "cpu"). + + Returns: + float: The accuracy of the model on the test dataset. 
+ """ + total_correct = 0 + total_samples = 0 + + for data, target in test_dataloader: + data = torch.as_tensor(data).to(device) + target = torch.as_tensor(target).to(device) + output = model(data) + pred = output.argmax(dim=1, keepdim=True) + total_correct += pred.eq(target.view_as(pred)).sum().item() + total_samples += len(target) + + return total_correct / total_samples + +def train_epoch(model, optimizer, loss_fn, dataloader, device="cpu"): + """ + Trains the model for one epoch. + + Args: + model (torch.nn.Module): The neural network model to be trained. + optimizer (torch.optim.Optimizer): The optimizer used for updating the model parameters. + loss_fn (callable): The loss function used to compute the loss. + dataloader (torch.utils.data.DataLoader): The DataLoader providing the training data. + device (str, optional): The device to run the training on ("cpu" or "cuda"). Defaults to "cpu". + + Returns: + float: The average loss over all batches in the epoch. + """ + total_loss = 0 + num_batches = 0 + for data, target in dataloader: + data = torch.as_tensor(data).to(device) + target = torch.as_tensor(target).to(device) + optimizer.zero_grad() + output = model(data) + loss = loss_fn(output, target) + loss.backward() + optimizer.step() + total_loss += loss.detach().cpu().item() + num_batches += 1 + average_loss = total_loss / num_batches + + return average_loss diff --git a/openfl-workspace/torch_cnn_mnist/src/taskrunner.py b/openfl-workspace/torch_cnn_mnist/src/taskrunner.py index b68ce9b2a1..03eb39fb48 100644 --- a/openfl-workspace/torch_cnn_mnist/src/taskrunner.py +++ b/openfl-workspace/torch_cnn_mnist/src/taskrunner.py @@ -1,120 +1,100 @@ # Copyright (C) 2020-2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 +# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you. 
-"""You may copy this file as the starting point of your own model.""" import numpy as np -import torch -import torch.nn as nn -import torch.nn.functional as F -import torch.optim as optim -from typing import Iterator -from typing import Tuple + +from typing import Iterator, Tuple from openfl.federated import PyTorchTaskRunner from openfl.utilities import Metric - -class PyTorchCNN(PyTorchTaskRunner): - """ - Simple CNN for classification. - - PyTorchTaskRunner inherits from nn.module, so you can define your model - in the same way that you would for PyTorch +import torch.optim as optim +import torch.nn.functional as F +from src.cnn_model import DigitRecognizerCNN, train_epoch, validate + +class TemplateTaskRunner(PyTorchTaskRunner): + """Template Task Runner for PyTorch. + + This class should be used as a template to create a custom Task Runner for your specific model and training workload. + After generating this template, you should: + 1. Define your model, optimizer, and loss function as you would in PyTorch. PyTorchTaskRunner inherits from torch.nn.Module. + 2. Implement the `train_` and `validate_` functions to define a single train and validate epoch of your workload. + 3. Modify the `plan.yaml` file to use this Task Runner. + + The `plan.yaml` modifications should be done under the `/plan/plan.yaml` section: + ``` + task_runner: + defaults : plan/defaults/task_runner.yaml + template: src.taskrunner.TemplateTaskRunner # Modify this line appropriately if you change the class name + settings: + # Add additional arguments that you wish to pass through `__init__` + ``` + + Define the `forward` method of this class as a forward pass through the model. """ def __init__(self, device="cpu", **kwargs): - """Initialize. + """Initialize the Task Runner. Args: - device: The hardware device to use for training (Default = "cpu") - **kwargs: Additional arguments to pass to the function - + device: The hardware device to use for training (Default = "cpu"). 
+ **kwargs: Additional arguments that may be defined in `plan.yaml` """ super().__init__(device=device, **kwargs) # Define the model - self.conv1 = nn.Conv2d(1, 20, 2, 1) - self.conv2 = nn.Conv2d(20, 50, 5, 1) - self.fc1 = nn.Linear(800, 500) - self.fc2 = nn.Linear(500, 10) + self.model = DigitRecognizerCNN() self.to(device) - # `self.optimizer` must be set for optimizer weights to be federated - self.optimizer = optim.Adam(self.parameters(), lr=1e-4) + # Define the optimizer + self.optimizer = optim.Adam(self.model.parameters(), lr=1e-4) - # Set the loss function + # Define the loss function self.loss_fn = F.cross_entropy def forward(self, x): - """ - Forward pass of the model. + """Forward pass of the model. Args: - x: Data input to the model for the forward pass + x: Data input to the model for the forward pass. + + Returns: + The output of the model's forward pass. """ - x = F.relu(self.conv1(x)) - x = F.max_pool2d(x, 2, 2) - x = F.relu(self.conv2(x)) - x = F.max_pool2d(x, 2, 2) - x = x.view(-1, 800) - x = F.relu(self.fc1(x)) - x = self.fc2(x) - return x + return self.model(x) def train_( self, train_dataloader: Iterator[Tuple[np.ndarray, np.ndarray]] ) -> Metric: - """ - Train single epoch. - - Override this function in order to use custom training. + """Single Training epoch. Args: - train_dataloader: Train dataset batch generator. Yields (samples, targets) tuples of - size = `self.data_loader.batch_size`. + train_dataloader: Train dataset batch generator. Yields (samples, targets) tuples + of size = `self.train_dataloader.batch_size`. + Returns: - Metric: An object containing name and np.ndarray value. + Metric: An object containing the name of the metric and its value as an np.ndarray. 
""" - losses = [] - for data, target in train_dataloader: - data, target = data.to(self.device), target.to(self.device) - self.optimizer.zero_grad() - output = self(data) - loss = self.loss_fn(output, target) - loss.backward() - self.optimizer.step() - losses.append(loss.detach().cpu().numpy()) - loss = np.mean(losses) - return Metric(name=self.loss_fn.__name__, value=np.array(loss)) + # Implement training logic here and return a Metric object with the training loss. + # Replace the following placeholder with actual training code. + + loss = train_epoch(self.model, self.optimizer, self.loss_fn, train_dataloader, self.device) + return Metric(name="crossentropy_loss", value=np.array(loss)) def validate_( self, validation_dataloader: Iterator[Tuple[np.ndarray, np.ndarray]] ) -> Metric: - """ - Perform validation on PyTorch Model - - Override this function for your own custom validation function + """Single validation epoch. Args: - validation_dataloader: Validation dataset batch generator. - Yields (samples, targets) tuples + validation_dataloader: Validation dataset batch generator. Yields (samples, targets) tuples. + of size = `self.validation_dataloader.batch_size`. + Returns: - Metric: An object containing name and np.ndarray value + Metric: An object containing the name of the metric and its value as an np.ndarray. """ + # Implement validation logic here and return a Metric object with the validation accuracy. + # Replace the following placeholder with actual validation code. 
- total_samples = 0 - val_score = 0 - with torch.no_grad(): - for data, target in validation_dataloader: - samples = target.shape[0] - total_samples += samples - data, target = data.to(self.device), target.to( - self.device, dtype=torch.int64 - ) - output = self(data) - # get the index of the max log-probability - pred = output.argmax(dim=1) - val_score += pred.eq(target).sum().cpu().numpy() - - accuracy = val_score / total_samples + accuracy = validate(self.model, validation_dataloader, self.device) return Metric(name="accuracy", value=np.array(accuracy)) diff --git a/openfl-workspace/torch_cnn_mnist/src/utils.py b/openfl-workspace/torch_cnn_mnist/src/utils.py deleted file mode 100644 index 6b6124e76a..0000000000 --- a/openfl-workspace/torch_cnn_mnist/src/utils.py +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright (C) 2020-2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -"""You may copy this file as the starting point of your own utilities.""" - -from torch.utils.tensorboard import SummaryWriter - -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number)