diff --git a/openfl-tutorials/experimental/workflow/404_Keras_MNIST_with_FedProx.ipynb b/openfl-tutorials/experimental/workflow/404_Keras_MNIST_with_FedProx.ipynb new file mode 100644 index 0000000000..69dbd4159a --- /dev/null +++ b/openfl-tutorials/experimental/workflow/404_Keras_MNIST_with_FedProx.ipynb @@ -0,0 +1,469 @@ +{ + "cells": [ + { + "attachments": {}, + "cell_type": "markdown", + "id": "14821d97", + "metadata": {}, + "source": [ + "# Workflow API Tutorial: Keras MNIST" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "bd059520", + "metadata": {}, + "source": [ + "This tutorial demonstrates how to train a model using TensorFlow Keras on the MNIST dataset, leveraging the new OpenFL Workflow Interface. The Workflow Interface provides a novel way to compose federated learning experiments with OpenFL, enabling researchers to handle non-IID data and perform federated averaging with optimizations like FedProx. Through this tutorial, you will learn how to set up the federated learning environment, define the flow, and execute the training process across multiple collaborators.\n" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "39c3d86a", + "metadata": {}, + "source": [ + "# What is it?" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "a7989e72", + "metadata": {}, + "source": [ + "The workflow interface is a new way of composing federated learning expermients with OpenFL. It was born through conversations with researchers and existing users who had novel use cases that didn't quite fit the standard horizontal federated learning paradigm. " + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "fc8e35da", + "metadata": {}, + "source": [ + "# Getting Started" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "4dbb89b6", + "metadata": {}, + "source": [ + "First we start by installing the necessary dependencies for the workflow interface. Note that this tuorial uses Keras 3, make sure you use python 3.9 or higher." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f7f98600", + "metadata": {}, + "outputs": [], + "source": [ + "%pip install tensorflow\n", + "%pip install openfl\n", + "%pip install -r workflow_interface_requirements.txt" + ] + }, + { + "cell_type": "markdown", + "id": "b6fecf9d", + "metadata": {}, + "source": [ + "Now, we import the relevant modules and do some basic initializations " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "88fdb191", + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import tensorflow as tf\n", + "\n", + "tf.data.experimental.enable_debug_mode()\n", + "tf.random.set_seed(0)\n", + "np.random.seed(0)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "7237eac4", + "metadata": {}, + "source": [ + "The FedAvg method is used in our federated learning architecture to aggregate model weights from multiple collaborators. In this approach, each collaborator trains a local model on their local data and then sends the model weights to a central aggregator. The aggregator runs the FedAvgWeights method to calculate the aggregated model weights by averaging the weights from all collaborators. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f2536320", + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "\n", + "def FedAvg(models):\n", + " new_model = models[0]\n", + " state_dicts = [model.weights for model in models]\n", + " state_dict = new_model.weights\n", + " for idx, _ in enumerate(models[1].weights):\n", + " state_dict[idx] = np.sum(np.array([state[idx]\n", + " for state in state_dicts], dtype=object), axis=0) / len(models)\n", + " new_model.set_weights(state_dict)\n", + " return new_model" + ] + }, + { + "cell_type": "markdown", + "id": "2b869066", + "metadata": {}, + "source": [ + "Now we come to the flow definition. The OpenFL Workflow Interface adopts the conventions set by Metaflow, that every workflow begins with `start` and concludes with the `end` task. The aggregator begins with an optionally passed in model and optimizer. The aggregator begins the flow with the `start` task, where the list of collaborators is extracted from the runtime (`self.collaborators = self.runtime.collaborators`) and is then used as the list of participants to run the task listed in `self.next`, `aggregated_model_validation`. The model, optimizer, and anything that is not explicitly excluded from the next function will be passed from the `start` function on the aggregator to the `aggregated_model_validation` task on the collaborator. Where the tasks run is determined by the placement decorator that precedes each task definition (`@aggregator` or `@collaborator`). Once each of the collaborators (defined in the runtime) complete the `aggregated_model_validation` task, they pass their current state onto the `train` task, from `train` to `local_model_validation`, and then finally to `join` at the aggregator. It is in `join` that an average is taken of the model weights, and the next round can begin." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cf76f85b", + "metadata": {}, + "outputs": [], + "source": [ + "from openfl.experimental.workflow.interface import FLSpec\n", + "from openfl.experimental.workflow.placement import aggregator, collaborator\n", + "import tensorflow.keras as keras\n", + "from tensorflow.keras import Sequential\n", + "from tensorflow.keras.layers import Dense\n", + "\n", + "def inference(model, test_loader, batch_size):\n", + " x_test, y_test = test_loader\n", + " loss, accuracy = model.evaluate(\n", + " x_test,\n", + " y_test,\n", + " batch_size=batch_size,\n", + " verbose=1\n", + " )\n", + " accuracy_percentage = accuracy * 100\n", + " print(f\"Test set: Avg. loss: {loss}, Accuracy: {accuracy_percentage:.2f}%\")\n", + " return accuracy\n", + "\n", + "class KerasMNISTFlow(FLSpec):\n", + " def __init__(self, model, rounds=3, **kwargs):\n", + " super().__init__(**kwargs)\n", + " self.model = model\n", + " self.n_rounds = rounds\n", + " self.current_round = 1\n", + "\n", + " @aggregator\n", + " def start(self):\n", + " self.collaborators = self.runtime.collaborators\n", + " self.next(self.aggregated_model_validation, foreach='collaborators')\n", + "\n", + " @collaborator\n", + " def aggregated_model_validation(self):\n", + " print(f'Performing aggregated model validation for collaborator {self.input}')\n", + " self.agg_validation_score = inference(self.model, self.test_loader, self.batch_size)\n", + " print(f'{self.input} value of {self.agg_validation_score}')\n", + " self.next(self.train)\n", + "\n", + " @collaborator\n", + " def train(self):\n", + " x_train, y_train = self.train_loader\n", + " print(f'x_train shape: {x_train.shape}, y_train shape: {y_train.shape}')\n", + " history = self.model.fit(\n", + " x_train, y_train,\n", + " batch_size=self.batch_size,\n", + " epochs=1,\n", + " verbose=1,\n", + " )\n", + " self.loss = history.history[\"loss\"][0]\n", + " self.next(self.local_model_validation)\n", + "\n", + " @collaborator\n", + " def local_model_validation(self):\n", + " self.local_validation_score = inference(self.model, self.test_loader, self.batch_size)\n", + " print(\n", + " f'Doing local model validation for collaborator {self.input}: {self.local_validation_score}')\n", + " self.next(self.join)\n", + "\n", + " @aggregator\n", + " def join(self, inputs):\n", + " self.average_loss = sum(input.loss for input in inputs) / len(inputs)\n", + " self.aggregated_model_accuracy = sum(\n", + " input.agg_validation_score for input in inputs) / len(inputs)\n", + " self.local_model_accuracy = sum(\n", + " input.local_validation_score for input in inputs) / len(inputs)\n", + " print(f'Average aggregated model validation values = {self.aggregated_model_accuracy}')\n", + " print(f'Average training loss = {self.average_loss}')\n", + " print(f'Average local model validation values = {self.local_model_accuracy}')\n", + " print(\"Taking FedAvg of models of all collaborators\")\n", + " self.model = FedAvg([input.model for input in inputs])\n", + "\n", + " self.next(self.internal_loop)\n", + "\n", + " @aggregator\n", + " def internal_loop(self):\n", + " if self.current_round == self.n_rounds:\n", + " self.next(self.end)\n", + " else:\n", + " self.current_round += 1\n", + " self.next(self.aggregated_model_validation, foreach='collaborators')\n", + "\n", + " @aggregator\n", + " def end(self):\n", + " print(f'This is the end of the flow')" + ] + }, + { + "cell_type": "markdown", + "id": "7777069b", + "metadata": {}, + "source": [ + "We simulate the data sharing in a lognormal distribution. We split the train data set to a train data set and local validation data set such that we can test the aggregated model on a new fresh data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "91ee01ea", + "metadata": {}, + "outputs": [], + "source": [ + "from tensorflow.keras.utils import to_categorical\n", + "from tensorflow.keras.datasets import mnist\n", + "import numpy as np\n", + "\n", + "VALID_PERCENT = 0.3\n", + "\n", + "nb_classes = 10\n", + "(X_train, y_train), (X_test, y_test) = mnist.load_data()\n", + "\n", + "X_train = X_train.astype(\"float32\")\n", + "X_test = X_test.astype(\"float32\")\n", + "X_train /= 255.0\n", + "X_test /= 255.0\n", + "\n", + "\n", + "split_on = int((1 - VALID_PERCENT) * len(X_train))\n", + "\n", + "train_images = X_train[0:split_on,:,:]\n", + "train_labels = to_categorical(y_train)[0:split_on,:]\n", + "\n", + "test_images = X_train[split_on:,:,:]\n", + "test_labels = to_categorical(y_train)[split_on:,:]\n", + "\n", + "valid_images = X_test\n", + "valid_labels = to_categorical(y_test)\n", + "\n", + "print(\"Training matrix shape\", train_images.shape)\n", + "print(\"Validation matrix shape\", valid_images.shape)\n", + "print(\"Test matrix shape\", test_images.shape)\n", + "\n", + "class LogNormalNumPyDataSplitter:\n", + " def __init__(self, num_clients, num_classes=10, mu=0, sigma=1):\n", + " self.num_clients = num_clients\n", + " self.num_classes = num_classes\n", + " self.mu = mu\n", + " self.sigma = sigma\n", + "\n", + " def split(self, images, labels):\n", + " # Convert labels to one-hot encoding if they are not already\n", + " if len(labels.shape) == 1 or labels.shape[1] == 1:\n", + " labels = tf.keras.utils.to_categorical(labels, num_classes=self.num_classes)\n", + " \n", + " # Get the class indices for each label\n", + " labels = np.argmax(labels, axis=1)\n", + " \n", + " # Initialize the indices for each client\n", + " client_indices = [[] for _ in range(self.num_clients)]\n", + " \n", + " # Calculate the lognormal proportions\n", + " props = np.random.lognormal(self.mu, self.sigma, (self.num_classes, self.num_clients))\n", + " props = props / np.sum(props, axis=1, keepdims=True)\n", + " \n", + " # Distribute the indices based on the lognormal proportions\n", + " for class_idx in range(self.num_classes):\n", + " class_indices = np.where(labels == class_idx)[0]\n", + " np.random.shuffle(class_indices)\n", + " \n", + " # Calculate the split points based on the proportions\n", + " split_points = (np.cumsum(props[class_idx])[:-1] * len(class_indices)).astype(int)\n", + " \n", + " split_indices = np.split(class_indices, split_points)\n", + " \n", + " for client_idx, indices in enumerate(split_indices):\n", + " client_indices[client_idx].extend(indices)\n", + " \n", + " return client_indices\n" + ] + }, + { + "cell_type": "markdown", + "id": "a093c92f", + "metadata": {}, + "source": [ + "Now we initiate federated components.\n", + "You'll notice in the `FederatedFlow` definition above that there were certain attributes that the flow was not initialized with, namely the `train_loader` and `test_loader` for each of the collaborators. These are **private_attributes** of the particular participant and (as the name suggests) are accessible ONLY to the particular participant's through its task. Additionally these private attributes are always filtered out of the current state when transferring from collaborator to aggregator, and vice versa.\n", + " \n", + "Users can directly specify a collaborator's private attributes via `collaborator.private_attributes` which is a dictionary where key is name of the attribute and value is the object that is made accessible to collaborator. In this example, we segment shards of the MNIST dataset for four collaborators: `Portland`, `Seattle`, `Chandler` and `Bangalore`. Each shard / slice of the dataset is assigned to collaborator's private_attribute.\n", + " \n", + "Note that the private attributes are flexible, and user can choose to pass in a completely different type of object to any of the collaborators or aggregator (with an arbitrary name)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "95469819", + "metadata": {}, + "outputs": [], + "source": [ + "from openfl.experimental.workflow.interface import Collaborator, Aggregator\n", + "from openfl.experimental.workflow.runtime import LocalRuntime\n", + "\n", + "def callable_to_initialize_collaborator_private_attributes(n_collaborators, index, train_dataset, test_dataset, batch_size):\n", + " train_splitter = LogNormalNumPyDataSplitter(num_clients=n_collaborators,num_classes=10, mu=0, sigma=1)\n", + " test_splitter = LogNormalNumPyDataSplitter(num_clients=n_collaborators,num_classes=10, mu=0, sigma=1)\n", + "\n", + "\n", + " X_train, y_train = train_dataset\n", + " X_test, y_test = test_dataset\n", + " train_idx = train_splitter.split(X_train, y_train)\n", + " valid_idx = test_splitter.split(X_test, y_test)\n", + "\n", + " train_dataset = X_train[train_idx[index]], y_train[train_idx[index]]\n", + " test_dataset = X_test[valid_idx[index]], y_test[valid_idx[index]]\n", + "\n", + " return {\n", + " \"train_loader\": train_dataset, \"test_loader\": test_dataset,\n", + " \"batch_size\": batch_size\n", + " }\n", + " \n", + "# # Setup participants\n", + "aggregator = Aggregator()\n", + "# aggregator.private_attributes = {}\n", + "collaborators = []\n", + "collaborator_names = ['Portland', 'Seattle', 'Chandler','Bangalore']\n", + "for idx, collaborator_name in enumerate(collaborator_names):\n", + " collaborators.append(\n", + " Collaborator(\n", + " name=collaborator_name, num_cpus=1, num_gpus=0,\n", + " private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n", + " n_collaborators=len(collaborator_names), index=idx, train_dataset=(train_images, train_labels),\n", + " test_dataset=(test_images, test_labels), batch_size=32\n", + " )\n", + " )\n", + "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='single_process')\n", + "print(f'Local runtime collaborators = {local_runtime.collaborators}')\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now that we have all the building blocks in place, we define the model and run a validation session on the non trained model." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ca9a1118", + "metadata": {}, + "outputs": [], + "source": [ + "from tensorflow.keras.layers import Flatten, Dense, Dropout, Conv2D, MaxPool2D\n", + "from tensorflow.keras.models import Sequential\n", + "from tensorflow.keras.datasets import mnist\n", + "from tensorflow.keras.utils import to_categorical\n", + "from openfl.utilities.optimizers.keras import FedProxOptimizer\n", + "model = Sequential([\n", + " Conv2D(filters=32, kernel_size=(3, 3), activation=\"relu\", input_shape=(28, 28, 1)),\n", + " MaxPool2D(),\n", + " Flatten(),\n", + " Dense(512, activation=\"relu\"),\n", + " Dropout(0.2),\n", + " Dense(512, activation=\"relu\"),\n", + " Dropout(0.2),\n", + " Dense(nb_classes, activation=\"softmax\"),\n", + "])\n", + "\n", + "model.compile(optimizer=FedProxOptimizer(mu=0.01, learning_rate=0.01), loss=keras.losses.categorical_crossentropy, metrics=[\"accuracy\"])\n", + "model.summary()\n", + "\n", + "flflow = KerasMNISTFlow(model, rounds=3, checkpoint=True)\n", + "flflow.runtime = local_runtime\n", + "inference(flflow.model, (test_images, test_labels), 32)" + ] + }, + { + "cell_type": "markdown", + "id": "2b8a8315", + "metadata": {}, + "source": [ + "Now, let's train the model using the federated flow as we defined above." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "569f9ae2", + "metadata": {}, + "outputs": [], + "source": [ + "flflow.run()" + ] + }, + { + "cell_type": "markdown", + "id": "dce61095", + "metadata": {}, + "source": [ + "Now, let's train the model using the federated setup" + ] + }, + { + "cell_type": "markdown", + "id": "f278871e", + "metadata": {}, + "source": [ + "Now, after the training, let's evaluate the aggregated model accuracy again" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "289215be", + "metadata": {}, + "outputs": [], + "source": [ + "inference(flflow.model, (test_images, test_labels), 32)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "426f2395", + "metadata": {}, + "source": [ + "# Congratulations!\n", + "Now that you've completed your first workflow interface quickstart notebook, see some of the more advanced things you can do in our other tutorials, including:\n", + "\n", + "- Using the LocalRuntime Ray Backend for dedicated GPU access\n", + "- Vertical Federated Learning\n", + "- Model Watermarking\n", + "- Differential Privacy\n", + "- And More!" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/openfl/utilities/optimizers/keras/fedprox.py b/openfl/utilities/optimizers/keras/fedprox.py index 2e87648c20..64d467cdc8 100644 --- a/openfl/utilities/optimizers/keras/fedprox.py +++ b/openfl/utilities/optimizers/keras/fedprox.py @@ -6,12 +6,11 @@ import tensorflow as tf import tensorflow.keras as keras -from tensorflow.python.ops import standard_ops @keras.utils.register_keras_serializable() class FedProxOptimizer(keras.optimizers.Optimizer): - """FedProx optimizer. + """FedProx optimizer (Keras3 based API). Implements the FedProx algorithm as a Keras optimizer. FedProx is a federated learning optimization algorithm designed to handle non-IID data. @@ -24,123 +23,51 @@ class FedProxOptimizer(keras.optimizers.Optimizer): learning_rate (float): The learning rate for the optimizer. mu (float): The proximal term coefficient. """ - - def __init__(self, learning_rate=0.01, mu=0.01, name="FedProxOptimizer", **kwargs): - """ - Initialize the FedProxOptimizer. - - Args: - learning_rate (float, optional): The learning rate for the - optimizer. Defaults to 0.01. - mu (float, optional): The proximal term coefficient. Defaults} - to 0.01. - name (str, optional): The name of the optimizer. Defaults to - 'FedProxOptimizer'. - **kwargs: Additional keyword arguments. - """ - super().__init__(name=name, **kwargs) - - self._set_hyper("learning_rate", learning_rate) - self._set_hyper("mu", mu) - - self._lr_t = None - self._mu_t = None - - def _prepare(self, var_list): - """ - Prepare the optimizer's state. - - Args: - var_list (list): List of variables to be optimized. - """ - self._lr_t = tf.convert_to_tensor(self._get_hyper("learning_rate"), name="lr") - self._mu_t = tf.convert_to_tensor(self._get_hyper("mu"), name="mu") - - def _create_slots(self, var_list): - """Create slots for the optimizer's state. - - Args: - var_list (list): List of variables to be optimized. - """ - for v in var_list: - self.add_slot(v, "vstar") - - def _resource_apply_dense(self, grad, var): - """Apply gradients to variables. - - Args: - grad: Gradients. - var: Variables. - - Returns: - A tf.Operation that applies the specified gradients. - """ - lr_t = tf.cast(self._lr_t, var.dtype.base_dtype) - mu_t = tf.cast(self._mu_t, var.dtype.base_dtype) - vstar = self.get_slot(var, "vstar") - - var_update = var.assign_sub(lr_t * (grad + mu_t * (var - vstar))) - - return tf.group( - *[ - var_update, - ] - ) - - def _apply_sparse_shared(self, grad, var, indices, scatter_add): - """Apply sparse gradients to variables. - - Args: - grad: Gradients. - var: Variables. - indices: A tensor of indices into the first dimension of `var`. - scatter_add: A scatter operation. - - Returns: - A tf.Operation that applies the specified gradients. - """ - lr_t = tf.cast(self._lr_t, var.dtype.base_dtype) - mu_t = tf.cast(self._mu_t, var.dtype.base_dtype) - vstar = self.get_slot(var, "vstar") - v_diff = vstar.assign(mu_t * (var - vstar), use_locking=self._use_locking) - - with tf.control_dependencies([v_diff]): - scaled_grad = scatter_add(vstar, indices, grad) - var_update = var.assign_sub(lr_t * scaled_grad) - - return tf.group( - *[ - var_update, - ] + def __init__( + self, + learning_rate=0.01, + mu=0.0, + name="FedProxOptimizer", + **kwargs, + ): + super().__init__( + learning_rate=learning_rate, + name=name, + **kwargs, ) + self.mu = mu - def _resource_apply_sparse(self, grad, var): - """ - Apply sparse gradients to variables. + def build(self, variables): + """Initialize optimizer variables. Args: - grad: Gradients. - var: Variables. - - Returns: - A tf.Operation that applies the specified gradients. + var_list: list of model variables to build FedProx variables on. """ - return self._apply_sparse_shared(grad.values, var, grad.indices, standard_ops.scatter_add) + if self.built: + return + super().build(variables) + self.vstars = [] + for variable in variables: + self.vstars.append( + self.add_variable_from_reference( + reference_variable=variable, name="vstar" + ) + ) + + def update_step(self, gradient, variable, learning_rate): + """Update step given gradient and the associated model variable.""" + lr_t = tf.cast(learning_rate, variable.dtype) + mu_t = tf.cast(self.mu, variable.dtype) + gradient_t = tf.cast(gradient, variable.dtype) + vstar = self.vstars[self._get_variable_index(variable)] + + self.assign_sub(variable, lr_t * (gradient_t + mu_t * (variable - vstar))) def get_config(self): - """Return the config of the optimizer. - - An optimizer config is a Python dictionary (serializable) - containing the configuration of an optimizer. - The same optimizer can be reinstantiated later - (without any saved state) from this configuration. - - Returns: - dict: The optimizer configuration. - """ - base_config = super().get_config() - return { - **base_config, - "lr": self._serialize_hyperparameter("learning_rate"), - "mu": self._serialize_hyperparameter("mu"), - } + config = super().get_config() + config.update( + { + "mu": self.mu, + } + ) + return config