From 4ea9eba0b15db668301bed92d928f852d3f0cd54 Mon Sep 17 00:00:00 2001 From: Noopur Date: Wed, 20 Nov 2024 14:23:30 +0530 Subject: [PATCH 1/3] TaskRunner E2E Workflow: corrections and enhancements (#1161) * TaskRunner E2E Workflow: corrections and enhancements Signed-off-by: noopur * Comment the fork skip part for testing Signed-off-by: noopur * Let wf run for all repos Signed-off-by: noopur * Remove the redundant comments Signed-off-by: noopur --------- Signed-off-by: noopur --- .github/workflows/task_runner_e2e.yml | 38 ++++++++++++--------- tests/end_to_end/pytest.ini | 2 +- tests/end_to_end/utils/federation_helper.py | 8 ++--- tests/end_to_end/utils/summary_helper.py | 27 +++++++++------ 4 files changed, 42 insertions(+), 33 deletions(-) diff --git a/.github/workflows/task_runner_e2e.yml b/.github/workflows/task_runner_e2e.yml index 9603db81cf..98ffde8433 100644 --- a/.github/workflows/task_runner_e2e.yml +++ b/.github/workflows/task_runner_e2e.yml @@ -30,7 +30,7 @@ env: NUM_COLLABORATORS: ${{ inputs.num_collaborators || '2' }} jobs: - test: + test_with_tls: name: tr_tls runs-on: ubuntu-22.04 timeout-minutes: 120 # 2 hours @@ -71,28 +71,30 @@ jobs: - name: Run Task Runner E2E tests with TLS id: run_tests run: | - python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py -m ${{ env.MODEL_NAME }} --num_rounds $NUM_ROUNDS --num_collaborators $NUM_COLLABORATORS --model_name ${{ env.MODEL_NAME }} + python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \ + -m ${{ env.MODEL_NAME }} --model_name ${{ env.MODEL_NAME }} \ + --num_rounds $NUM_ROUNDS --num_collaborators $NUM_COLLABORATORS echo "Task runner end to end test run completed" - - name: Print test summary # Print the test summary only if the tests were run + - name: Print test summary id: print_test_summary - if: steps.run_tests.outcome == 'success' || steps.run_tests.outcome == 'failure' + if: ${{ always() }} run: | export PYTHONPATH="$PYTHONPATH:." 
python tests/end_to_end/utils/summary_helper.py echo "Test summary printed" - - name: Tar files # Tar the test results only if the tests were run + - name: Tar files id: tar_files - if: steps.run_tests.outcome == 'success' || steps.run_tests.outcome == 'failure' + if: ${{ always() }} run: tar -cvf result.tar results - - name: Upload Artifacts # Upload the test results only if the tar was created + - name: Upload Artifacts id: upload_artifacts uses: actions/upload-artifact@v4 - if: steps.tar_files.outcome == 'success' + if: ${{ always() }} with: - name: task_runner_tls_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }} + name: tr_tls_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }} path: result.tar test_with_non_tls: @@ -136,26 +138,28 @@ jobs: - name: Run Task Runner E2E tests without TLS id: run_tests run: | - python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py -m ${{ env.MODEL_NAME }} --num_rounds $NUM_ROUNDS --num_collaborators $NUM_COLLABORATORS --disable_tls + python -m pytest -s tests/end_to_end/test_suites/task_runner_tests.py \ + -m ${{ env.MODEL_NAME }} --model_name ${{ env.MODEL_NAME }} \ + --num_rounds $NUM_ROUNDS --num_collaborators $NUM_COLLABORATORS --disable_tls echo "Task runner end to end test run completed" - - name: Print test summary # Print the test summary only if the tests were run + - name: Print test summary id: print_test_summary - if: steps.run_tests.outcome == 'success' || steps.run_tests.outcome == 'failure' + if: ${{ always() }} run: | export PYTHONPATH="$PYTHONPATH:." 
python tests/end_to_end/utils/summary_helper.py echo "Test summary printed" - - name: Tar files # Tar the test results only if the tests were run + - name: Tar files id: tar_files - if: steps.run_tests.outcome == 'success' || steps.run_tests.outcome == 'failure' + if: ${{ always() }} run: tar -cvf result.tar results - - name: Upload Artifacts # Upload the test results only if the tar was created + - name: Upload Artifacts id: upload_artifacts uses: actions/upload-artifact@v4 - if: steps.tar_files.outcome == 'success' + if: ${{ always() }} with: - name: task_runner_non_tls_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }} + name: tr_non_tls_${{ env.MODEL_NAME }}_python${{ env.PYTHON_VERSION }}_${{ github.run_id }} path: result.tar diff --git a/tests/end_to_end/pytest.ini b/tests/end_to_end/pytest.ini index 8d18441dd6..9896220535 100644 --- a/tests/end_to_end/pytest.ini +++ b/tests/end_to_end/pytest.ini @@ -1,5 +1,5 @@ [pytest] -addopts = -ra -q -s --junitxml=results/results.xml +addopts = -ra -q -s --junitxml=results/results.xml -x testpaths = test_suites junit_family = xunit2 results_dir = results diff --git a/tests/end_to_end/utils/federation_helper.py b/tests/end_to_end/utils/federation_helper.py index 1da1c68012..361e220131 100644 --- a/tests/end_to_end/utils/federation_helper.py +++ b/tests/end_to_end/utils/federation_helper.py @@ -26,7 +26,7 @@ def setup_pki(fed_obj): fed_obj.aggregator.generate_sign_request() fed_obj.model_owner.certify_aggregator(fed_obj.aggregator.agg_domain_name) except Exception as e: - log.error(f"Failed to perform aggregator operations: {e}") + log.error(f"Failed to perform PKI setup for aggregator: {e}") raise e # Collaborator and model owner operations @@ -38,11 +38,11 @@ def setup_pki(fed_obj): fed_obj.model_owner.certify_collaborator(collaborator.collaborator_name) collaborator.import_pki() except Exception as e: - log.error(f"Failed to perform collaborator operations: {e}") + log.error(f"Failed to perform 
PKI setup for {collaborator.collaborator_name}: {e}") raise e success = True - log.info("CSR operations completed successfully for all participants") + log.info("PKI setup successfully for all participants") return success @@ -93,7 +93,7 @@ def verify_federation_run_completion(fed_obj, results): # Result will contain a list of boolean values for all the participants. # True - successful completion, False - failed/incomplete results = [f.result() for f in futures] - log.info(f"Results: {results}") + log.info(f"Results from all the participants: {results}") # If any of the participant failed, return False, else return True return all(results) diff --git a/tests/end_to_end/utils/summary_helper.py b/tests/end_to_end/utils/summary_helper.py index e82ecfe2c2..685b94bb6a 100644 --- a/tests/end_to_end/utils/summary_helper.py +++ b/tests/end_to_end/utils/summary_helper.py @@ -21,9 +21,10 @@ def get_aggregated_accuracy(agg_log_file): Returns: agg_accuracy: the aggregated accuracy """ + agg_accuracy = "Not Found" if not os.path.exists(agg_log_file): print(f"Aggregator log file {agg_log_file} not found. 
Cannot get aggregated accuracy") - return "Not Found" + return agg_accuracy # Example line(s) containing spaces and special characters: """ @@ -33,17 +34,17 @@ def get_aggregated_accuracy(agg_log_file): try: with open(agg_log_file, 'r') as f: for line in f: - if "metric_origin" in line and "aggregator" in line and "aggregated_model_validation" in line: + if "'metric_origin': 'aggregator'" in line and "aggregated_model_validation" in line: line = line.split("aggregator.py:")[0].strip() # If the line does not contain closing bracket "}", then concatenate the next line reqd_line = line if "}" in line else line + next(f).strip() agg_accuracy = eval(reqd_line.split("METRIC")[1].strip('"'))["metric_value"] - return agg_accuracy - + break except Exception as e: # Do not fail the test if the accuracy cannot be fetched print(f"Error while reading aggregator log file: {e}") - return "Not Found" + + return agg_accuracy def get_test_status(result): @@ -54,16 +55,17 @@ def get_test_status(result): Returns status of the test status """ - status = "FAILED" + status, err_msg = "FAILED", "NA" if "failure" in result.tag or "error" in result.tag: # If the result has a tag "failure", set status as "FAIL" status = "FAILED" + err_msg = result.get("message").split("\n")[0] elif "skipped" in result.tag: # If the result has a tag "skipped", set status as "SKIPPED" status = "SKIPPED" else: status = "PASSED" - return status + return status, err_msg def get_testcase_result(): @@ -84,11 +86,13 @@ def get_testcase_result(): # Successful test won't have any result/subtag if len(testcase) == 0: database_dict["result"] = "PASSED" + database_dict["err_msg"] = "NA" # Iterate over each result in testsuite for result in testcase: - status = get_test_status(result) + status, err_msg = get_test_status(result) database_dict["result"] = status + database_dict["err_msg"] = err_msg # Append the dictionary to database_list database_list.append(database_dict) @@ -110,6 +114,7 @@ def get_testcase_result(): if 
not model_name: print("MODEL_NAME is not set, cannot find out aggregator logs") + agg_accuracy = "Not Found" else: workspace_name = "workspace_" + model_name agg_log_file = os.path.join("results", workspace_name, "aggregator.log") @@ -118,7 +123,7 @@ def get_testcase_result(): # Write the results to GitHub step summary with open(os.getenv('GITHUB_STEP_SUMMARY'), 'a') as fh: # DO NOT change the print statements - print("| Name | Time (in seconds) | Result | Collaborators | Rounds to train | Score (if applicable) |", file=fh) - print("| ------------- | ------------- | ------------- | ------------- | ------------- | ------------- |", file=fh) + print("| Name | Time (in seconds) | Result | Error (if any) | Collaborators | Rounds to train | Score (if applicable) |", file=fh) + print("| ------------- | ------------- | ------------- | ------------- | ------------- | ------------- | ------------- |", file=fh) for item in result: - print(f"| {item['name']} | {item['time']} | {item['result']} | {num_cols} | {num_rounds} | {agg_accuracy} |", file=fh) + print(f"| {item['name']} | {item['time']} | {item['result']} | {item['err_msg']} | {num_cols} | {num_rounds} | {agg_accuracy} |", file=fh) From 918e1253aaeb681b221c728a055192a0d3440dc4 Mon Sep 17 00:00:00 2001 From: Ynon Flum Date: Wed, 20 Nov 2024 16:27:21 +0200 Subject: [PATCH 2/3] Add pytorch MNIST Workflow tutorial (#1158) Signed-off-by: Ynon Flum --- ...Prox_PyTorch_MNIST_Workflow_Tutorial.ipynb | 625 ++++++++++++++++++ ...> 402_FedProx_with_Synthetic_nonIID.ipynb} | 0 ...regator_Validation_Ray_Watermarking.ipynb} | 0 3 files changed, 625 insertions(+) create mode 100644 openfl-tutorials/experimental/401_Federated_FedProx_PyTorch_MNIST_Workflow_Tutorial.ipynb rename openfl-tutorials/experimental/{401_FedProx_with_Synthetic_nonIID.ipynb => 402_FedProx_with_Synthetic_nonIID.ipynb} (100%) rename openfl-tutorials/experimental/{401_MNIST_Aggregator_Validation_Ray_Watermarking.ipynb => 
402_MNIST_Aggregator_Validation_Ray_Watermarking.ipynb} (100%) diff --git a/openfl-tutorials/experimental/401_Federated_FedProx_PyTorch_MNIST_Workflow_Tutorial.ipynb b/openfl-tutorials/experimental/401_Federated_FedProx_PyTorch_MNIST_Workflow_Tutorial.ipynb new file mode 100644 index 0000000000..6b6d8225a0 --- /dev/null +++ b/openfl-tutorials/experimental/401_Federated_FedProx_PyTorch_MNIST_Workflow_Tutorial.ipynb @@ -0,0 +1,625 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# FedProx PyTorch MNIST Tutorial using Workflow API\n", + "This notebook sets up a distributed training federation which runs the [`FedProx`](https://arxiv.org/abs/1812.06127) algorithm using OpenFL's [`Workflow API`](https://openfl.readthedocs.io/en/latest/about/features_index/workflowinterface.html) locally using a [`LocalRuntime`](https://openfl.readthedocs.io/en/latest/about/features_index/workflowinterface.html#runtimes) - scalable to a federated setting in the future.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Import the relevant libraries" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import torch\n", + "import torch.nn as nn\n", + "import torch.nn.functional as F\n", + "\n", + "import torch.utils\n", + "import torch.utils.data\n", + "import torchvision\n", + "import torchvision.transforms as transforms\n", + "\n", + "from openfl.utilities.optimizers.torch.fedprox import FedProxAdam\n", + "\n", + "from openfl.experimental.interface import FLSpec, Aggregator, Collaborator\n", + "from openfl.experimental.runtime import LocalRuntime\n", + "from openfl.experimental.placement import aggregator, collaborator" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Define the model:" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [], + "source": [ + "class 
Net(nn.Module):\n", + " def __init__(self):\n", + " super(Net, self).__init__()\n", + " self.conv1 = nn.Conv2d(1, 16, 3)\n", + " self.pool = nn.MaxPool2d(2, 2)\n", + " self.conv2 = nn.Conv2d(16, 32, 3)\n", + " self.fc1 = nn.Linear(32 * 5 * 5, 32)\n", + " self.fc2 = nn.Linear(32, 84)\n", + " self.fc3 = nn.Linear(84, 10)\n", + "\n", + " def forward(self, x):\n", + " x = self.pool(F.relu(self.conv1(x)))\n", + " x = self.pool(F.relu(self.conv2(x)))\n", + " x = x.view(x.size(0),-1)\n", + " x = F.relu(self.fc1(x))\n", + " x = F.relu(self.fc2(x))\n", + " x = self.fc3(x)\n", + " return F.log_softmax(x, dim=1)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Set up the dataset:" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": {}, + "outputs": [], + "source": [ + "transform = transforms.Compose(\n", + " [transforms.ToTensor(),\n", + " transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])\n", + "\n", + "mnist_train = torchvision.datasets.MNIST(\n", + " \"./files/\",\n", + " train=True,\n", + " download=True,\n", + " transform=transform,\n", + ")\n", + "\n", + "mnist_test = torchvision.datasets.MNIST(\n", + " \"./files/\",\n", + " train=False,\n", + " download=True,\n", + " transform=transform,\n", + ")\n", + "\n", + "class CustomDataset(torch.utils.data.Dataset):\n", + " \"\"\"Dataset enumeration as tensors\"\"\"\n", + " def __init__(self, images, labels):\n", + " self.images = images\n", + " self.labels = labels\n", + "\n", + " def __len__(self):\n", + " return len(self.images)\n", + "\n", + " def __getitem__(self, idx):\n", + " image = self.images[idx]\n", + " label = self.labels[idx]\n", + " return image, label" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The next step is setting up the participants, an `Aggregator` and a few `Collaborator`s which will train the model, partition the dataset between the collaborators, and pass them to the appropriate runtime environment (in our case, a 
`LocalRuntime`).\n" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [], + "source": [ + "def one_hot(labels, classes):\n", + " return np.eye(classes)[labels]\n", + "\n", + "# Setup participants\n", + "aggregator_ = Aggregator()\n", + "aggregator_.private_attributes = {}\n", + "\n", + "# Setup collaborators with private attributes\n", + "collaborator_names = [f'collaborator{i}' for i in range(4)]\n", + "collaborators = [Collaborator(name=name) for name in collaborator_names]\n", + "batch_size_train = 1024\n", + "batch_size_test = 1024\n", + "log_interval = 10\n", + "\n", + "for idx, collaborator_ in enumerate(collaborators):\n", + " train_images, train_labels = mnist_train.train_data, np.array(mnist_train.train_labels)\n", + " train_images = torch.from_numpy(np.expand_dims(train_images, axis=1)).float()\n", + " train_labels = one_hot(train_labels, 10)\n", + "\n", + " valid_images, valid_labels = mnist_test.test_data, np.array(mnist_test.test_labels)\n", + " valid_images = torch.from_numpy(np.expand_dims(valid_images, axis=1)).float()\n", + "\n", + " collaborator_.private_attributes = {\n", + " 'train_loader': torch.utils.data.DataLoader(\n", + " CustomDataset(train_images[idx::len(collaborators)], \n", + " train_labels[idx::len(collaborators)]), \n", + " batch_size=batch_size_train, \n", + " shuffle=True),\n", + " 'test_loader': torch.utils.data.DataLoader(\n", + " CustomDataset(valid_images[idx::len(collaborators)], \n", + " valid_labels[idx::len(collaborators)]), \n", + " batch_size=batch_size_test, \n", + " shuffle=True)\n", + " }\n", + "\n", + "local_runtime = LocalRuntime(aggregator=aggregator_, collaborators=collaborators, backend='single_process')\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Define an aggregation algorithm, optimizer and a loss function:" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [], + "source": [ + "# Aggregation 
algorithm\n", + "def FedAvg(models, weights=None):\n", + " new_model = models[0]\n", + " new_state_dict = dict()\n", + " for key in new_model.state_dict().keys():\n", + " new_state_dict[key] = torch.from_numpy(np.average([model.state_dict()[key].numpy() for model in models],\n", + " axis=0, \n", + " weights=weights))\n", + "\n", + " new_model.load_state_dict(new_state_dict)\n", + " return new_model\n", + "\n", + "def get_optimizer(model):\n", + " return FedProxAdam(model.parameters(), lr=1e-3, mu=0.01)\n", + "\n", + "def cross_entropy(output, target):\n", + " \"\"\"Binary cross-entropy loss function\"\"\"\n", + " return F.binary_cross_entropy_with_logits(input=output,target=target.float())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Set up work to be executed by the aggregator and the collaborators by extending `FLSpec`:" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Aggregator step \"start\" registered\n", + "Collaborator step \"aggregated_model_validation\" registered\n", + "Collaborator step \"train\" registered\n", + "Collaborator step \"local_model_validation\" registered\n", + "Aggregator step \"join\" registered\n", + "Aggregator step \"end\" registered\n" + ] + } + ], + "source": [ + "class FederatedFlow(FLSpec):\n", + " def __init__(self, model=None, optimizer=None, rounds=10, **kwargs):\n", + " super().__init__(**kwargs)\n", + " self.model = model\n", + " self.optimizer = optimizer\n", + " self.rounds = rounds\n", + " self.loss = 0.\n", + "\n", + " @aggregator\n", + " def start(self):\n", + " print(f'Performing initialization for model')\n", + " self.collaborators = self.runtime.collaborators\n", + " self.current_round = 0\n", + " self.next(self.aggregated_model_validation, foreach='collaborators')\n", + "\n", + " def compute_accuracy(self, data_loader):\n", + " self.model.eval()\n", + " test_loss = 0\n", + " 
correct = 0\n", + " with torch.no_grad():\n", + " for data, target in data_loader:\n", + " output = self.model(data)\n", + " test_loss += F.cross_entropy(output, target, size_average=False).item()\n", + " pred = output.data.max(1, keepdim=True)[1]\n", + " correct += pred.eq(target.data.view_as(pred)).sum()\n", + "\n", + " test_loss /= len(data_loader.dataset)\n", + " print('\\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\\n'.format(\n", + " test_loss, correct, len(data_loader.dataset),\n", + " 100. * correct / len(data_loader.dataset)))\n", + " accuracy = float(correct / len(data_loader.dataset))\n", + " return accuracy\n", + "\n", + " @collaborator\n", + " def aggregated_model_validation(self):\n", + " print(f'Performing aggregated model validation for collaborator {self.input}, model: {id(self.model)}')\n", + " self.agg_validation_score = self.compute_accuracy(self.test_loader)\n", + " self.next(self.train)\n", + "\n", + " @collaborator\n", + " def train(self):\n", + " # Log after processing a quarter of the samples\n", + " log_threshold = .25\n", + "\n", + " self.model.train()\n", + " self.optimizer = get_optimizer(self.model)\n", + " for batch_idx, (data, target) in enumerate(self.train_loader):\n", + " self.optimizer.zero_grad()\n", + " output = self.model(data)\n", + " loss = F.cross_entropy(output, target)\n", + " loss.backward()\n", + " self.optimizer.step()\n", + "\n", + " if (len(data) * batch_idx) / len(self.train_loader.dataset) >= log_threshold:\n", + " print('Train Epoch: [{}/{} ({:.0f}%)]\\tLoss: {:.6f}'.format(\n", + " batch_idx * len(data), len(self.train_loader.dataset),\n", + " 100. 
* batch_idx / len(self.train_loader), loss.item()))\n", + " self.loss = loss.item()\n", + " log_threshold += .25\n", + " torch.save(self.model.state_dict(), 'model.pth')\n", + " torch.save(self.optimizer.state_dict(), 'optimizer.pth')\n", + " \n", + " self.next(self.local_model_validation)\n", + "\n", + " @collaborator\n", + " def local_model_validation(self):\n", + " print(f'Performing local model validation for collaborator {self.input}')\n", + " self.local_validation_score = self.compute_accuracy(self.test_loader)\n", + " print(\n", + " f'Done with local model validation for collaborator {self.input}, Accuracy: {self.local_validation_score}')\n", + " self.next(self.join)\n", + "\n", + " @aggregator\n", + " def join(self, inputs):\n", + " self.model = FedAvg([input.model for input in inputs])\n", + " self.optimizer = inputs[0].optimizer\n", + " self.current_round += 1\n", + "\n", + " self.average_loss = sum(input.loss for input in inputs) / len(inputs)\n", + " self.aggregated_model_accuracy = sum(\n", + " input.agg_validation_score for input in inputs) / len(inputs)\n", + " self.local_model_accuracy = sum(\n", + " input.local_validation_score for input in inputs) / len(inputs)\n", + " print(f'Average aggregated model accuracy = {self.aggregated_model_accuracy}')\n", + " print(f'Average training loss = {self.average_loss}')\n", + " print(f'Average local model validation values = {self.local_model_accuracy}')\n", + "\n", + " if self.current_round < self.rounds:\n", + " self.next(self.aggregated_model_validation, foreach='collaborators')\n", + " else:\n", + " self.next(self.end)\n", + "\n", + " @aggregator\n", + " def end(self):\n", + " print(f'Flow ended')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Finally, run the federation:" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "Calling start\n", + 
"\u001b[94mPerforming initialization for model\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling aggregated_model_validation\n", + "\u001b[94mPerforming aggregated model validation for collaborator collaborator0, model: 140162497619616\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. loss: 4.6833, Accuracy: 171/2500 (7%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling train\n", + "\u001b[94mTrain Epoch: [4096/15000 (27%)]\tLoss: 1.889274\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [8192/15000 (53%)]\tLoss: 1.279191\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [11264/15000 (73%)]\tLoss: 0.994200\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling local_model_validation\n", + "\u001b[94mPerforming local model validation for collaborator collaborator0\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. loss: 0.7548, Accuracy: 1929/2500 (77%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mDone with local model validation for collaborator collaborator0, Accuracy: 0.7716000080108643\u001b[0m\u001b[94m\n", + "\u001b[0mShould transfer from local_model_validation to join\n", + "\n", + "Calling aggregated_model_validation\n", + "\u001b[94mPerforming aggregated model validation for collaborator collaborator1, model: 140158910463952\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. loss: 4.7259, Accuracy: 173/2500 (7%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling train\n", + "\u001b[94mTrain Epoch: [4096/15000 (27%)]\tLoss: 1.675623\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [8192/15000 (53%)]\tLoss: 1.068585\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [11264/15000 (73%)]\tLoss: 0.687561\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling local_model_validation\n", + "\u001b[94mPerforming local model validation for collaborator collaborator1\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. 
loss: 0.6366, Accuracy: 2004/2500 (80%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mDone with local model validation for collaborator collaborator1, Accuracy: 0.8015999794006348\u001b[0m\u001b[94m\n", + "\u001b[0mShould transfer from local_model_validation to join\n", + "\n", + "Calling aggregated_model_validation\n", + "\u001b[94mPerforming aggregated model validation for collaborator collaborator2, model: 140162497661872\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. loss: 4.6549, Accuracy: 215/2500 (9%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling train\n", + "\u001b[94mTrain Epoch: [4096/15000 (27%)]\tLoss: 1.879489\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [8192/15000 (53%)]\tLoss: 1.325507\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [11264/15000 (73%)]\tLoss: 0.968176\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling local_model_validation\n", + "\u001b[94mPerforming local model validation for collaborator collaborator2\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. loss: 0.7462, Accuracy: 1901/2500 (76%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mDone with local model validation for collaborator collaborator2, Accuracy: 0.7603999972343445\u001b[0m\u001b[94m\n", + "\u001b[0mShould transfer from local_model_validation to join\n", + "\n", + "Calling aggregated_model_validation\n", + "\u001b[94mPerforming aggregated model validation for collaborator collaborator3, model: 140162498346528\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. 
loss: 4.7129, Accuracy: 193/2500 (8%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling train\n", + "\u001b[94mTrain Epoch: [4096/15000 (27%)]\tLoss: 1.720635\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [8192/15000 (53%)]\tLoss: 1.061211\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [11264/15000 (73%)]\tLoss: 0.762026\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling local_model_validation\n", + "\u001b[94mPerforming local model validation for collaborator collaborator3\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. loss: 0.6378, Accuracy: 1992/2500 (80%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mDone with local model validation for collaborator collaborator3, Accuracy: 0.7968000173568726\u001b[0m\u001b[94m\n", + "\u001b[0mShould transfer from local_model_validation to join\n", + "\n", + "Calling join\n", + "\u001b[94mAverage aggregated model accuracy = 0.07520000264048576\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mAverage training loss = 0.8529909627063148\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mAverage local model validation values = 0.782600000500679\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling aggregated_model_validation\n", + "\u001b[94mPerforming aggregated model validation for collaborator collaborator0, model: 140158910552480\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. loss: 0.6740, Accuracy: 1996/2500 (80%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling train\n", + "\u001b[94mTrain Epoch: [4096/15000 (27%)]\tLoss: 0.974921\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [8192/15000 (53%)]\tLoss: 0.633429\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [11264/15000 (73%)]\tLoss: 0.591566\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling local_model_validation\n", + "\u001b[94mPerforming local model validation for collaborator collaborator0\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. 
loss: 0.3951, Accuracy: 2214/2500 (89%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mDone with local model validation for collaborator collaborator0, Accuracy: 0.8855999708175659\u001b[0m\u001b[94m\n", + "\u001b[0mShould transfer from local_model_validation to join\n", + "\n", + "Calling aggregated_model_validation\n", + "\u001b[94mPerforming aggregated model validation for collaborator collaborator1, model: 140162497608672\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. loss: 0.6877, Accuracy: 1981/2500 (79%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling train\n", + "\u001b[94mTrain Epoch: [4096/15000 (27%)]\tLoss: 0.824028\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [8192/15000 (53%)]\tLoss: 0.515538\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [11264/15000 (73%)]\tLoss: 0.410188\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling local_model_validation\n", + "\u001b[94mPerforming local model validation for collaborator collaborator1\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. loss: 0.4668, Accuracy: 2166/2500 (87%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mDone with local model validation for collaborator collaborator1, Accuracy: 0.8664000034332275\u001b[0m\u001b[94m\n", + "\u001b[0mShould transfer from local_model_validation to join\n", + "\n", + "Calling aggregated_model_validation\n", + "\u001b[94mPerforming aggregated model validation for collaborator collaborator2, model: 140162498107328\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. 
loss: 0.6919, Accuracy: 1981/2500 (79%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling train\n", + "\u001b[94mTrain Epoch: [4096/15000 (27%)]\tLoss: 1.025180\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [8192/15000 (53%)]\tLoss: 0.616896\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [11264/15000 (73%)]\tLoss: 0.483282\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling local_model_validation\n", + "\u001b[94mPerforming local model validation for collaborator collaborator2\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. loss: 0.4406, Accuracy: 2163/2500 (87%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mDone with local model validation for collaborator collaborator2, Accuracy: 0.8651999831199646\u001b[0m\u001b[94m\n", + "\u001b[0mShould transfer from local_model_validation to join\n", + "\n", + "Calling aggregated_model_validation\n", + "\u001b[94mPerforming aggregated model validation for collaborator collaborator3, model: 140162498345664\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. loss: 0.6698, Accuracy: 2000/2500 (80%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling train\n", + "\u001b[94mTrain Epoch: [4096/15000 (27%)]\tLoss: 0.725868\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [8192/15000 (53%)]\tLoss: 0.450241\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [11264/15000 (73%)]\tLoss: 0.388554\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling local_model_validation\n", + "\u001b[94mPerforming local model validation for collaborator collaborator3\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. 
loss: 0.4106, Accuracy: 2211/2500 (88%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mDone with local model validation for collaborator collaborator3, Accuracy: 0.8844000101089478\u001b[0m\u001b[94m\n", + "\u001b[0mShould transfer from local_model_validation to join\n", + "\n", + "Calling join\n", + "\u001b[94mAverage aggregated model accuracy = 0.7958000004291534\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mAverage training loss = 0.4683974838455107\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mAverage local model validation values = 0.8753999918699265\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling aggregated_model_validation\n", + "\u001b[94mPerforming aggregated model validation for collaborator collaborator0, model: 140162648091376\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. loss: 0.3590, Accuracy: 2230/2500 (89%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling train\n", + "\u001b[94mTrain Epoch: [4096/15000 (27%)]\tLoss: 0.406638\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [8192/15000 (53%)]\tLoss: 0.313662\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [11264/15000 (73%)]\tLoss: 0.326520\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling local_model_validation\n", + "\u001b[94mPerforming local model validation for collaborator collaborator0\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. loss: 0.2096, Accuracy: 2338/2500 (94%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mDone with local model validation for collaborator collaborator0, Accuracy: 0.9351999759674072\u001b[0m\u001b[94m\n", + "\u001b[0mShould transfer from local_model_validation to join\n", + "\n", + "Calling aggregated_model_validation\n", + "\u001b[94mPerforming aggregated model validation for collaborator collaborator1, model: 140162646717344\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. 
loss: 0.3773, Accuracy: 2228/2500 (89%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling train\n", + "\u001b[94mTrain Epoch: [4096/15000 (27%)]\tLoss: 0.392126\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [8192/15000 (53%)]\tLoss: 0.228912\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [11264/15000 (73%)]\tLoss: 0.200197\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling local_model_validation\n", + "\u001b[94mPerforming local model validation for collaborator collaborator1\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. loss: 0.2601, Accuracy: 2317/2500 (93%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mDone with local model validation for collaborator collaborator1, Accuracy: 0.926800012588501\u001b[0m\u001b[94m\n", + "\u001b[0mShould transfer from local_model_validation to join\n", + "\n", + "Calling aggregated_model_validation\n", + "\u001b[94mPerforming aggregated model validation for collaborator collaborator2, model: 140162498503728\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. loss: 0.3683, Accuracy: 2240/2500 (90%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling train\n", + "\u001b[94mTrain Epoch: [4096/15000 (27%)]\tLoss: 0.583415\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [8192/15000 (53%)]\tLoss: 0.407979\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [11264/15000 (73%)]\tLoss: 0.299050\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling local_model_validation\n", + "\u001b[94mPerforming local model validation for collaborator collaborator2\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. 
loss: 0.2664, Accuracy: 2305/2500 (92%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mDone with local model validation for collaborator collaborator2, Accuracy: 0.921999990940094\u001b[0m\u001b[94m\n", + "\u001b[0mShould transfer from local_model_validation to join\n", + "\n", + "Calling aggregated_model_validation\n", + "\u001b[94mPerforming aggregated model validation for collaborator collaborator3, model: 140162497621488\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. loss: 0.3626, Accuracy: 2226/2500 (89%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling train\n", + "\u001b[94mTrain Epoch: [4096/15000 (27%)]\tLoss: 0.371595\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [8192/15000 (53%)]\tLoss: 0.234668\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mTrain Epoch: [11264/15000 (73%)]\tLoss: 0.177007\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling local_model_validation\n", + "\u001b[94mPerforming local model validation for collaborator collaborator3\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94m\n", + "Test set: Avg. 
loss: 0.2615, Accuracy: 2305/2500 (92%)\n", + "\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mDone with local model validation for collaborator collaborator3, Accuracy: 0.921999990940094\u001b[0m\u001b[94m\n", + "\u001b[0mShould transfer from local_model_validation to join\n", + "\n", + "Calling join\n", + "\u001b[94mAverage aggregated model accuracy = 0.8924000114202499\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mAverage training loss = 0.250693485351252\u001b[0m\u001b[94m\n", + "\u001b[0m\u001b[94mAverage local model validation values = 0.926499992609024\u001b[0m\u001b[94m\n", + "\u001b[0m\n", + "Calling end\n", + "\u001b[94mFlow ended\u001b[0m\u001b[94m\n", + "\u001b[0m" + ] + } + ], + "source": [ + "model = Net()\n", + "flflow = FederatedFlow(model, get_optimizer(model), rounds=3, checkpoint=False)\n", + "flflow.runtime = local_runtime\n", + "flflow.run()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/openfl-tutorials/experimental/401_FedProx_with_Synthetic_nonIID.ipynb b/openfl-tutorials/experimental/402_FedProx_with_Synthetic_nonIID.ipynb similarity index 100% rename from openfl-tutorials/experimental/401_FedProx_with_Synthetic_nonIID.ipynb rename to openfl-tutorials/experimental/402_FedProx_with_Synthetic_nonIID.ipynb diff --git a/openfl-tutorials/experimental/401_MNIST_Aggregator_Validation_Ray_Watermarking.ipynb b/openfl-tutorials/experimental/402_MNIST_Aggregator_Validation_Ray_Watermarking.ipynb similarity index 100% rename from openfl-tutorials/experimental/401_MNIST_Aggregator_Validation_Ray_Watermarking.ipynb rename to 
openfl-tutorials/experimental/402_MNIST_Aggregator_Validation_Ray_Watermarking.ipynb From 7ed9ebcf45c86f0bab5be81953c007cf173e28fc Mon Sep 17 00:00:00 2001 From: Kush Agrawal Date: Thu, 21 Nov 2024 13:14:48 +0530 Subject: [PATCH 3/3] Add workflow based linear regression tutorial (#1162) Signed-off-by: kagrawa2 --- ...105_Numpy_Linear_Regression_Workflow.ipynb | 575 ++++++++++++++++++ 1 file changed, 575 insertions(+) create mode 100644 openfl-tutorials/experimental/105_Numpy_Linear_Regression_Workflow.ipynb diff --git a/openfl-tutorials/experimental/105_Numpy_Linear_Regression_Workflow.ipynb b/openfl-tutorials/experimental/105_Numpy_Linear_Regression_Workflow.ipynb new file mode 100644 index 0000000000..fe06ac4987 --- /dev/null +++ b/openfl-tutorials/experimental/105_Numpy_Linear_Regression_Workflow.ipynb @@ -0,0 +1,575 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Linear Regression Tutorial using Workflow API\n", + "\n", + "This notebook demonstrates a linear regression tutorial using the Workflow Interface. The key steps include:\n", + "\n", + "1. **Model Definition**:\n", + " - Define a linear regression model with Mean Squared Error (MSE) as the loss function.\n", + "\n", + "2. **Synthetic Data Generation**:\n", + " - Generate synthetic datasets for training and validation.\n", + " - Shard the dataset among multiple collaborators for federated learning.\n", + "\n", + "3. **Federated Learning Workflow**:\n", + " - Define a federated learning workflow using the `FederatedFlow` class.\n", + " - Implement tasks for the aggregator and collaborators, including model validation, training, and aggregation.\n", + "\n", + "4. 
**Training and Evaluation**:\n", + " - Train the model locally to establish a baseline.\n", + " - Execute the federated learning workflow to train the model across multiple collaborators.\n", + " - Plot and analyze the aggregated model's performance over multiple rounds.\n", + "\n", + "By the end of this tutorial, you will understand how to set up and execute a federated learning experiment for a linear regression model using the Workflow Interface." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Getting Started\n", + "\n", + "First, we install the necessary dependencies for the Workflow Interface and import the relevant libraries." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Below code will display the print statement output on screen as well\n", + "import sys\n", + "sys.stdout = open('/dev/stdout', 'w')\n", + "\n", + "!pip install git+https://github.com/securefederatedai/openfl.git\n", + "!pip install -r workflow_interface_requirements.txt\n", + "!pip install matplotlib\n", + "\n", + "# Uncomment this if running in Google Colab and set USERNAME if running in docker container.\n", + "#!pip install -r https://raw.githubusercontent.com/intel/openfl/develop/openfl-tutorials/experimental/workflow_interface_requirements.txt\n", + "#import os\n", + "#os.environ[\"USERNAME\"] = \"colab\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import torch as pt\n", + "import torch.utils.data as data\n", + "import torch.nn as nn\n", + "import torch.nn.functional as F\n", + "import torch.optim as optim\n", + "import numpy as np\n", + "from typing import List, Union\n", + "import random\n", + "import matplotlib.pyplot as plt\n", + "%matplotlib inline\n", + "from matplotlib.pylab import rcParams\n", + "\n", + "import warnings\n", + "warnings.filterwarnings(\"ignore\")" + ] + }, + { + "cell_type": 
"markdown", +
"metadata": {}, + "source": [ + "Next, define a linear regression model with Mean Squared Error (MSE) as the loss function. Train this linear model locally on a synthetic dataset to establish a baseline solution." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class LinRegLasso:\n", + " def __init__(self, n_feat: int) -> None:\n", + " self.weights = np.ones((n_feat + 1)) # (n_feat + 1,) weights + bias\n", + "\n", + " def predict(self, feature_vector: Union[np.ndarray, List[int]]) -> float:\n", + " '''\n", + " feature_vector may be a list or have shape (n_feat,)\n", + " or it may be a bunch of vectors (n_vec, nfeat)\n", + " '''\n", + " feature_vector = np.array(feature_vector)\n", + " if len(feature_vector.shape) == 1:\n", + " feature_vector = feature_vector[:,np.newaxis]\n", + " assert feature_vector.shape[-1] == self.weights.shape[0] - 1, \\\n", + " f\"sample shape is {feature_vector.shape} and weights shape is f{self.weights}\"\n", + "\n", + " return self.weights @ np.concatenate((feature_vector.T, [[1]*feature_vector.shape[0]]))\n", + "\n", + " def mse(self, X: np.ndarray, Y: np.ndarray) -> float:\n", + " Y_hat = self.predict(X)\n", + " return np.sum((Y - Y_hat)**2) / Y.shape[0]\n", + "\n", + " def _update_weights(self, X: np.ndarray, Y: np.ndarray, lr: float, wd: float) -> None:\n", + " '''\n", + " X: (n_samples, n_features)\n", + " Y: (n_samples,)\n", + " self.weights: (n_features + 1)\n", + " \n", + " Cost function is MSE: (y - W*X - b)**2;\n", + " its derivative with resp to any x is -2*X*(y - W*X - b),\n", + " and with resp to b is -2*(y - W*X - b).\n", + " \n", + " Regularisation function is L1 |W|;\n", + " its derivative is SIGN(w)\n", + " '''\n", + " predictions = self.predict(X)\n", + " error = Y - predictions # (n_samples,)\n", + " X_with_bias = np.concatenate((X.T, [[1]*X.shape[0]])).T\n", + " updates = -2 * X_with_bias.T @ error / Y.shape[0]\n", + " regression_term = 
np.sign(self.weights)\n", + "\n", + " self.weights = self.weights - lr * updates + wd * regression_term\n", + "\n", + " def fit(self, X: np.ndarray, Y: np.ndarray,\n", + " n_epochs: int, lr: float, wd: float,\n", + " silent: bool=False) -> None:\n", + " for i in range(n_epochs):\n", + " self._update_weights(X, Y, lr, wd)\n", + " mse = self.mse(X, Y)\n", + " if not silent:\n", + " print(f'epoch: {i}, \\t MSE: {mse}')\n", + "\n", + " def train(self, train_data, lr, wd, epochs):\n", + " # Initialize lists to store all data\n", + " all_X = []\n", + " all_Y = []\n", + "\n", + " # Iterate through the DataLoader to get all data\n", + " for batch_idx, (X_batch, Y_batch) in enumerate(train_data):\n", + " all_X.append(X_batch.numpy()) # Convert to NumPy and append\n", + " all_Y.append(Y_batch.numpy()) # Convert to NumPy and append\n", + "\n", + " # Concatenate all batches into NumPy arrays\n", + " X = np.concatenate(all_X, axis=0)\n", + " Y = np.concatenate(all_Y, axis=0)\n", + " self.fit(X, Y, epochs, lr, wd, silent=True)\n", + " train_mse = self.mse(X, Y)\n", + " return train_mse\n", + "\n", + "def validate(my_model, val_loader):\n", + " total_mse = 0.0\n", + " num_samples = 0\n", + "\n", + " for batch_idx, (X, Y) in enumerate(val_loader):\n", + " X = X.numpy()\n", + " Y = Y.numpy()\n", + "\n", + " total_mse += my_model.mse(X, Y) * len(X) # Multiply MSE by batch size\n", + " num_samples += len(X) # Count total number of samples\n", + "\n", + " validation_mse = total_mse / num_samples # Average MSE across all samples\n", + " return validation_mse" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Define input array with angles from 60deg to 300deg converted to radians\n", + "x = np.array([i*np.pi/180 for i in range(60,300,4)])\n", + "np.random.seed(10) # Setting seed for reproducibility\n", + "y = np.sin(x) + np.random.normal(0,0.15,len(x))\n", + "#plt.plot(x,y,'.')" + ] + }, + { + "cell_type": "code", + 
"execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "lr_model = LinRegLasso(1)\n", + "wd = 0.0001\n", + "lr = 0.08\n", + "epochs = 100\n", + "\n", + "print(f\"Initial MSE: {lr_model.mse(x,y)}\")\n", + "lr_model.fit(x[:,np.newaxis],y, epochs, lr, wd, silent=True)\n", + "print(f\"Final MSE: {lr_model.mse(x,y)}\")\n", + "print(f\"Final parameters: {lr_model.weights}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Federated Averaging Function\n", + "\n", + "The following cell defines the `FedAvg` function, which simulates federated averaging for `LinRegLasso` models. This function takes a list of model instances from collaborators and optionally a list of weights for weighted averaging. It returns an updated global model with averaged weights." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def FedAvg(models, weights=None):\n", + " \"\"\"\n", + " Simulates federated averaging for LinRegLasso models.\n", + "\n", + " Args:\n", + " models (list): List of LinRegLasso model instances from collaborators.\n", + " weights (list, optional): List of weights for each model, used for weighted averaging. Defaults to None, indicating equal weights.\n", + "\n", + " Returns:\n", + " LinRegLasso: Updated global model with averaged weights.\n", + " \"\"\"\n", + " new_model = models[0] # Use the first model as a base\n", + " all_weights = [model.weights for model in models] # Get weights from all models\n", + " new_model.weights = np.average(all_weights, axis=0, weights=weights) # Average the weights\n", + " return new_model" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We define a federated learning workflow using the `FederatedFlow` class, which extends `FLSpec`. The workflow defines the tasks that will be executed by the aggregator and the collaborators."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from openfl.experimental.interface import FLSpec, Aggregator, Collaborator\n", + "from openfl.experimental.runtime import LocalRuntime\n", + "from openfl.experimental.placement import aggregator, collaborator\n", + "\n", + "class FederatedFlow(FLSpec):\n", + "\n", + " def __init__(self, model=None, optimizer=None, rounds=3, **kwargs):\n", + " super().__init__(**kwargs)\n", + " self.model = model\n", + " self.rounds = rounds\n", + " self.aggregated_mse_history = []\n", + " self.train_loss_history = []\n", + "\n", + " @aggregator\n", + " def start(self):\n", + " print(f'Performing initialization for model')\n", + " self.collaborators = self.runtime.collaborators\n", + " self.current_round = 0\n", + " self.next(self.aggregated_model_validation, foreach='collaborators')\n", + "\n", + " @collaborator\n", + " def aggregated_model_validation(self):\n", + " print(f'Performing aggregated model validation for collaborator {self.input}')\n", + " self.agg_validation_score = validate(self.model, self.test_loader)\n", + " print(f'{self.input} value of {self.agg_validation_score}')\n", + " self.next(self.train)\n", + "\n", + " @collaborator\n", + " def train(self):\n", + " self.wd = 0.0001\n", + " self.lr = 0.08\n", + " self.epochs = 100\n", + " self.loss = self.model.train(self.train_loader, self.lr, self.wd, self.epochs)\n", + " self.next(self.local_model_validation)\n", + "\n", + " @collaborator\n", + " def local_model_validation(self):\n", + " self.local_validation_score = validate(self.model, self.test_loader)\n", + " print(\n", + " f'Doing local model validation for collaborator {self.input}: {self.local_validation_score}')\n", + " self.next(self.join)\n", + "\n", + " @aggregator\n", + " def join(self, inputs):\n", + " self.average_train_loss = sum(input.loss for input in inputs) / len(inputs)\n", + " self.aggregated_model_mse = sum(\n", + " 
input.agg_validation_score for input in inputs) / len(inputs)\n", + " self.local_model_mse = sum(\n", + " input.local_validation_score for input in inputs) / len(inputs)\n", + " self.train_loss_history.append(self.average_train_loss)\n", + " self.aggregated_mse_history.append(self.aggregated_model_mse)\n", + " print(f'Average aggregated model validation values = {self.aggregated_model_mse}')\n", + " print(f'Average training loss = {self.average_train_loss}')\n", + " print(f'Average local model validation values = {self.local_model_mse}')\n", + " self.model = FedAvg([input.model for input in inputs])\n", + " \n", + " self.current_round += 1\n", + " if self.current_round < self.rounds:\n", + " self.next(self.aggregated_model_validation,\n", + " foreach='collaborators', exclude=['private'])\n", + " else:\n", + " self.next(self.end)\n", + "\n", + " @aggregator\n", + " def end(self):\n", + " print(f'This is the end of the flow')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we can generate the training and validation datasets and shard the dataset among the collaborators. This allows each collaborator to have their own subset of the data for federated learning. The `split` method provides data loaders for training and testing for each collaborator." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def generate_synthetic(rank: int, n_samples: int, noise: float):\n", + " \"\"\"\n", + " Generate synthetic data for linear regression.\n", + "\n", + " Args:\n", + " rank (int): Seed for random number generation.\n", + " n_samples (int): Number of samples to generate.\n", + " noise (float): Standard deviation of the Gaussian noise added to the y values.\n", + "\n", + " Returns:\n", + " np.ndarray: Generated x values.\n", + " np.ndarray: Generated y values.\n", + " \"\"\"\n", + " np.random.seed(rank)\n", + " n_samples = max(n_samples, 10)\n", + " interval = 240\n", + " x_start = 60\n", + "\n", + " x = np.random.rand(n_samples, 1) * interval + x_start\n", + " x *= np.pi / 180\n", + "\n", + " y = np.sin(x) + np.random.normal(0, noise, size=(n_samples,1))\n", + " y = y.reshape(-1)\n", + "\n", + " return x, y\n", + "\n", + "class SyntheticFederatedDataset:\n", + " def __init__(self, num_collaborators=2, batch_size=1, num_samples=10, **kwargs):\n", + " self.batch_size = batch_size\n", + " X, y = generate_synthetic(rank=42, n_samples=num_samples, noise=0.1)\n", + " X = np.array(X, dtype=np.float32)\n", + " y = np.array(y, dtype=np.float32)\n", + "\n", + " self.X_train_all = X[:int(0.8 * len(X))]\n", + " self.y_train_all = y[:int(0.8 * len(y))]\n", + " \n", + " self.X_test_all = X[int(0.8 * len(X)):]\n", + " self.y_test_all = y[int(0.8 * len(y)):]\n", + "\n", + " min_samples = max(len(self.X_train_all) // num_collaborators, 1)\n", + "\n", + " while len(self.X_test_all) < num_collaborators or any(len(chunk) < 1 for chunk in self.X_train_all):\n", + " X, y = generate_synthetic(rank=42, n_samples=len(self.X_train_all) + len(self.X_valid_all) + 1, noise=0.1)\n", + " X = np.array(X, dtype=np.float32)\n", + " y = np.array(y, dtype=np.float32)\n", + " self.X_train_all = X[:int(0.9 * len(X))]\n", + " self.X_test_all = X[int(0.9 * len(X)):]\n", + " self.y_train_all = 
y[:int(0.9 * len(y))]\n", + " self.y_test_all = y[int(0.9 * len(y)):]\n", + "\n", + " self.X_train_all = np.array_split(self.X_train_all, num_collaborators)\n", + " self.X_test_all = np.array_split(self.X_test_all, num_collaborators)\n", + " self.y_train_all = np.array_split(self.y_train_all, num_collaborators)\n", + " self.y_test_all = np.array_split(self.y_test_all, num_collaborators)\n", + "\n", + " def split(self, index):\n", + " return {\n", + " \"train_loader\":\n", + " data.DataLoader(\n", + " data.TensorDataset(\n", + " pt.from_numpy(self.X_train_all[index]),\n", + " pt.from_numpy(self.y_train_all[index])\n", + " ),\n", + " batch_size=self.batch_size, shuffle=True\n", + " ),\n", + " \"test_loader\":\n", + " data.DataLoader(\n", + " data.TensorDataset(\n", + " pt.from_numpy(self.X_test_all[index]),\n", + " pt.from_numpy(self.y_test_all[index])\n", + " ),\n", + " batch_size=self.batch_size, shuffle=True\n", + " )\n", + " }" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We will set up the aggregator and collaborators for the federated learning and initialize the local runtime environment. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "aggregator = Aggregator()\n", + "aggregator.private_attributes = {}\n", + "\n", + "collaborator_names = ['Bangalore', 'Paris', 'Texas', 'Seoul']\n", + "collaborators = [Collaborator(name=name) for name in collaborator_names]\n", + "synthetic_federated_dataset = SyntheticFederatedDataset(num_collaborators=len(collaborator_names), num_samples=2000, batch_size=20)\n", + "\n", + "def callable_to_initialize_collaborator_private_attributes(index):\n", + " private_attributes = synthetic_federated_dataset.split(index)\n", + " return private_attributes\n", + "\n", + "collaborators = []\n", + "for idx, collaborator_name in enumerate(collaborator_names):\n", + " collaborators.append(\n", + " Collaborator(\n", + " name=collaborator_name, num_cpus=1.0, num_gpus=0.0,\n", + " private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n", + " index=idx\n", + " )\n", + " )\n", + "\n", + "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='single_process')\n", + "print(f'Local runtime collaborators = {local_runtime.collaborators}')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now that we have our flow and runtime defined, let's run the experiment!" 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "model = lr_model\n", + "optimizer = None\n", + "flflow = FederatedFlow(model, optimizer, rounds=10, checkpoint=True)\n", + "flflow.runtime = local_runtime\n", + "flflow.run()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(f'\\nFinal aggregated MSE for {flflow.rounds} rounds of training: {flflow.aggregated_model_mse}')\n", + "print(f'\\nFinal loss model for {flflow.rounds} rounds of training: {flflow.train_loss_history}')\n", + "print(f'\\nFinal parameters: {flflow.model.weights}')\n", + "print(f'\\n Aggregated model MSE History : {flflow.aggregated_mse_history}')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Plot the training Mean Squared Error (MSE) over the training rounds." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "plt.plot(range(1, flflow.rounds + 1), flflow.train_loss_history)\n", + "plt.xlabel('Epoch')\n", + "plt.ylabel('Loss (MSE)')\n", + "plt.title('Loss Function during Training')\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Plot the aggregated model Mean Squared Error (MSE) over the training rounds." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "plt.plot(range(1, flflow.rounds + 1), flflow.aggregated_mse_history)\n", + "plt.xlabel('Round')\n", + "plt.ylabel('Aggregated Model MSE')\n", + "plt.title('Aggregated Model MSE over Rounds')\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we can validate how our final trained model performs on any random dataset." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "n_cols = 20\n", + "n_samples = 4\n", + "interval = 240\n", + "x_start = 60\n", + "noise = 0.3\n", + "\n", + "X = None\n", + "final_model = flflow.model # Get the final model after training\n", + "for rank in range(n_cols):\n", + " np.random.seed(rank) # Setting seed for reproducibility\n", + " x = np.random.rand(n_samples, 1) * interval + x_start\n", + " x *= np.pi / 180\n", + " X = x if X is None else np.vstack((X,x))\n", + " y = np.sin(x) + np.random.normal(0, noise, size=(n_samples, 1))\n", + " plt.plot(x,y,'+')\n", + " \n", + "X.sort() \n", + "Y_hat = final_model.predict(X)\n", + "plt.plot(X,Y_hat,'--')" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.10" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}