From 1fd421575cecbb6680d265ba7f8b4bd1283933e8 Mon Sep 17 00:00:00 2001 From: rajith Date: Thu, 21 Nov 2024 16:16:50 +0530 Subject: [PATCH] adding scikit learn linear regression workflow api --- ...kit_Learn_Linear_Regression_Workflow.ipynb | 572 ++++++++++++++++++ .../workflow_interface_requirements.txt | 7 + 2 files changed, 579 insertions(+) create mode 100644 openfl-tutorials/experimental/106_Scikit_Learn_Linear_Regression_Workflow.ipynb diff --git a/openfl-tutorials/experimental/106_Scikit_Learn_Linear_Regression_Workflow.ipynb b/openfl-tutorials/experimental/106_Scikit_Learn_Linear_Regression_Workflow.ipynb new file mode 100644 index 0000000000..15b4ed0587 --- /dev/null +++ b/openfl-tutorials/experimental/106_Scikit_Learn_Linear_Regression_Workflow.ipynb @@ -0,0 +1,572 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "9a61e30e-83d1-422d-aa0a-ebd4376197ec", + "metadata": {}, + "source": [ + "# Scikit-learn Linear Regression Tutorial using Workflow Interface with Ridge Regularization\n", + "\n", + "\n", + "This tutorial demonstrates how to train a linear regression model using scikit-learn with Ridge regularization on a dataset, leveraging the new FedAI Workflow Interface. The Workflow Interface provides a novel way to compose federated learning experiments with OpenFL, enabling researchers to handle non-IID data and perform federated averaging with optimizations like FedProx. Through this tutorial, you will learn how to set up the federated learning environment, define the flow, and execute the training process across multiple collaborators\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "id": "c4b334ef-6a72-4b82-b978-1401973d0512", + "metadata": { + "tags": [] + }, + "source": [ + "## We will use MSE as loss function and Ridge weights regularization\n", + "![image.png](https://www.analyticsvidhya.com/wp-content/uploads/2016/01/eq5-1.png)" + ] + }, + { + "cell_type": "markdown", + "id": "b9e4fca5-559a-4fe7-9d3a-8f365637a0dd", + "metadata": {}, + "source": [ + "## What is it?\n", + "\n", + "The Workflow Interface is a new way of composing federated learning experiments with OpenFL. It was developed through conversations with researchers and existing users who had novel use cases that didn't quite fit the standard horizontal federated learning paradigm." + ] + }, + { + "cell_type": "markdown", + "id": "1cef8056-1d16-4157-8686-7e2165d570bb", + "metadata": {}, + "source": [ + "## Getting Started" + ] + }, + { + "cell_type": "markdown", + "id": "2843ae32-23b7-4323-a468-0426c9daabf0", + "metadata": {}, + "source": [ + "First we start by installing the necessary dependencies for the workflow interface" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "689ee822", + "metadata": {}, + "outputs": [], + "source": [ + "!pip install -r workflow_interface_requirements.txt" + ] + }, + { + "cell_type": "markdown", + "id": "3b436c8b", + "metadata": {}, + "source": [ + "Now, we import the relevant modules and do some basic initializations" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6c9eee14-22a1-4d48-a7da-e68d01037cd4", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from typing import List, Union\n", + "\n", + "from sklearn.linear_model import Lasso\n", + "from sklearn.preprocessing import StandardScaler\n", + "from sklearn.datasets import make_regression\n", + "from sklearn.metrics import mean_squared_error\n", + "import numpy as np\n", + "import random\n", + "import matplotlib.pyplot as plt\n", + "%matplotlib inline\n", + "from matplotlib.pylab import rcParams\n", + "rcParams['figure.figsize'] = 7, 5" + ] + }, + { + "cell_type": "markdown", + "id": "ea9cc215-b48e-4776-9c55-c04e7574e156", + "metadata": {}, + "source": [ + "## Implementing a Scikit Linear Regression Model with Lasso Regularization\n", + "\n", + "The following section provides an implementation of a linear regression model using scikit-learn's Lasso (L1 regularization). The SklearnLinearRegressionLasso class includes methods for fitting the model, making predictions, calculating mean squared error (MSE), and printing the model parameters." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f4cc8ec2-b818-4db8-8700-39c1a12917df", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "class SklearnLinearRegressionLasso:\n", + " def __init__(self, n_feat: int, alpha: float = 1.0) -> None:\n", + " self.model = Lasso(alpha=alpha)\n", + " self.scaler = StandardScaler()\n", + " self.weights = np.ones((n_feat + 1)) \n", + " \n", + " def predict(self, feature_vector: Union[np.ndarray, List[int]]) -> float:\n", + " '''\n", + " feature_vector may be a list or have shape (n_feat,)\n", + " or it may be a bunch of vectors (n_vec, nfeat)\n", + " '''\n", + " feature_vector = np.array(feature_vector)\n", + " if len(feature_vector.shape) == 1:\n", + " feature_vector = feature_vector[:,np.newaxis]\n", + " \n", + " feature_vector = self.scaler.transform(feature_vector)\n", + " return self.model.predict(feature_vector)\n", + " \n", + " def mse(self, X: np.ndarray, Y: np.ndarray) -> float:\n", + " Y_predict = self.predict(X)\n", + " return mean_squared_error(Y, Y_predict)\n", + " \n", + " def fit(self, X: np.ndarray, Y: np.ndarray, silent: bool=False) -> None:\n", + " \n", + " X = self.scaler.fit_transform(X)\n", + " self.model.fit(X, Y)\n", + " mse = self.mse(X, Y)\n", + " #self.weights[:-1] = self.model.coef_\n", + " #self.weights[-1] = self.model.intercept_\n", + " if not silent:\n", + " print(f'MSE: {mse}')\n", + " \n", + " def print_parameters(self) -> None:\n", + " print('Final parameters: ')\n", + " print(f'Weights: {self.model.coef_}')\n", + " print(f'Bias: {self.model.intercept_}')\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "af89e7e5-6cfc-46bc-acd2-7d5bfb373091", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Define input array with angles from 60deg to 400deg converted to radians\n", + "x = np.array([i*np.pi/180 for i in range(60,400,4)])\n", + "np.random.seed(10) # Setting seed for reproducibility\n", + "y = np.sin(x) + np.random.normal(0,0.15,len(x))\n", + "# plt.plot(x,y,'.')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ffefca2b-d7f6-4111-8872-c017c182a2de", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Initialize the model\n", + "lr_model = SklearnLinearRegressionLasso(n_feat=1, alpha=0.1)\n", + "\n", + "# Fit the model\n", + "lr_model.fit(x[:,np.newaxis], y)\n", + "\n", + "#print the final parameters\n", + "lr_model.print_parameters()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "410f2d80-989a-43ab-958f-7b68fd8f2e90", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# We can also solve this 1D problem using Numpy\n", + "numpy_solution = np.polyfit(x,y,1)\n", + "predictor_np = np.poly1d(numpy_solution)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6cb323db-9f3a-42af-94da-4b170adef867", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Predict using the model\n", + "y_hat = lr_model.predict(x)\n", + "# Plot the results\n", + "y_np = predictor_np(x)\n", + "plt.plot(x,y,'.')\n", + "plt.plot(x,y_hat,'.')\n", + "plt.plot(x,y_np,'--')" + ] + }, + { + "cell_type": "markdown", + "id": "ffd4d2d7-5537-496a-88c1-301da87d979c", + "metadata": {}, + "source": [ + "## Now we run the same training on federated learning workflow api" + ] + }, + { + "cell_type": "markdown", + "id": "66cb4ecf-cb27-4367-891a-6cea2f1d347f", + "metadata": {}, + "source": [ + "## Test on a Federation" + ] + }, + { + "cell_type": "markdown", + "id": "08527aab-4b0f-472b-af0c-27ed4ade85c1", + "metadata": {}, + "source": [ + "## Import required libraries for federated learning" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1b3c0039-e1f7-4047-b98b-a2d4bd42f015", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Import ncessary libraries\n", + "import numpy as np\n", + "from sklearn.linear_model import Lasso\n", + "from sklearn.metrics import mean_squared_error\n", + "from sklearn.datasets import make_regression\n", + "from openfl.experimental.interface import FLSpec\n", + "from openfl.experimental.placement import aggregator, collaborator\n", + "from openfl.experimental.runtime import FederatedRuntime\n", + "from sklearn.model_selection import train_test_split\n", + "\n", + "from tensorflow.keras.utils import to_categorical\n", + "from tensorflow.keras.datasets import mnist\n", + "import numpy as np\n", + "\n", + "VALID_PERCENT = 0.3\n", + "\n", + "# Splitting dataset into train and test set \n", + "X_train, X_test, Y_train, Y_test = train_test_split(x, y, test_size=1/3, random_state=0)\n", + "\n", + "\n", + "print(\"Training matrix shape\", X_train.shape)\n", + "print(\"Test matrix shape\", X_test.shape)" + ] + }, + { + "cell_type": "markdown", + "id": "052a4195-a410-4983-84b9-942159d9d345", + "metadata": {}, + "source": [ + "## Shard Splitter Class\n", + "Define a helper class to split the data into shards for federated learning." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b1985ac9-a2b1-4561-a962-6adfe35c3b97", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Import necessary libraries for federated learning\n", + "from openfl.experimental.interface import Collaborator, Aggregator\n", + "from openfl.experimental.runtime import LocalRuntime\n", + "\n", + "# Define a callable to initialize collaborator private attributes\n", + "def callable_to_initialize_collaborator_private_attributes(n_collaborators, index, train_dataset, test_dataset, batch_size):\n", + " \n", + " train_splitter = ShardSplitter(n_collaborators)\n", + " X_train, Y_train = train_dataset\n", + " X_test, Y_test = test_dataset\n", + "\n", + " train_idx = train_splitter.split(X_train, Y_train)\n", + " valid_idx = train_splitter.split(X_test, Y_test)\n", + "\n", + " train_dataset = X_train[train_idx[index]], Y_train[train_idx[index]]\n", + " test_dataset = X_test[valid_idx[index]], Y_test[valid_idx[index]]\n", + "\n", + " return {\n", + " \"train_loader\": train_dataset, \"test_loader\": test_dataset,\n", + " \"batch_size\": batch_size\n", + " }\n", + " \n", + "# # Setup participants\n", + "aggregator = Aggregator()\n", + "# aggregator.private_attributes = {}\n", + "collaborators = []\n", + "collaborator_names = ['Portland', 'Seattle', 'Chandler','Bangalore']\n", + "for idx, collaborator_name in enumerate(collaborator_names):\n", + " collaborators.append(\n", + " Collaborator(\n", + " name=collaborator_name, num_cpus=0, num_gpus=0.3,\n", + " private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n", + " n_collaborators=len(collaborator_names), index=idx, train_dataset=(X_train, Y_train),\n", + " test_dataset=(X_test, Y_test), batch_size=32\n", + " )\n", + " )\n", + "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='single_process')\n", + "print(f'Local runtime collaborators = {local_runtime.collaborators}')\n" + ] + }, + { + "cell_type": "markdown", + "id": "a5cf193b-62a2-403a-96b1-5fa716d9087f", + "metadata": {}, + "source": [ + "## Federated Learning Helper Functions\n", + "Define helper functions for training and validating the federated models." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "42187367-6a96-4d78-a576-f7154e8f987f", + "metadata": {}, + "outputs": [], + "source": [ + "# Define a class to split the dataset into shards\n", + "class ShardSplitter:\n", + " def __init__(self, num_shards):\n", + " self.num_shards = num_shards\n", + "\n", + " def split(self, X, y):\n", + " \"\"\"Split the given 2D numpy arrays X and y into equal shards and return list of indexes for each shard.\"\"\"\n", + " num_samples = X.shape[0]\n", + " shard_size = num_samples // self.num_shards\n", + " indexes = np.arange(num_samples)\n", + " #np.random.shuffle(indexes)\n", + " \n", + " shards = []\n", + " for i in range(self.num_shards):\n", + " start_idx = i * shard_size\n", + " if i == self.num_shards - 1:\n", + " # Include any remaining samples in the last shard\n", + " end_idx = num_samples\n", + " else:\n", + " end_idx = start_idx + shard_size\n", + " shards.append(indexes[start_idx:end_idx])\n", + " \n", + " return shards" + ] + }, + { + "cell_type": "markdown", + "id": "a915fa13-fc10-4b02-808c-ad2c05cee297", + "metadata": {}, + "source": [ + "## Define Federated Averaging Method\n", + "The FedAvg method is used to average the models from all the collaborators after training." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "65e2b0a2-5a79-4f92-b255-4c8d3e28b635", + "metadata": {}, + "outputs": [], + "source": [ + "def inference(model, test_loader, batch_size):\n", + " x_test, y_test = test_loader\n", + " loss, accuracy = model.evaluate(\n", + " x_test,\n", + " y_test,\n", + " batch_size=batch_size,\n", + " verbose=1\n", + " )\n", + " accuracy_percentage = accuracy * 100\n", + " print(f\"Test set: Avg. loss: {loss}, Accuracy: {accuracy_percentage:.2f}%\")\n", + " return accuracy\n", + " \n", + "# Federated Averaging for Lasso models\n", + "def FedAvg(models):\n", + " new_model = models[0]\n", + " coef_list = [model.model.coef_ for model in models]\n", + " intercept_list = [model.model.intercept_ for model in models]\n", + " new_model.coef_ = np.mean(coef_list, axis=0)\n", + " new_model.intercept_ = np.mean(intercept_list, axis=0)\n", + " return new_model" + ] + }, + { + "cell_type": "markdown", + "id": "8a3aefde-45a3-4bba-8b5b-6040808de966", + "metadata": {}, + "source": [ + "## Define Federated Learning Workflow\n", + "Define the workflow for federated learning using OpenFL's FLSpec." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c04b4ab2-1d40-44c7-907b-a6a7d176c959", + "metadata": {}, + "outputs": [], + "source": [ + "# Define the federated learning workflow\n", + "from openfl.experimental.placement import aggregator, collaborator\n", + "\n", + "# Federated Learning Workflow using OpenFL's Workflow API\n", + "class FederatedLassoFlow(FLSpec):\n", + " def __init__(self, model, num_rounds=3):\n", + " self.model = model\n", + " self.num_rounds = num_rounds\n", + " self.current_round = 1\n", + " self._checkpoint = False\n", + "\n", + " @aggregator\n", + " def start(self):\n", + " self.current_round = 0\n", + " self.collaborators = self.runtime.collaborators # Fetch the collaborators dynamically\n", + " self.next(self.aggregated_model_validation, foreach='collaborators')\n", + "\n", + "\n", + " @collaborator\n", + " def aggregated_model_validation(self):\n", + " x_test, y_test = self.test_loader\n", + " mse = self.model.mse(x_test, y_test)\n", + " print(f\"aggregation model validation MSE: {mse:.4f}\")\n", + " self.aggregated_mse = mse\n", + " self.next(self.train)\n", + "\n", + " @collaborator\n", + " def train(self):\n", + " x_train, y_train = self.train_loader\n", + " print(f'x_train shape: {x_train.shape}, y_train shape: {y_train.shape}')\n", + " history = self.model.fit(\n", + " x_train[:,np.newaxis], y_train\n", + " )\n", + " self.next(self.local_model_validation)\n", + "\n", + " @collaborator\n", + " def local_model_validation(self):\n", + " \"\"\"Validate the model on local test data.\"\"\"\n", + " \n", + " x_test, y_test = self.test_loader\n", + " mse = self.model.mse(x_test, y_test)\n", + " print(f\"Local model validation MSE: {mse:.4f}\")\n", + " self.local_mse = mse\n", + " self.next(self.join)\n", + "\n", + "\n", + " @aggregator\n", + " def join(self, inputs):\n", + "\n", + " self.aggregated_model_mse = sum(\n", + " input.aggregated_mse for input in inputs) / len(inputs)\n", + " self.local_model_mse = sum(\n", + " input.local_mse for input in inputs) / len(inputs)\n", + " print(f'Average aggregated model MSE = {self.aggregated_model_mse}')\n", + " print(f'Average local model MSE = {self.local_model_mse}')\n", + " \n", + " print(\"Taking FedAvg of models of all collaborators\")\n", + " self.model = FedAvg([input.model for input in inputs])\n", + "\n", + " self.next(self.internal_loop)\n", + "\n", + " @aggregator\n", + " def internal_loop(self):\n", + " if self.current_round == self.num_rounds:\n", + " self.next(self.end)\n", + " else:\n", + " self.current_round += 1\n", + " print(f\"current round : {self.current_round}\")\n", + " self.next(self.aggregated_model_validation, foreach='collaborators')\n", + "\n", + " @aggregator\n", + " def end(self):\n", + " print(f\"Federated learning complete after {self.num_rounds} rounds.\")\n", + " final_predictions = self.model.predict(X_test)\n", + " final_mse = self.model.mse(Y_test, final_predictions)\n", + " print(f\"Final aggregated model MSE on test data: {final_mse:.4f}\")" + ] + }, + { + "cell_type": "markdown", + "id": "3255cac0-caf3-4713-beef-9ff32fe73372", + "metadata": {}, + "source": [ + "## Start the Federated Learning Process\n", + "Create an instance of FederatedLassoFlow and run it with the new larger dataset." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8ff0ffff-6e84-4b3e-b537-d66cade93cac", + "metadata": {}, + "outputs": [], + "source": [ + "# Initialize and run the federated learning workflow\n", + "federated_flow = FederatedLassoFlow(model=lr_model, num_rounds=10)\n", + "\n", + "# Set the runtime for federated learning\n", + "federated_flow.runtime = local_runtime\n", + "\n", + "# Start the federated learning process\n", + "federated_flow.run()" + ] + }, + { + "cell_type": "markdown", + "id": "ddfb3ac6-d14a-4e00-83a9-04195b1efdf8", + "metadata": {}, + "source": [ + "## 🎉 Congratulations! 🎉" + ] + }, + { + "cell_type": "markdown", + "id": "cbb985a7-1a10-410e-99d3-cebdf2b809a2", + "metadata": {}, + "source": [ + "Now that you've completed workflow interface notebook for **scikit-learn Linear Regression** using federated learning.\n", + "\n", + "### Happy learning and happy coding with OpenFL! 🎉" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/openfl-tutorials/experimental/workflow_interface_requirements.txt b/openfl-tutorials/experimental/workflow_interface_requirements.txt index e589695b92..2012313c7e 100644 --- a/openfl-tutorials/experimental/workflow_interface_requirements.txt +++ b/openfl-tutorials/experimental/workflow_interface_requirements.txt @@ -2,8 +2,15 @@ astor==0.8.1 chardet charset-normalizer dill==0.3.6 +matplotlib>=2.0.0 metaflow==2.7.15 +mistune>=2.0.3 # not directly required, pinned by Snyk to avoid a vulnerability nbdev==2.3.12 +numpy>=1.13.3 +openfl>=1.2.1 ray==2.9.2 +scikit-learn>=0.24.1 +setuptools>=65.5.1 # not directly required, pinned by Snyk to avoid a vulnerability torch torchvision +wheel>=0.38.0 # not directly required, pinned by Snyk to avoid a vulnerability