From aa2692b6ef49b0bef68c5ab1746f63b627117735 Mon Sep 17 00:00:00 2001
From: Ishant Thakare
Date: Fri, 31 May 2024 13:28:12 +0530
Subject: [PATCH] Updated workflow_interface tutorials 1001, 104_keras, 401_MNIST

Signed-off-by: Ishant Thakare
---
 ...kspace_Creation_from_JupyterNotebook.ipynb | 340 +++++++++++-------
 ...w_Interface_104_Keras_MNIST_with_GPU.ipynb |   4 +-
 ...gregator_Validation_Ray_Watermarking.ipynb |  16 +-
 .../104_keras_mnist/requirements.txt          |   2 +-
 4 files changed, 223 insertions(+), 139 deletions(-)

diff --git a/openfl-tutorials/experimental/Workflow_Interface_1001_Workspace_Creation_from_JupyterNotebook.ipynb b/openfl-tutorials/experimental/Workflow_Interface_1001_Workspace_Creation_from_JupyterNotebook.ipynb
index dd3926f52e..dd286a8a80 100644
--- a/openfl-tutorials/experimental/Workflow_Interface_1001_Workspace_Creation_from_JupyterNotebook.ipynb
+++ b/openfl-tutorials/experimental/Workflow_Interface_1001_Workspace_Creation_from_JupyterNotebook.ipynb
@@ -738,69 +738,151 @@
     "watermark_retrain_learning_rate = 5e-3"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "id": "5e4ac3ad",
+   "metadata": {},
+   "source": [
+    "You'll notice in the `FederatedFlow_MNIST_Watermarking` definition above that there were certain attributes the flow was not initialized with, namely the `train_loader` and `test_loader` for each of the collaborators. These are **private_attributes** of a particular participant and (as the name suggests) are accessible ONLY to that participant, through its tasks. Additionally, these private attributes are always filtered out of the current state when it is transferred from collaborator to aggregator, and vice versa.\n",
+    "\n",
+    "Users can directly specify a collaborator's private attributes via `collaborator.private_attributes`, which is a dictionary where the key is the name of the attribute and the value is the object that is made accessible to the collaborator.\n",
+    "\n",
+    "For more detailed information on specifying these private attributes, please refer to the first quick start [notebook](https://github.com/intel/openfl/blob/develop/openfl-tutorials/experimental/Workflow_Interface_101_MNIST.ipynb)"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "c5f6e104",
+   "id": "e5f10d5d",
    "metadata": {},
    "outputs": [],
    "source": [
     "#| export\n",
     "\n",
-    "def callable_to_initialize_aggregator_private_attributes(watermark_data, batch_size):\n",
-    "    return {\n",
-    "        \"watermark_data_loader\": torch.utils.data.DataLoader(\n",
-    "            watermark_data, batch_size=batch_size, shuffle=True\n",
+    "# Setup Aggregator with private attributes\n",
+    "aggregator = Aggregator()\n",
+    "aggregator.private_attributes = {\n",
+    "    \"watermark_data_loader\": torch.utils.data.DataLoader(\n",
+    "        watermark_data, batch_size=batch_size_watermark, shuffle=True\n",
+    "    ),\n",
+    "    \"pretrain_epochs\": 25,\n",
+    "    \"retrain_epochs\": 25,\n",
+    "    \"watermark_acc_threshold\": 0.98,\n",
+    "    \"watermark_pretraining_completed\": False,\n",
+    "}\n",
+    "\n",
+    "# Setup Collaborators with private attributes\n",
+    "collaborator_names = ['Portland', 'Seattle', 'Chandler', 'Bangalore']\n",
+    "print(f\"Creating collaborators {collaborator_names}\")\n",
+    "collaborators = [Collaborator(name=name) for name in collaborator_names]\n",
+    "\n",
+    "for idx, collaborator in enumerate(collaborators):\n",
+    "    local_train = deepcopy(mnist_train)\n",
+    "    local_test = deepcopy(mnist_test)\n",
+    "    local_train.data = mnist_train.data[idx :: len(collaborators)]\n",
+    "    local_train.targets = mnist_train.targets[idx :: 
len(collaborators)]\n", + " local_test.data = mnist_test.data[idx :: len(collaborators)]\n", + " local_test.targets = mnist_test.targets[idx :: len(collaborators)]\n", + " collaborator.private_attributes = {\n", + " \"train_loader\": torch.utils.data.DataLoader(\n", + " local_train, batch_size=batch_size_train, shuffle=True\n", + " ),\n", + " \"test_loader\": torch.utils.data.DataLoader(\n", + " local_test, batch_size=batch_size_train, shuffle=True\n", " ),\n", - " \"pretrain_epochs\": 25,\n", - " \"retrain_epochs\": 25,\n", - " \"watermark_acc_threshold\": 0.98,\n", " }\n", "\n", - "# Setup Aggregator private attributes via callable function\n", - "aggregator = Aggregator(\n", - " name=\"agg\",\n", - " private_attributes_callable=callable_to_initialize_aggregator_private_attributes,\n", - " watermark_data=watermark_data,\n", - " batch_size=batch_size_watermark,\n", - " )\n", + "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators)\n", + "print(f\"Local runtime collaborators = {local_runtime.collaborators}\")" + ] + }, + { + "cell_type": "markdown", + "id": "5177f137", + "metadata": {}, + "source": [ + "### Alternate method to specify private attributes\n", "\n", - "collaborator_names = [\n", - " \"Portland\",\n", - " \"Seattle\",\n", - " \"Chandler\",\n", - " \"Bangalore\",\n", - " \"New Delhi\",\n", - "]\n", - "n_collaborators = len(collaborator_names)\n", - "\n", - "def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, batch_size, train_dataset, test_dataset):\n", - " train = deepcopy(train_dataset)\n", - " test = deepcopy(test_dataset)\n", - " train.data = train_dataset.data[index::n_collaborators]\n", - " train.targets = train_dataset.targets[index::n_collaborators]\n", - " test.data = test_dataset.data[index::n_collaborators]\n", - " test.targets = test_dataset.targets[index::n_collaborators]\n", - "\n", - " return {\n", - " \"train_loader\": torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True),\n", - " \"test_loader\": torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=True),\n", - " }\n", + "There is another way users can initialize private attributes in which private attributes need to be set using a (user defined) callback function while instantiating the participant. 
The callback function returns the private attributes (`train_loader` & `test_loader`) in form of a dictionary where the key is the attribute name, and the value is the object that will be made accessible to that participant's task.\n", "\n", - "# Setup Collaborators private attributes via callable function\n", - "collaborators = []\n", - "for idx, collaborator_name in enumerate(collaborator_names):\n", - " collaborators.append(\n", - " Collaborator(\n", - " name=collaborator_name, num_cpus=0, num_gpus=0,\n", - " private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n", - " index=idx, n_collaborators=n_collaborators,\n", - " train_dataset=mnist_train, test_dataset=mnist_test, batch_size=64\n", - " )\n", - " )\n", + "In the following example callback function, `callable_to_initialize_collaborator_private_attributes`, returns the private attributes `train_loader` and `test_loader` of the collaborator.\n", "\n", - "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend=\"ray\")\n", - "print(f\"Local runtime collaborators = {local_runtime.collaborators}\")" + "Detailed information on specifying private attributes using callback function is provided in this [documentation](https://github.com/intel/openfl/blob/develop/docs/about/features_index/workflowinterface.rst). \n", + "\n", + ">To use this method, uncomment the following cell and comment out the previous cell which initializes private attributes directly." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c5f6e104", + "metadata": {}, + "outputs": [], + "source": [ + "# #| export\n", + "\n", + "# def callable_to_initialize_aggregator_private_attributes(watermark_data, batch_size):\n", + "# return {\n", + "# \"watermark_data_loader\": torch.utils.data.DataLoader(\n", + "# watermark_data, batch_size=batch_size, shuffle=True\n", + "# ),\n", + "# \"pretrain_epochs\": 25,\n", + "# \"retrain_epochs\": 25,\n", + "# \"watermark_acc_threshold\": 0.98,\n", + "# }\n", + "\n", + "# # Setup Aggregator private attributes via callable function\n", + "# aggregator = Aggregator(\n", + "# name=\"agg\",\n", + "# private_attributes_callable=callable_to_initialize_aggregator_private_attributes,\n", + "# watermark_data=watermark_data,\n", + "# batch_size=batch_size_watermark,\n", + "# )\n", + "\n", + "# collaborator_names = [\n", + "# \"Portland\",\n", + "# \"Seattle\",\n", + "# \"Chandler\",\n", + "# \"Bangalore\",\n", + "# \"New Delhi\",\n", + "# ]\n", + "# n_collaborators = len(collaborator_names)\n", + "\n", + "# def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, batch_size, train_dataset, test_dataset):\n", + "# train = deepcopy(train_dataset)\n", + "# test = deepcopy(test_dataset)\n", + "# train.data = train_dataset.data[index::n_collaborators]\n", + "# train.targets = train_dataset.targets[index::n_collaborators]\n", + "# test.data = test_dataset.data[index::n_collaborators]\n", + "# test.targets = test_dataset.targets[index::n_collaborators]\n", + "\n", + "# return {\n", + "# \"train_loader\": torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True),\n", + "# \"test_loader\": torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=True),\n", + "# }\n", + "\n", + "# # Setup Collaborators private attributes via callable function\n", + "# collaborators = []\n", + "# for idx, collaborator_name in enumerate(collaborator_names):\n", + "# collaborators.append(\n", + "# Collaborator(\n", + "# name=collaborator_name, 
num_cpus=0, num_gpus=0,\n", + "# private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n", + "# index=idx, n_collaborators=n_collaborators,\n", + "# train_dataset=mnist_train, test_dataset=mnist_test, batch_size=64\n", + "# )\n", + "# )\n", + "\n", + "# local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend=\"ray\")\n", + "# print(f\"Local runtime collaborators = {local_runtime.collaborators}\")" + ] + }, + { + "cell_type": "markdown", + "id": "2a4ec6cc", + "metadata": {}, + "source": [ + ">NOTE: If both methods for specifying private attributes are used, the private attributes will only be set by the latter method. Additionally, the code for both methods will be included in your generated workspace." ] }, { @@ -901,91 +983,91 @@ "id": "ff55808c-c340-476b-a543-58d43451c54e", "metadata": {}, "source": [ - "**Workspace Activation and Creation**\r\n", - "1. Activate the experimental aggregator-based workflow:\r\n", - "\r\n", - " `fx experimental activate`\r\n", - "\r\n", - " This will create an 'experimental' directory under ~/.openfl/\r\n", - "3. Create a workspace using the custom template:\r\n", - "\r\n", - " `fx workspace create --prefix workspace_path --custom_template /home/$USER/generated-workspace`\r\n", - "4. Change to the workspace directory:\r\n", - "\r\n", - " `cd workspace_path`\r\n", - "\r\n", - "**Workspace Initialization and Certification**\r\n", - "1. Initialize the FL plan and auto-populate the fully qualified domain name (FQDN) of the aggregator node:\r\n", - "\r\n", - " `fx plan initialize`\r\n", - "2. Certify the workspace:\r\n", - "\r\n", - " `fx workspace certify`\r\n", - " \r\n", - "**Aggregator Setup and Workspace Export**\r\n", - "1. Run the aggregator certificate creation command:\r\n", - "\r\n", - " `fx aggregator generate-cert-request`\r\n", - "\r\n", - " `fx aggregator certify`\r\n", - "2. Export the workspace for collaboration:\r\n", - "\r\n", - " `fx workspace export`\r\n", - " \r\n", - "**Collaborator Node Setup**\r\n", - "\r\n", - "***On the Collaborator Node:***\r\n", - "\r\n", - "1. Copy the workspace archive from the aggregator node to the collaborator nodes. Import the workspace archive:\r\n", - "\r\n", - " `fx workspace import --archive WORKSPACE.zip`\r\n", - " \r\n", - " `cd workspace_path`\r\n", - "3. Generate a collaborator certificate request:\r\n", - "\r\n", - " `fx collaborator generate-cert-request -n {COL_LABEL}`\r\n", - "\r\n", - "***On the Aggregator Node (Certificate Authority):***\r\n", - "\r\n", - "3. Sign the Collaborator Certificate Signing Request (CSR) Package from collaborator nodes:\r\n", - "\r\n", - " `fx collaborator certify --request-pkg /PATH/TO/col_{COL_LABEL}_to_agg_cert_request.zip`\r\n", - "\r\n", - "***On the Collaborator Node:***\r\n", - "\r\n", - "4. Import the signed certificate and certificate chain into the workspace:\r\n", - "\r\n", - " `fx collaborator certify --import /PATH/TO/agg_to_col_{COL_LABEL}_signed_cert.zip`\r\n", - " \r\n", - "**Final Workspace Activation**\r\n", - "***On the Aggregator Node:***\r\n", - "\r\n", - "1. Start the Aggregator:\r\n", - "\r\n", - " `fx aggregator start`\r\n", - " \r\n", - " The Aggregator is now running and waiting for Collaborators to connect.\r\n", - "\r\n", - "***On the Collaborator Nodes:***\r\n", - "\r\n", - "2. Run the Collaborator:\r\n", - "\r\n", - " `fx collaborator start -n {COL_LABEL}`\r\n", - "\r\n", - "**Workspace Deactivation**\r\n", - "1. 
To deactivate the experimental aggregator-based workflow and switch back to original aggregator-based workflow:\r\n", - "\r\n", - " `fx experimental deactivate`\r\n", - "\r\n", - " This will remove the 'experimental' directory under ~/.openfl/\r\n" + "**Workspace Activation and Creation**\n", + "1. Activate the experimental aggregator-based workflow:\n", + "\n", + " `fx experimental activate`\n", + "\n", + " This will create an 'experimental' directory under ~/.openfl/\n", + "3. Create a workspace using the custom template:\n", + "\n", + " `fx workspace create --prefix workspace_path --custom_template /home/$USER/generated-workspace`\n", + "4. Change to the workspace directory:\n", + "\n", + " `cd workspace_path`\n", + "\n", + "**Workspace Initialization and Certification**\n", + "1. Initialize the FL plan and auto-populate the fully qualified domain name (FQDN) of the aggregator node:\n", + "\n", + " `fx plan initialize`\n", + "2. Certify the workspace:\n", + "\n", + " `fx workspace certify`\n", + " \n", + "**Aggregator Setup and Workspace Export**\n", + "1. Run the aggregator certificate creation command:\n", + "\n", + " `fx aggregator generate-cert-request`\n", + "\n", + " `fx aggregator certify`\n", + "2. Export the workspace for collaboration:\n", + "\n", + " `fx workspace export`\n", + " \n", + "**Collaborator Node Setup**\n", + "\n", + "***On the Collaborator Node:***\n", + "\n", + "1. Copy the workspace archive from the aggregator node to the collaborator nodes. Import the workspace archive:\n", + "\n", + " `fx workspace import --archive WORKSPACE.zip`\n", + " \n", + " `cd workspace_path`\n", + "3. Generate a collaborator certificate request:\n", + "\n", + " `fx collaborator generate-cert-request -n {COL_LABEL}`\n", + "\n", + "***On the Aggregator Node (Certificate Authority):***\n", + "\n", + "3. Sign the Collaborator Certificate Signing Request (CSR) Package from collaborator nodes:\n", + "\n", + " `fx collaborator certify --request-pkg /PATH/TO/col_{COL_LABEL}_to_agg_cert_request.zip`\n", + "\n", + "***On the Collaborator Node:***\n", + "\n", + "4. Import the signed certificate and certificate chain into the workspace:\n", + "\n", + " `fx collaborator certify --import /PATH/TO/agg_to_col_{COL_LABEL}_signed_cert.zip`\n", + " \n", + "**Final Workspace Activation**\n", + "***On the Aggregator Node:***\n", + "\n", + "1. Start the Aggregator:\n", + "\n", + " `fx aggregator start`\n", + " \n", + " The Aggregator is now running and waiting for Collaborators to connect.\n", + "\n", + "***On the Collaborator Nodes:***\n", + "\n", + "2. Run the Collaborator:\n", + "\n", + " `fx collaborator start -n {COL_LABEL}`\n", + "\n", + "**Workspace Deactivation**\n", + "1. 
To deactivate the experimental aggregator-based workflow and switch back to original aggregator-based workflow:\n", + "\n", + " `fx experimental deactivate`\n", + "\n", + " This will remove the 'experimental' directory under ~/.openfl/\n" ] } ], "metadata": { "kernelspec": { - "display_name": "v_o", + "display_name": "openfl-wip", "language": "python", - "name": "v_o" + "name": "python3" }, "language_info": { "codemirror_mode": { @@ -997,7 +1079,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.10" + "version": "3.8.19" } }, "nbformat": 4, diff --git a/openfl-tutorials/experimental/Workflow_Interface_104_Keras_MNIST_with_GPU.ipynb b/openfl-tutorials/experimental/Workflow_Interface_104_Keras_MNIST_with_GPU.ipynb index 0845647d67..a677e8b8c8 100644 --- a/openfl-tutorials/experimental/Workflow_Interface_104_Keras_MNIST_with_GPU.ipynb +++ b/openfl-tutorials/experimental/Workflow_Interface_104_Keras_MNIST_with_GPU.ipynb @@ -37,7 +37,7 @@ "source": [ "!pip install git+https://github.com/intel/openfl.git\n", "!pip install -r requirements_workflow_interface.txt\n", - "!pip install tensorflow==2.7.0\n", + "!pip install tensorflow\n", "\n", "# Uncomment this if running in Google Colab\n", "# !pip install -r https://raw.githubusercontent.com/intel/openfl/develop/openfl-tutorials/experimental/requirements_workflow_interface.txt\n", @@ -349,7 +349,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.18" + "version": "3.8.19" }, "orig_nbformat": 4 }, diff --git a/openfl-tutorials/experimental/Workflow_Interface_401_MNIST_Aggregator_Validation_Ray_Watermarking.ipynb b/openfl-tutorials/experimental/Workflow_Interface_401_MNIST_Aggregator_Validation_Ray_Watermarking.ipynb index f8f782576f..8fdcb0e435 100644 --- a/openfl-tutorials/experimental/Workflow_Interface_401_MNIST_Aggregator_Validation_Ray_Watermarking.ipynb +++ b/openfl-tutorials/experimental/Workflow_Interface_401_MNIST_Aggregator_Validation_Ray_Watermarking.ipynb @@ -426,7 +426,9 @@ " state_dicts = [model.state_dict() for model in models]\n", " state_dict = new_model.state_dict()\n", " for key in models[1].state_dict():\n", - " state_dict[key] = np.average([state[key] for state in state_dicts],axis=0,weights=weights)\n", + " state_dict[key] = torch.from_numpy(np.average([state[key].numpy() for state in state_dicts],\n", + " axis=0, \n", + " weights=weights))\n", " new_model.load_state_dict(state_dict)\n", " return new_model" ] @@ -558,7 +560,7 @@ " exclude=[\"watermark_pretrain_optimizer\", \"watermark_retrain_optimizer\"],\n", " )\n", "\n", - " @collaborator(num_gpus=1)\n", + " @collaborator\n", " def aggregated_model_validation(self):\n", " \"\"\"\n", " Perform Aggregated Model validation on Collaborators.\n", @@ -570,7 +572,7 @@ "\n", " self.next(self.train)\n", "\n", - " @collaborator(num_gpus=1)\n", + " @collaborator\n", " def train(self):\n", " \"\"\"\n", " Train model on Local collab dataset.\n", @@ -594,7 +596,7 @@ " self.next(self.local_model_validation)\n", "\n", "\n", - " @collaborator(num_gpus=1)\n", + " @collaborator\n", " def local_model_validation(self):\n", " \"\"\"\n", " Validate locally trained model.\n", @@ -765,7 +767,7 @@ "outputs": [], "source": [ "# Setup Aggregator with private attributes\n", - "aggregator = Aggregator()\n", + "aggregator = Aggregator(num_gpus=0.0)\n", "\n", "# Setup Collaborators with private attributes\n", "collaborator_names = [\n", @@ -776,7 +778,7 @@ " \"New Delhi\",\n", "]\n", "print(f\"Creating 
collaborators {collaborator_names}\")\n", - "collaborators = [Collaborator(name=name) for name in collaborator_names]\n", + "collaborators = [Collaborator(name=name, num_gpus=0.0) for name in collaborator_names]\n", "\n", "aggregator_test = deepcopy(mnist_test)\n", "aggregator_test.targets = mnist_test.targets[len(collaborators)::len(collaborators)+1]\n", @@ -900,7 +902,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.6" + "version": "3.8.19" }, "vscode": { "interpreter": { diff --git a/openfl-workspace/experimental/104_keras_mnist/requirements.txt b/openfl-workspace/experimental/104_keras_mnist/requirements.txt index 8e961eac43..0f57144081 100644 --- a/openfl-workspace/experimental/104_keras_mnist/requirements.txt +++ b/openfl-workspace/experimental/104_keras_mnist/requirements.txt @@ -1 +1 @@ -tensorflow==2.11.1 +tensorflow
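
A side note on the weighted-averaging change in the 401 watermarking notebook above: `np.average` operates on NumPy arrays and returns an `ndarray`, while `load_state_dict` expects torch tensors, hence the added `state[key].numpy()` / `torch.from_numpy(...)` round trip. Below is a minimal, self-contained sketch of that pattern, assuming the same PyTorch/NumPy stack the notebook uses; the helper name `fedavg_state_dicts` and the `nn.Linear` models are illustrative only and are not part of the notebook.

import numpy as np
import torch
import torch.nn as nn

def fedavg_state_dicts(models, weights):
    # Weighted average of the models' parameters, key by key.
    state_dicts = [m.state_dict() for m in models]
    averaged = {}
    for key in state_dicts[0]:
        # np.average returns an ndarray, so convert each tensor to NumPy
        # and convert the averaged result back to a torch tensor.
        averaged[key] = torch.from_numpy(
            np.average([sd[key].numpy() for sd in state_dicts], axis=0, weights=weights)
        )
    return averaged

# Illustrative usage: average two small models with equal weights.
models = [nn.Linear(4, 2) for _ in range(2)]
merged = nn.Linear(4, 2)
merged.load_state_dict(fedavg_state_dicts(models, weights=[0.5, 0.5]))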