Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow based tutorial for Tensorflow CNN training on MNIST data #1164

Merged
merged 5 commits into from
Nov 26, 2024
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
376 changes: 376 additions & 0 deletions openfl-tutorials/experimental/104_Keras_MNIST_with_CPU.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,376 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "b0d201a8",
"metadata": {},
"source": [
"# # Workflow Interface 104: Working with Keras on CPU\n",
"[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/snehal-das/openfl/blob/develop/openfl-tutorials/experimental/104_Keras_MNIST_with_CPU.ipynb)\n",
"\n",
"## Training a CNN on CPU using the Workflow Interface and MNIST data.\n",
"\n",
"The workflow interface is a way of orchestrating a federated learning experiment with OpenFL. The fundamental idea is to allow collaborators to train the model using the training data, while the aggregator is largely responsible for aggregating the model weights returned by the collaborators.\n",
"\n",
"The experiment can be broken down into the following steps:\n",
"1. Installing pre-requisites: This includes OpenFL, Tensorflow, Keras and NumPy for this example.\n",
"2. Downloading the training and testing data.\n",
"3. Setting up the neural network for training.\n",
"4. Define the Aggregator, Collaborators.\n",
"5. Defining the Workflow - This forms the crux of the example, intending to demonstrate how the training gets split between the aggregator and collaborators.\n",
"6. Running the experiment and evaluating the model performance."
]
},
{
"cell_type": "markdown",
"id": "1888be23",
"metadata": {},
"source": [
"#### STEP#1: Install pre-requisites for the exercise, including OpenFL and Tensorflow."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f7f98600",
"metadata": {},
"outputs": [],
"source": [
"#Install openfl and required packages for the workflow APIs to function\n",
"%pip install git+https://github.com/securefederatedai/openfl.git\n",
"#%pip install -r workflow_interface_requirements.txt\n",
"\n",
"#Install Tensorflow to access Keras\n",
"%pip install tensorflow==2.17\n",
"\n",
"# Uncomment this if running in Google Colab and set USERNAME if running in docker container.\n",
"%pip install -r https://raw.githubusercontent.com/intel/openfl/develop/openfl-tutorials/experimental/workflow_interface_requirements.txt\n",
"import os\n",
"os.environ[\"USERNAME\"] = \"colab\""
]
},
{
"cell_type": "markdown",
"id": "5f64c9d5",
"metadata": {},
"source": [
"#### STEP#2: Download testing and training data.\n",
"\n",
"For this example, we rely on the load_data() API of MNIST which upon being called downloads a total of 70,000 images of handwritten digits - 60,000 for training and 10,000 of testing the neural network model.\n",
"\n",
"For more details on the implementation, refer to: https://github.com/keras-team/keras/blob/master/keras/src/datasets/mnist.py#L10"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c2352436",
"metadata": {},
"outputs": [],
"source": [
"import tensorflow as tf\n",
"import tensorflow.python.keras as keras\n",
"from keras.utils import to_categorical\n",
"from keras.datasets import mnist\n",
"\n",
"nb_classes = 10\n",
"(X_train, y_train), (X_test, y_test) = mnist.load_data()\n",
"print(\"X_train original shape\", X_train.shape)\n",
"print(\"y_train original shape\", y_train.shape)\n",
"\n",
"# It is important to make sure that all values are scaled to the range [0..1] before you \n",
"# pass them to a neural network - it is the usual convention for data preparation, \n",
"# and all default weight initializations in neural networks are designed to work with this range.\n",
"# To achieve this:\n",
"# - Covert the integer values [0...255] to float32 \n",
"# - Divide each pixel value by 255 to get values in the range [0...1]\n",
"\n",
"X_train = X_train.astype(\"float32\")\n",
"X_test = X_test.astype(\"float32\")\n",
"X_train /= 255.0\n",
"X_test /= 255.0\n",
"\n",
"print(\"Training matrix shape\", X_train.shape)\n",
"print(\"Testing matrix shape\", X_test.shape)\n",
"\n",
"Y_train = to_categorical(y_train, nb_classes)\n",
"Y_test = to_categorical(y_test, nb_classes)"
]
},
{
"cell_type": "markdown",
"id": "7eb5da5d",
"metadata": {},
"source": [
"At this point, we have installed the necessary pre-requisites, imported required packages and downloaded the dataset.\n",
"Next we define the NN model, pre-process the data for learning and define helper functions for training."
]
},
{
"cell_type": "markdown",
"id": "96ca8867",
"metadata": {},
"source": [
"#### STEP#3: Setup the Neural Network Model; define helper functions."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "af50829f",
"metadata": {},
"outputs": [],
"source": [
"from keras.layers import Flatten, Dense, Dropout, Conv2D, MaxPool2D\n",
"from keras.models import Sequential\n",
"import numpy as np\n",
"\n",
"model = Sequential([\n",
" Conv2D(filters=32, kernel_size=(3, 3), activation=\"relu\", input_shape=(28, 28, 1)),\n",
" MaxPool2D(), \n",
" Flatten(), # Converts the multi-dimensional feature map into a one-dimensional vector.\n",
" Dense(512, activation=\"relu\"), # A fully connected layer with 512 neurons, used to learn higher-level abstract features and representations.\n",
" Dropout(0.2), # Randomly sets 20% of the neurons' outputs to zero during training, avoids relying on a specific neuron.\n",
" Dense(512, activation=\"relu\"), # A fully connected layer with 512 neurons, used to learn higher-level abstract features and representations.\n",
" Dropout(0.2), # Randomly sets 20% of the neurons' outputs to zero during training, avoids relying on a specific neuron.\n",
" Dense(nb_classes, activation=\"softmax\"), # To turn output vector into probability vector. To understand why, refer to the comment below.\n",
"])\n",
"# Because the output of a fully-connected layer is not normalized to be between 0 and 1, it cannot be thought of as probability. \n",
"# Moreover, if we want outputs to be probabilities of different digits, they all need to add up to 1. \n",
"# To turn output vectors into probability vector, a function called Softmax is often used as \n",
"# the last activation function in a classification neural network. For example, softmax([−1,1,2])=[0.035,0.25,0.705].\n",
"\n",
"model.compile(optimizer=\"adam\", loss=\"categorical_crossentropy\", metrics=[\"accuracy\"])\n",
"print(model.summary())\n",
"\n",
"# Helper function to merge the model weights returned from each of the collaborators.\n",
"def FedAvg(models):\n",
" new_model = models[0]\n",
" state_dicts = [model.weights for model in models]\n",
" state_dict = new_model.weights\n",
" for idx, _ in enumerate(models[1].weights):\n",
" state_dict[idx] = np.sum(np.array([state[idx]\n",
" for state in state_dicts], dtype=object), axis=0) / len(models)\n",
" new_model.set_weights(state_dict)\n",
" return new_model\n",
"\n",
"# Helper function to calculate the loss, accurancy of a given model. \n",
"def inference(model, test_loader, batch_size):\n",
" x_test, y_test = test_loader\n",
" loss, accuracy = model.evaluate(\n",
" x_test,\n",
" y_test,\n",
" batch_size=batch_size,\n",
" verbose=0\n",
" )\n",
" accuracy_percentage = accuracy * 100\n",
" print(f\"Test set: Avg. loss: {loss}, Accuracy: {accuracy_percentage:.2f}%\")\n",
" return accuracy"
]
},
{
"cell_type": "markdown",
"id": "72bfdbed",
"metadata": {},
"source": [
"#### STEP#4: Initialize the Aggregator and Collaborators.\n",
"\n",
"We import the `FLSpec`, `LocalRuntime`, and the aggregator, collaborator placement decorators.\n",
"\n",
"- `FLSpec` – Defines the flow specification. User defined flows are subclasses of this.\n",
"- `Runtime` – Defines where the flow runs, infrastructure for task transitions (how information gets sent). The `LocalRuntime` runs the flow on a single node.\n",
"- `aggregator/collaborator` - placement decorators that define where the task will be assigned.\n",
"\n",
"Edit collaborator_names to add/remove collaborators."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3a5964c0",
"metadata": {},
"outputs": [],
"source": [
"from openfl.experimental.interface import FLSpec, Aggregator, Collaborator\n",
"from openfl.experimental.runtime import LocalRuntime\n",
"from openfl.experimental.placement import aggregator, collaborator\n",
"\n",
"agg = Aggregator()\n",
"\n",
"collaborator_names = [\"Seattle\", \"London\"]\n",
"\n",
"def callable_to_initialize_collaborator_private_attributes(n_collaborators, index, train_dataset, test_dataset, batch_size):\n",
" from openfl.utilities.data_splitters import EqualNumPyDataSplitter\n",
" train_splitter = EqualNumPyDataSplitter()\n",
" test_splitter = EqualNumPyDataSplitter()\n",
"\n",
" X_train, y_train = train_dataset\n",
" X_test, y_test = test_dataset\n",
"\n",
" train_idx = train_splitter.split(y_train, n_collaborators)\n",
" valid_idx = test_splitter.split(y_test, n_collaborators)\n",
"\n",
" train_dataset = X_train[train_idx[index]], y_train[train_idx[index]]\n",
" test_dataset = X_test[valid_idx[index]], y_test[valid_idx[index]]\n",
"\n",
" return {\n",
" \"train_loader\": train_dataset, \n",
" \"test_loader\": test_dataset,\n",
" \"batch_size\": batch_size\n",
" }\n",
"\n",
"# Setup collaborators private attributes via callable function\n",
"collaborators = []\n",
"for idx, collaborator_name in enumerate(collaborator_names):\n",
" collaborators.append(\n",
" Collaborator(\n",
" name=collaborator_name,\n",
" num_cpus=1,\n",
" num_gpus=0,\n",
" private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n",
" n_collaborators=len(collaborator_names),\n",
" index=idx,\n",
" train_dataset=(X_train, Y_train),\n",
" test_dataset=(X_test, Y_test),\n",
" batch_size=64\n",
" )\n",
" )\n",
"\n",
"local_runtime = LocalRuntime(aggregator=agg, collaborators=collaborators, backend=\"ray\")\n",
"print(f'Local runtime collaborators = {local_runtime.collaborators}')"
]
},
{
"cell_type": "markdown",
"id": "e6ba622b",
"metadata": {},
"source": [
"#### STEP#5: Define the workflow needed to train the model using the data and participants.\n",
"\n",
"Now we come to the flow definition. The OpenFL Workflow Interface adopts the conventions set by Metaflow, that every workflow begins with `start` and concludes with the `end` task. The aggregator begins with an optionally passed in model and optimizer. The aggregator begins the flow with the `start` task, where the list of collaborators is extracted from the runtime (`self.collaborators = self.runtime.collaborators`) and is then used as the list of participants to run the task listed in `self.next`, `aggregated_model_validation`. The model, optimizer, and anything that is not explicitly excluded from the next function will be passed from the `start` function on the aggregator to the `aggregated_model_validation` task on the collaborator. Where the tasks run is determined by the placement decorator that precedes each task definition (`@aggregator` or `@collaborator`). Once each of the collaborators (defined in the runtime) complete the `aggregated_model_validation` task, they pass their current state onto the `train` task, from `train` to `local_model_validation`, and then finally to `join` at the aggregator. It is in `join` that an average is taken of the model weights, and the next round can begin."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "819d60e6",
"metadata": {},
"outputs": [],
"source": [
"class KerasMNISTWorkflow(FLSpec):\n",
" def __init__(self, model, rounds=3, **kwargs):\n",
" super().__init__(**kwargs)\n",
" self.model = model\n",
" self.n_rounds = rounds\n",
" self.current_round = 1\n",
"\n",
" @aggregator\n",
" def start(self):\n",
" self.collaborators = self.runtime.collaborators\n",
" self.next(self.aggregated_model_validation, foreach='collaborators')\n",
"\n",
" @collaborator\n",
" def aggregated_model_validation(self):\n",
" print(f'Performing aggregated model validation for collaborator {self.input}')\n",
" self.agg_validation_score = inference(self.model, self.test_loader, self.batch_size)\n",
" print(f'{self.input} value of {self.agg_validation_score}')\n",
" self.next(self.train)\n",
"\n",
" @collaborator\n",
" def train(self):\n",
" x_train, y_train = self.train_loader\n",
" history = self.model.fit(\n",
" x_train, y_train,\n",
" batch_size=self.batch_size,\n",
" epochs=1,\n",
" verbose=1,\n",
" )\n",
" self.loss = history.history[\"loss\"][0]\n",
" self.next(self.local_model_validation)\n",
"\n",
" @collaborator\n",
" def local_model_validation(self):\n",
" self.local_validation_score = inference(self.model, self.test_loader, self.batch_size)\n",
" print(\n",
" f'Doing local model validation for collaborator {self.input}: {self.local_validation_score}')\n",
" self.next(self.join)\n",
"\n",
" @aggregator\n",
" def join(self, inputs):\n",
" self.average_loss = sum(input.loss for input in inputs) / len(inputs)\n",
" self.aggregated_model_accuracy = sum(\n",
" input.agg_validation_score for input in inputs) / len(inputs)\n",
" self.local_model_accuracy = sum(\n",
" input.local_validation_score for input in inputs) / len(inputs)\n",
" print(f'Average aggregated model validation values = {self.aggregated_model_accuracy}')\n",
" print(f'Average training loss = {self.average_loss}')\n",
" print(f'Average local model validation values = {self.local_model_accuracy}')\n",
" print(\"Taking FedAvg of models of all collaborators\")\n",
" self.model = FedAvg([input.model for input in inputs])\n",
"\n",
" self.next(self.internal_loop)\n",
"\n",
" @aggregator\n",
" def internal_loop(self):\n",
" if self.current_round == self.n_rounds:\n",
" self.next(self.end)\n",
" else:\n",
" self.current_round += 1\n",
" self.next(self.aggregated_model_validation, foreach='collaborators')\n",
"\n",
" @aggregator\n",
" def end(self):\n",
" print(\"Reached the end of the training flow; the model is ready to use!\")\n",
" loss, accuracy = self.model.evaluate(\n",
" X_test, Y_test,\n",
" 10000,\n",
" verbose=1)\n",
" accuracy_percentage = accuracy * 100\n",
" print(f\"Final Loss, Accuracy numbers: Avg. loss: {loss}, Accuracy: {accuracy_percentage:.2f}%\")"
]
},
{
"cell_type": "markdown",
"id": "cb67be11",
"metadata": {},
"source": [
"#### STEP#6: Call KerasMNISTWorkflow to train the model.\n",
"\n",
"At this point we are ready to train the model with the dataset downloaded from MNIST. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "366ee972",
"metadata": {},
"outputs": [],
"source": [
"flflow = KerasMNISTWorkflow(model, rounds=3, checkpoint=True)\n",
"flflow.runtime = local_runtime\n",
"flflow.run()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Loading