Skip to content

Commit

Permalink
Workflow based tutorial for Tensorflow CNN training on MNIST data
Browse files Browse the repository at this point in the history
  • Loading branch information
snehal-das committed Nov 20, 2024
1 parent 3c983ef commit aac5531
Showing 1 changed file with 320 additions and 0 deletions.
320 changes: 320 additions & 0 deletions openfl-tutorials/experimental/Keras_MNIST_with_CPU.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,320 @@
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"id": "4dbb89b6",
"metadata": {},
"source": [
"Install pre-requisites and intialize data, model."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f7f98600",
"metadata": {},
"outputs": [],
"source": [
"#Install openfl and required packages for the workflow APIs to function\n",
"%pip install git+https://github.com/securefederatedai/openfl.git\n",
"%pip install -r workflow_interface_requirements.txt\n",
"\n",
"#Install Tensorflow and MNIST dataset if not installed\n",
"%pip install tensorflow==2.13\n",
"\n",
"# Uncomment this if running in Google Colab and set USERNAME if running in docker container.\n",
"# !pip install -r https://raw.githubusercontent.com/intel/openfl/develop/openfl-tutorials/experimental/workflow_interface_requirements.txt\n",
"# import os\n",
"# os.environ[\"USERNAME\"] = \"colab\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c2352436",
"metadata": {},
"outputs": [],
"source": [
"import tensorflow as tf\n",
"import tensorflow.python.keras as keras\n",
"import matplotlib.pyplot as plt\n",
"from keras import backend as K\n",
"from keras.utils import to_categorical\n",
"from keras.datasets import mnist\n",
"\n",
"nb_classes = 10\n",
"(X_train, y_train), (X_test, y_test) = mnist.load_data()\n",
"print(\"X_train original shape\", X_train.shape)\n",
"print(\"y_train original shape\", y_train.shape)\n",
"\n",
"# It is important to make sure that all values are scaled to the range [0..1] before you \n",
"# pass them to a neural network - it is the usual convention for data preparation, \n",
"# and all default weight initializations in neural networks are designed to work with this range.\n",
"# To achieve this:\n",
"# - Covert the integer values [0...255] to float32 \n",
"# - Divide each pixel value by 255 to get values in the range [0...1]\n",
"\n",
"X_train = X_train.astype(\"float32\")\n",
"X_test = X_test.astype(\"float32\")\n",
"X_train /= 255.0\n",
"X_test /= 255.0\n",
"\n",
"print(\"Training matrix shape\", X_train.shape)\n",
"print(\"Testing matrix shape\", X_test.shape)\n",
"\n",
"Y_train = to_categorical(y_train, nb_classes)\n",
"Y_test = to_categorical(y_test, nb_classes)"
]
},
{
"cell_type": "markdown",
"id": "7eb5da5d",
"metadata": {},
"source": [
"At this point, we have installed the necessary pre-requisites, imported required packages and downloaded the dataset.\n",
"Next we define the NN model, pre-process the data for learning and define helper functions for training."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "af50829f",
"metadata": {},
"outputs": [],
"source": [
"from keras.layers import Flatten, Dense, Dropout, Conv2D, MaxPool2D\n",
"from keras.models import Sequential\n",
"from keras.utils import to_categorical\n",
"import numpy as np\n",
"\n",
"# Because the output of a fully-connected layer is not normalized to be between 0 and 1, it cannot be thought of as probability. \n",
"# Moreover, if want outputs to be probabilities of different digits, they all need to add up to 1. \n",
"# To turn output vectors into probability vector, a function called Softmax is often used \n",
"# as the last activation function in a classification neural network. \n",
"# For example, softmax([−1,1,2])=[0.035,0.25,0.705].\n",
"model = Sequential([\n",
" Conv2D(filters=32, kernel_size=(3, 3), activation=\"relu\", input_shape=(28, 28, 1)),\n",
" MaxPool2D(), \n",
" Flatten(), # Converts the multi-dimensional feature map into a one-dimensional vector.\n",
" Dense(512, activation=\"relu\"), # A fully connected layer with 512 neurons, used to learn higher-level abstract features and representations.\n",
" Dropout(0.2), # Randomly sets 20% of the neurons' outputs to zero during training, avoids relying on a specific neuron.\n",
" Dense(512, activation=\"relu\"), # A fully connected layer with 512 neurons, used to learn higher-level abstract features and representations.\n",
" Dropout(0.2), # Randomly sets 20% of the neurons' outputs to zero during training, avoids overfitting.\n",
" Dense(nb_classes, activation=\"softmax\"), # To turn output vectors into probability vector,\n",
"])\n",
"\n",
"model.compile(optimizer=\"adam\", loss=\"categorical_crossentropy\", metrics=[\"accuracy\"])\n",
"print(model.summary())\n",
"\n",
"\n",
"def FedAvg(models):\n",
" new_model = models[0]\n",
" state_dicts = [model.weights for model in models]\n",
" state_dict = new_model.weights\n",
" for idx, _ in enumerate(models[1].weights):\n",
" state_dict[idx] = np.sum(np.array([state[idx]\n",
" for state in state_dicts], dtype=object), axis=0) / len(models)\n",
" new_model.set_weights(state_dict)\n",
" return new_model\n",
"\n",
"def inference(model, test_loader, batch_size):\n",
" x_test, y_test = test_loader\n",
" loss, accuracy = model.evaluate(\n",
" x_test,\n",
" y_test,\n",
" batch_size=batch_size,\n",
" verbose=0\n",
" )\n",
" accuracy_percentage = accuracy * 100\n",
" print(f\"Test set: Avg. loss: {loss}, Accuracy: {accuracy_percentage:.2f}%\")\n",
" return accuracy"
]
},
{
"cell_type": "markdown",
"id": "72bfdbed",
"metadata": {},
"source": [
"Initialize the aggregator and collaborators. Edit collaborator_names to add/remove collaborators."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3a5964c0",
"metadata": {},
"outputs": [],
"source": [
"from openfl.experimental.interface import FLSpec, Aggregator, Collaborator\n",
"from openfl.experimental.runtime import LocalRuntime\n",
"from openfl.experimental.placement import aggregator, collaborator\n",
"\n",
"agg = Aggregator()\n",
"\n",
"collaborator_names = [\"C1\", \"C2\"]\n",
"\n",
"def callable_to_initialize_collaborator_private_attributes(n_collaborators, index, train_dataset, test_dataset, batch_size):\n",
" from openfl.utilities.data_splitters import EqualNumPyDataSplitter\n",
" train_splitter = EqualNumPyDataSplitter()\n",
" test_splitter = EqualNumPyDataSplitter()\n",
"\n",
" X_train, y_train = train_dataset\n",
" X_test, y_test = test_dataset\n",
"\n",
" train_idx = train_splitter.split(y_train, n_collaborators)\n",
" valid_idx = test_splitter.split(y_test, n_collaborators)\n",
"\n",
" train_dataset = X_train[train_idx[index]], y_train[train_idx[index]]\n",
" test_dataset = X_test[valid_idx[index]], y_test[valid_idx[index]]\n",
"\n",
" return {\n",
" \"train_loader\": train_dataset, \"test_loader\": test_dataset,\n",
" \"batch_size\": batch_size\n",
" }\n",
"\n",
"# Setup collaborators private attributes via callable function\n",
"collaborators = []\n",
"for idx, collaborator_name in enumerate(collaborator_names):\n",
" collaborators.append(\n",
" Collaborator(\n",
" name=collaborator_name,\n",
" num_cpus=1,\n",
" num_gpus=0,\n",
" private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n",
" n_collaborators=len(collaborator_names),\n",
" index=idx,\n",
" train_dataset=(X_train, Y_train),\n",
" test_dataset=(X_test, Y_test),\n",
" batch_size=64\n",
" )\n",
" )\n",
"\n",
"local_runtime = LocalRuntime(aggregator=agg, collaborators=collaborators, backend=\"ray\")\n",
"print(f'Local runtime collaborators = {local_runtime.collaborators}')"
]
},
{
"cell_type": "markdown",
"id": "e6ba622b",
"metadata": {},
"source": [
"Define the workflow needed to train the model using the data and participants."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "819d60e6",
"metadata": {},
"outputs": [],
"source": [
"class KerasMNISTWorkflow(FLSpec):\n",
" def __init__(self, model, rounds=3, **kwargs):\n",
" super().__init__(**kwargs)\n",
" self.model = model\n",
" self.n_rounds = rounds\n",
" self.current_round = 1\n",
"\n",
" @aggregator\n",
" def start(self):\n",
" self.collaborators = self.runtime.collaborators\n",
" self.next(self.aggregated_model_validation, foreach='collaborators')\n",
"\n",
" @collaborator\n",
" def aggregated_model_validation(self):\n",
" print(f'Performing aggregated model validation for collaborator {self.input}')\n",
" self.agg_validation_score = inference(self.model, self.test_loader, self.batch_size)\n",
" print(f'{self.input} value of {self.agg_validation_score}')\n",
" self.next(self.train)\n",
"\n",
" @collaborator\n",
" def train(self):\n",
" x_train, y_train = self.train_loader\n",
" history = self.model.fit(\n",
" x_train, y_train,\n",
" batch_size=self.batch_size,\n",
" epochs=1,\n",
" verbose=1,\n",
" )\n",
" self.loss = history.history[\"loss\"][0]\n",
" self.next(self.local_model_validation)\n",
"\n",
" @collaborator\n",
" def local_model_validation(self):\n",
" self.local_validation_score = inference(self.model, self.test_loader, self.batch_size)\n",
" print(\n",
" f'Doing local model validation for collaborator {self.input}: {self.local_validation_score}')\n",
" self.next(self.join)\n",
"\n",
" @aggregator\n",
" def join(self, inputs):\n",
" self.average_loss = sum(input.loss for input in inputs) / len(inputs)\n",
" self.aggregated_model_accuracy = sum(\n",
" input.agg_validation_score for input in inputs) / len(inputs)\n",
" self.local_model_accuracy = sum(\n",
" input.local_validation_score for input in inputs) / len(inputs)\n",
" print(f'Average aggregated model validation values = {self.aggregated_model_accuracy}')\n",
" print(f'Average training loss = {self.average_loss}')\n",
" print(f'Average local model validation values = {self.local_model_accuracy}')\n",
" print(\"Taking FedAvg of models of all collaborators\")\n",
" self.model = FedAvg([input.model for input in inputs])\n",
"\n",
" self.next(self.internal_loop)\n",
"\n",
" @aggregator\n",
" def internal_loop(self):\n",
" if self.current_round == self.n_rounds:\n",
" self.next(self.end)\n",
" else:\n",
" self.current_round += 1\n",
" self.next(self.aggregated_model_validation, foreach='collaborators')\n",
"\n",
" @aggregator\n",
" def end(self):\n",
" print(\"Reached the end of the training flow; the model is ready to use!\")"
]
},
{
"cell_type": "markdown",
"id": "f0ff1a9f",
"metadata": {},
"source": [
"At this point we are ready to train the model with the dataset downloaded from MNIST. Call KerasMNISTWorkflow to train the model."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e6c73353",
"metadata": {},
"outputs": [],
"source": [
"flflow = KerasMNISTWorkflow(model, rounds=3, checkpoint=True)\n",
"flflow.runtime = local_runtime\n",
"flflow.run()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

0 comments on commit aac5531

Please sign in to comment.