From c2371bec46de355412e728b7f19a380e6e3df78d Mon Sep 17 00:00:00 2001 From: kagrawa2 Date: Tue, 19 Nov 2024 22:28:51 -0800 Subject: [PATCH] Add workflow based linear regression tutorial Signed-off-by: kagrawa2 --- .../Numpy_Linear_Regression_Workflow.ipynb | 465 ++++++++++++++++++ 1 file changed, 465 insertions(+) create mode 100644 openfl-tutorials/experimental/Numpy_Linear_Regression_Workflow.ipynb diff --git a/openfl-tutorials/experimental/Numpy_Linear_Regression_Workflow.ipynb b/openfl-tutorials/experimental/Numpy_Linear_Regression_Workflow.ipynb new file mode 100644 index 00000000000..28e75b28308 --- /dev/null +++ b/openfl-tutorials/experimental/Numpy_Linear_Regression_Workflow.ipynb @@ -0,0 +1,465 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Below code will display the print statement output on screen as well\n", + "import sys\n", + "import io\n", + "#sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')\n", + "\n", + "!pip install git+https://github.com/kagrawa2/openfl.git\n", + "!pip install -r workflow_interface_requirements.txt\n", + "!pip install matplotlib\n", + "\n", + "# Uncomment this if running in Google Colab and set USERNAME if running in docker container.\n", + "#!pip install -r https://raw.githubusercontent.com/intel/openfl/develop/openfl-tutorials/experimental/workflow_interface_requirements.txt\n", + "#import os\n", + "#os.environ[\"USERNAME\"] = \"colab\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import torch as pt\n", + "import torch.utils.data as data\n", + "import torch.nn as nn\n", + "import torch.nn.functional as F\n", + "import torch.optim as optim\n", + "import numpy as np\n", + "from typing import List, Union\n", + "import random\n", + "import matplotlib.pyplot as plt\n", + "%matplotlib inline\n", + "from matplotlib.pylab import rcParams\n", + "\n", + "import warnings\n", + "warnings.filterwarnings(\"ignore\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "rcParams['figure.figsize'] = 7, 5\n", + "\n", + "class LinRegLasso:\n", + " def __init__(self, n_feat: int) -> None:\n", + " self.weights = np.ones((n_feat + 1)) # (n_feat + 1,) weights + bias\n", + "\n", + " def predict(self, feature_vector: Union[np.ndarray, List[int]]) -> float:\n", + " feature_vector = np.array(feature_vector)\n", + " if len(feature_vector.shape) == 1:\n", + " feature_vector = feature_vector[:,np.newaxis]\n", + " assert feature_vector.shape[-1] == self.weights.shape[0] - 1, \\\n", + " f\"sample shape is {feature_vector.shape} and weights shape is f{self.weights}\"\n", + "\n", + " return self.weights @ np.concatenate((feature_vector.T, [[1]*feature_vector.shape[0]]))\n", + "\n", + " def mse(self, X: np.ndarray, Y: np.ndarray) -> float:\n", + " Y_hat = self.predict(X)\n", + " return np.sum((Y - Y_hat)**2) / Y.shape[0]\n", + "\n", + " def _update_weights(self, X: np.ndarray, Y: np.ndarray, lr: float, wd: float) -> None:\n", + " predictions = self.predict(X)\n", + " error = Y - predictions # (n_samples,)\n", + " X_with_bias = np.concatenate((X.T, [[1]*X.shape[0]])).T\n", + " updates = -2 * X_with_bias.T @ error / Y.shape[0]\n", + " regression_term = np.sign(self.weights)\n", + "\n", + " self.weights = self.weights - lr * updates + wd * regression_term\n", + "\n", + " def fit(self, X: np.ndarray, Y: np.ndarray,\n", + " n_epochs: int, lr: float, wd: float,\n", + " silent: bool=False) -> None:\n", + " for i in range(n_epochs):\n", + " self._update_weights(X, Y, lr, wd)\n", + " mse = self.mse(X, Y)\n", + " if not silent:\n", + " print(f'epoch: {i}, \\t MSE: {mse}')\n", + "\n", + " def train(self, train_data, lr, wd, epoches):\n", + " # Initialize lists to store all data\n", + " all_X = []\n", + " all_Y = []\n", + "\n", + " # Iterate through the DataLoader to get all data\n", + " for batch_idx, (X_batch, Y_batch) in enumerate(train_data):\n", + " all_X.append(X_batch.numpy()) # Convert to NumPy and append\n", + " all_Y.append(Y_batch.numpy()) # Convert to NumPy and append\n", + "\n", + " # Concatenate all batches into NumPy arrays\n", + " X = np.concatenate(all_X, axis=0)\n", + " Y = np.concatenate(all_Y, axis=0)\n", + " self.fit(X, Y, epoches, lr, wd, silent=True)\n", + " train_mse = self.mse(X, Y)\n", + " return train_mse\n", + "\n", + "def validate(my_model, val_loader):\n", + " total_mse = 0.0\n", + " num_samples = 0\n", + "\n", + " for batch_idx, (X, Y) in enumerate(val_loader):\n", + " X = X.numpy()\n", + " Y = Y.numpy()\n", + "\n", + " total_mse += my_model.mse(X, Y) * len(X) # Multiply MSE by batch size\n", + " num_samples += len(X) # Count total number of samples\n", + "\n", + " validation_mse = total_mse / num_samples # Average MSE across all samples\n", + " return validation_mse" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Define input array with angles from 60deg to 300deg converted to radians\n", + "x = np.array([i*np.pi/180 for i in range(60,300,4)])\n", + "np.random.seed(10) # Setting seed for reproducibility\n", + "y = np.sin(x) + np.random.normal(0,0.15,len(x))\n", + "#plt.plot(x,y,'.')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "lr_model = LinRegLasso(1)\n", + "wd = 0.0001\n", + "lr = 0.08\n", + "epochs = 100\n", + "\n", + "print(f\"Initial MSE: {lr_model.mse(x,y)}\")\n", + "lr_model.fit(x[:,np.newaxis],y, epochs, lr, wd, silent=True)\n", + "print(f\"Final MSE: {lr_model.mse(x,y)}\")\n", + "print(f\"Final parameters: {lr_model.weights}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def FedAvg(models, weights=None):\n", + " \"\"\"\n", + " Simulates federated averaging for LinRegLasso models.\n", + "\n", + " Args:\n", + " models (list): List of LinRegLasso model instances from collaborators.\n", + "\n", + " Returns:\n", + " LinRegLasso: Updated global model with averaged weights.\n", + " \"\"\"\n", + " new_model = models[0] # Use the first model as a base\n", + " all_weights = [model.weights for model in models] # Get weights from all models\n", + " new_model.weights = np.average(all_weights, axis=0, weights=weights) # Average the weights\n", + " return new_model" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from openfl.experimental.interface import FLSpec, Aggregator, Collaborator\n", + "from openfl.experimental.runtime import LocalRuntime\n", + "from openfl.experimental.placement import aggregator, collaborator\n", + "\n", + "class FederatedFlow(FLSpec):\n", + "\n", + " def __init__(self, model=None, optimizer=None, rounds=3, **kwargs):\n", + " super().__init__(**kwargs)\n", + " self.model = model\n", + " self.optimizer = optimizer\n", + " self.rounds = rounds\n", + " self.aggregated_mse_history = []\n", + " self.train_loss_history = [] # Initialize loss history\n", + "\n", + " @aggregator\n", + " def start(self):\n", + " print(f'Performing initialization for model')\n", + " self.collaborators = self.runtime.collaborators\n", + " self.private = 10\n", + " self.current_round = 0\n", + " self.next(self.aggregated_model_validation, foreach='collaborators')\n", + "\n", + " @collaborator\n", + " def aggregated_model_validation(self):\n", + " print(f'Performing aggregated model validation for collaborator {self.input}')\n", + " #print(f'Private {self.private}')\n", + " self.agg_validation_score = validate(self.model, self.test_loader)\n", + " print(f'{self.input} value of {self.agg_validation_score}')\n", + " self.next(self.train)\n", + "\n", + " @collaborator\n", + " def train(self):\n", + " self.wd = 0.0001\n", + " self.lr = 0.08\n", + " self.epochs = 100\n", + " self.loss = self.model.train(self.train_loader, self.lr, self.wd, self.epochs)\n", + " self.training_completed = True\n", + " self.next(self.local_model_validation)\n", + "\n", + " @collaborator\n", + " def local_model_validation(self):\n", + " self.local_validation_score = validate(self.model, self.test_loader)\n", + " print(\n", + " f'Doing local model validation for collaborator {self.input}: {self.local_validation_score}')\n", + " self.next(self.join, exclude=['training_completed'])\n", + "\n", + " @aggregator\n", + " def join(self, inputs):\n", + " self.average_train_loss = sum(input.loss for input in inputs) / len(inputs)\n", + " self.aggregated_model_mse = sum(\n", + " input.agg_validation_score for input in inputs) / len(inputs)\n", + " self.local_model_mse = sum(\n", + " input.local_validation_score for input in inputs) / len(inputs)\n", + " self.train_loss_history.append(self.average_train_loss) # Append loss to history\n", + " self.aggregated_mse_history.append(self.aggregated_model_mse)\n", + " print(f'Average aggregated model validation values = {self.aggregated_model_mse}')\n", + " print(f'Average training loss = {self.average_train_loss}')\n", + " print(f'Average local model validation values = {self.local_model_mse}')\n", + " self.model = FedAvg([input.model for input in inputs])\n", + " self.optimizer = [input.optimizer for input in inputs][0]\n", + " \n", + " self.current_round += 1\n", + " if self.current_round < self.rounds:\n", + " self.next(self.aggregated_model_validation,\n", + " foreach='collaborators', exclude=['private'])\n", + " else:\n", + " self.next(self.end)\n", + "\n", + " @aggregator\n", + " def end(self):\n", + " print(f'This is the end of the flow')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def generate_synthetic(rank: int, n_samples: int, noise: float):\n", + " \"\"\"\n", + " Generate synthetic data for linear regression.\n", + "\n", + " Args:\n", + " rank (int): Seed for random number generation.\n", + " n_samples (int): Number of samples to generate.\n", + " noise (float): Standard deviation of the Gaussian noise added to the y values.\n", + "\n", + " Returns:\n", + " np.ndarray: Generated x values.\n", + " np.ndarray: Generated y values.\n", + " \"\"\"\n", + " np.random.seed(rank)\n", + " n_samples = max(n_samples, 10)\n", + " interval = 240\n", + " x_start = 60\n", + "\n", + " x = np.random.rand(n_samples, 1) * interval + x_start\n", + " x *= np.pi / 180\n", + "\n", + " y = np.sin(x) + np.random.normal(0, noise, size=(n_samples,1))\n", + " y = y.reshape(-1)\n", + "\n", + " return x, y\n", + "\n", + "class SyntheticFederatedDataset:\n", + " def __init__(self, num_collaborators=2, batch_size=1, num_samples=10, **kwargs):\n", + " self.batch_size = batch_size\n", + " X, y = generate_synthetic(rank=42, n_samples=num_samples, noise=0.1)\n", + " X = np.array(X, dtype=np.float32)\n", + " y = np.array(y, dtype=np.float32)\n", + "\n", + " self.X_train_all = X[:int(0.8 * len(X))]\n", + " self.y_train_all = y[:int(0.8 * len(y))]\n", + " \n", + " self.X_test_all = X[int(0.8 * len(X)):]\n", + " self.y_test_all = y[int(0.8 * len(y)):]\n", + "\n", + " min_samples = max(len(self.X_train_all) // num_collaborators, 1)\n", + "\n", + " while len(self.X_test_all) < num_collaborators or any(len(chunk) < 1 for chunk in self.X_train_all):\n", + " X, y = generate_synthetic(rank=42, n_samples=len(self.X_train_all) + len(self.X_valid_all) + 1, noise=0.1)\n", + " X = np.array(X, dtype=np.float32)\n", + " y = np.array(y, dtype=np.float32)\n", + " self.X_train_all = X[:int(0.9 * len(X))]\n", + " self.X_test_all = X[int(0.9 * len(X)):]\n", + " self.y_train_all = y[:int(0.9 * len(y))]\n", + " self.y_test_all = y[int(0.9 * len(y)):]\n", + "\n", + " self.X_train_all = np.array_split(self.X_train_all, num_collaborators)\n", + " self.X_test_all = np.array_split(self.X_test_all, num_collaborators)\n", + " self.y_train_all = np.array_split(self.y_train_all, num_collaborators)\n", + " self.y_test_all = np.array_split(self.y_test_all, num_collaborators)\n", + "\n", + " def split(self, index):\n", + " return {\n", + " \"train_loader\":\n", + " data.DataLoader(\n", + " data.TensorDataset(\n", + " pt.from_numpy(self.X_train_all[index]),\n", + " pt.from_numpy(self.y_train_all[index])\n", + " ),\n", + " batch_size=self.batch_size, shuffle=True\n", + " ),\n", + " \"test_loader\":\n", + " data.DataLoader(\n", + " data.TensorDataset(\n", + " pt.from_numpy(self.X_test_all[index]),\n", + " pt.from_numpy(self.y_test_all[index])\n", + " ),\n", + " batch_size=self.batch_size, shuffle=True\n", + " )\n", + " }" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "aggregator = Aggregator()\n", + "aggregator.private_attributes = {}\n", + "\n", + "collaborator_names = ['Coll-1', 'Coll-2', 'Coll-3', 'Coll-4']\n", + "collaborators = [Collaborator(name=name) for name in collaborator_names]\n", + "synthetic_federated_dataset = SyntheticFederatedDataset(num_collaborators=len(collaborator_names), num_samples=2000, batch_size=20)\n", + "\n", + "def callable_to_initialize_collaborator_private_attributes(index):\n", + " private_attributes = synthetic_federated_dataset.split(index)\n", + " return private_attributes\n", + "\n", + "collaborators = []\n", + "for idx, collaborator_name in enumerate(collaborator_names):\n", + " collaborators.append(\n", + " Collaborator(\n", + " name=collaborator_name, num_cpus=0.0, num_gpus=0.0,\n", + " private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n", + " index=idx\n", + " )\n", + " )\n", + "\n", + "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='single_process')\n", + "print(f'Local runtime collaborators = {local_runtime.collaborators}')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "model = lr_model\n", + "optimizer = None\n", + "flflow = FederatedFlow(model, optimizer, rounds=10, checkpoint=True)\n", + "flflow.runtime = local_runtime\n", + "flflow.run()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(f'\\nFinal aggregated MSE for {flflow.rounds} rounds of training: {flflow.aggregated_model_mse}')\n", + "print(f'\\nFinal loss model for {flflow.rounds} rounds of training: {flflow.train_loss_history}')\n", + "print(f'\\nFinal parameters: {flflow.model.weights}')\n", + "print(f'\\n Aggregated model MSE History : {flflow.aggregated_mse_history}')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "plt.plot(range(1, flflow.rounds + 1), flflow.train_loss_history)\n", + "plt.xlabel('Epoch')\n", + "plt.ylabel('Loss (MSE)')\n", + "plt.title('Loss Function during Training')\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "plt.plot(range(1, flflow.rounds + 1), flflow.aggregated_mse_history)\n", + "plt.xlabel('Round')\n", + "plt.ylabel('Aggregated Model MSE')\n", + "plt.title('Aggregated Model MSE over Rounds')\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "n_cols = 20\n", + "n_samples = 4\n", + "interval = 240\n", + "x_start = 60\n", + "noise = 0.3\n", + "\n", + "X = None\n", + "final_model = flflow.model\n", + "for rank in range(n_cols):\n", + " np.random.seed(rank) # Setting seed for reproducibility\n", + " x = np.random.rand(n_samples, 1) * interval + x_start\n", + " x *= np.pi / 180\n", + " X = x if X is None else np.vstack((X,x))\n", + " y = np.sin(x) + np.random.normal(0, noise, size=(n_samples, 1))\n", + " plt.plot(x,y,'+')\n", + " \n", + "X.sort() \n", + "Y_hat = final_model.predict(X)\n", + "plt.plot(X,Y_hat,'--')" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.10" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}