Skip to content

Commit

Permalink
Updated workflow_interface tutorials 1001, 104_keras, 401_MNIST
Browse files Browse the repository at this point in the history
Signed-off-by: Ishant Thakare <[email protected]>
  • Loading branch information
ishant162 committed May 31, 2024
1 parent a920ecf commit aa2692b
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -738,69 +738,151 @@
"watermark_retrain_learning_rate = 5e-3"
]
},
{
"cell_type": "markdown",
"id": "5e4ac3ad",
"metadata": {},
"source": [
"You'll notice in the `FederatedFlow_MNIST_Watermarking` definition above that there were certain attributes that the flow was not initialized with, namely the `train_loader` and `test_loader` for each of the collaborators. These are **private_attributes** of the particular participant and (as the name suggests) are accessible ONLY to the particular participant's through its task. Additionally these private attributes are always filtered out of the current state when transferring from collaborator to aggregator, and vice versa.\n",
"\n",
"Users can directly specify a collaborator's private attributes via `collaborator.private_attributes` which is a dictionary where key is name of the attribute and value is the object that is made accessible to collaborator.\n",
"\n",
"For more detailed information on specifying these private attributes, please refer to the first quick start [notebook](https://github.com/intel/openfl/blob/develop/openfl-tutorials/experimental/Workflow_Interface_101_MNIST.ipynb)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c5f6e104",
"id": "e5f10d5d",
"metadata": {},
"outputs": [],
"source": [
"#| export\n",
"\n",
"def callable_to_initialize_aggregator_private_attributes(watermark_data, batch_size):\n",
" return {\n",
" \"watermark_data_loader\": torch.utils.data.DataLoader(\n",
" watermark_data, batch_size=batch_size, shuffle=True\n",
"# Setup Aggregator with private attributes\n",
"aggregator = Aggregator()\n",
"aggregator.private_attributes = {\n",
" \"watermark_data_loader\": torch.utils.data.DataLoader(\n",
" watermark_data, batch_size=batch_size_watermark, shuffle=True\n",
" ),\n",
" \"pretrain_epochs\": 25,\n",
" \"retrain_epochs\": 25,\n",
" \"watermark_acc_threshold\": 0.98,\n",
" \"watermark_pretraining_completed\": False,\n",
"}\n",
"\n",
"# Setup Collaborators with private attributes\n",
"collaborator_names = ['Portland', 'Seattle', 'Chandler','Bangalore']\n",
"print(f\"Creating collaborators {collaborator_names}\")\n",
"collaborators = [Collaborator(name=name) for name in collaborator_names]\n",
"\n",
"for idx, collaborator in enumerate(collaborators):\n",
" local_train = deepcopy(mnist_train)\n",
" local_test = deepcopy(mnist_test)\n",
" local_train.data = mnist_train.data[idx :: len(collaborators)]\n",
" local_train.targets = mnist_train.targets[idx :: len(collaborators)]\n",
" local_test.data = mnist_test.data[idx :: len(collaborators)]\n",
" local_test.targets = mnist_test.targets[idx :: len(collaborators)]\n",
" collaborator.private_attributes = {\n",
" \"train_loader\": torch.utils.data.DataLoader(\n",
" local_train, batch_size=batch_size_train, shuffle=True\n",
" ),\n",
" \"test_loader\": torch.utils.data.DataLoader(\n",
" local_test, batch_size=batch_size_train, shuffle=True\n",
" ),\n",
" \"pretrain_epochs\": 25,\n",
" \"retrain_epochs\": 25,\n",
" \"watermark_acc_threshold\": 0.98,\n",
" }\n",
"\n",
"# Setup Aggregator private attributes via callable function\n",
"aggregator = Aggregator(\n",
" name=\"agg\",\n",
" private_attributes_callable=callable_to_initialize_aggregator_private_attributes,\n",
" watermark_data=watermark_data,\n",
" batch_size=batch_size_watermark,\n",
" )\n",
"local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators)\n",
"print(f\"Local runtime collaborators = {local_runtime.collaborators}\")"
]
},
{
"cell_type": "markdown",
"id": "5177f137",
"metadata": {},
"source": [
"### Alternate method to specify private attributes\n",
"\n",
"collaborator_names = [\n",
" \"Portland\",\n",
" \"Seattle\",\n",
" \"Chandler\",\n",
" \"Bangalore\",\n",
" \"New Delhi\",\n",
"]\n",
"n_collaborators = len(collaborator_names)\n",
"\n",
"def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, batch_size, train_dataset, test_dataset):\n",
" train = deepcopy(train_dataset)\n",
" test = deepcopy(test_dataset)\n",
" train.data = train_dataset.data[index::n_collaborators]\n",
" train.targets = train_dataset.targets[index::n_collaborators]\n",
" test.data = test_dataset.data[index::n_collaborators]\n",
" test.targets = test_dataset.targets[index::n_collaborators]\n",
"\n",
" return {\n",
" \"train_loader\": torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True),\n",
" \"test_loader\": torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=True),\n",
" }\n",
"There is another way users can initialize private attributes in which private attributes need to be set using a (user defined) callback function while instantiating the participant. The callback function returns the private attributes (`train_loader` & `test_loader`) in form of a dictionary where the key is the attribute name, and the value is the object that will be made accessible to that participant's task.\n",
"\n",
"# Setup Collaborators private attributes via callable function\n",
"collaborators = []\n",
"for idx, collaborator_name in enumerate(collaborator_names):\n",
" collaborators.append(\n",
" Collaborator(\n",
" name=collaborator_name, num_cpus=0, num_gpus=0,\n",
" private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n",
" index=idx, n_collaborators=n_collaborators,\n",
" train_dataset=mnist_train, test_dataset=mnist_test, batch_size=64\n",
" )\n",
" )\n",
"In the following example callback function, `callable_to_initialize_collaborator_private_attributes`, returns the private attributes `train_loader` and `test_loader` of the collaborator.\n",
"\n",
"local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend=\"ray\")\n",
"print(f\"Local runtime collaborators = {local_runtime.collaborators}\")"
"Detailed information on specifying private attributes using callback function is provided in this [documentation](https://github.com/intel/openfl/blob/develop/docs/about/features_index/workflowinterface.rst). \n",
"\n",
">To use this method, uncomment the following cell and comment out the previous cell which initializes private attributes directly."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c5f6e104",
"metadata": {},
"outputs": [],
"source": [
"# #| export\n",
"\n",
"# def callable_to_initialize_aggregator_private_attributes(watermark_data, batch_size):\n",
"# return {\n",
"# \"watermark_data_loader\": torch.utils.data.DataLoader(\n",
"# watermark_data, batch_size=batch_size, shuffle=True\n",
"# ),\n",
"# \"pretrain_epochs\": 25,\n",
"# \"retrain_epochs\": 25,\n",
"# \"watermark_acc_threshold\": 0.98,\n",
"# }\n",
"\n",
"# # Setup Aggregator private attributes via callable function\n",
"# aggregator = Aggregator(\n",
"# name=\"agg\",\n",
"# private_attributes_callable=callable_to_initialize_aggregator_private_attributes,\n",
"# watermark_data=watermark_data,\n",
"# batch_size=batch_size_watermark,\n",
"# )\n",
"\n",
"# collaborator_names = [\n",
"# \"Portland\",\n",
"# \"Seattle\",\n",
"# \"Chandler\",\n",
"# \"Bangalore\",\n",
"# \"New Delhi\",\n",
"# ]\n",
"# n_collaborators = len(collaborator_names)\n",
"\n",
"# def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, batch_size, train_dataset, test_dataset):\n",
"# train = deepcopy(train_dataset)\n",
"# test = deepcopy(test_dataset)\n",
"# train.data = train_dataset.data[index::n_collaborators]\n",
"# train.targets = train_dataset.targets[index::n_collaborators]\n",
"# test.data = test_dataset.data[index::n_collaborators]\n",
"# test.targets = test_dataset.targets[index::n_collaborators]\n",
"\n",
"# return {\n",
"# \"train_loader\": torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True),\n",
"# \"test_loader\": torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=True),\n",
"# }\n",
"\n",
"# # Setup Collaborators private attributes via callable function\n",
"# collaborators = []\n",
"# for idx, collaborator_name in enumerate(collaborator_names):\n",
"# collaborators.append(\n",
"# Collaborator(\n",
"# name=collaborator_name, num_cpus=0, num_gpus=0,\n",
"# private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n",
"# index=idx, n_collaborators=n_collaborators,\n",
"# train_dataset=mnist_train, test_dataset=mnist_test, batch_size=64\n",
"# )\n",
"# )\n",
"\n",
"# local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend=\"ray\")\n",
"# print(f\"Local runtime collaborators = {local_runtime.collaborators}\")"
]
},
{
"cell_type": "markdown",
"id": "2a4ec6cc",
"metadata": {},
"source": [
">NOTE: If both methods for specifying private attributes are used, the private attributes will only be set by the latter method. Additionally, the code for both methods will be included in your generated workspace."
]
},
{
Expand Down Expand Up @@ -901,91 +983,91 @@
"id": "ff55808c-c340-476b-a543-58d43451c54e",
"metadata": {},
"source": [
"**Workspace Activation and Creation**\r\n",
"1. Activate the experimental aggregator-based workflow:\r\n",
"\r\n",
" `fx experimental activate`\r\n",
"\r\n",
" This will create an 'experimental' directory under ~/.openfl/\r\n",
"3. Create a workspace using the custom template:\r\n",
"\r\n",
" `fx workspace create --prefix workspace_path --custom_template /home/$USER/generated-workspace`\r\n",
"4. Change to the workspace directory:\r\n",
"\r\n",
" `cd workspace_path`\r\n",
"\r\n",
"**Workspace Initialization and Certification**\r\n",
"1. Initialize the FL plan and auto-populate the fully qualified domain name (FQDN) of the aggregator node:\r\n",
"\r\n",
" `fx plan initialize`\r\n",
"2. Certify the workspace:\r\n",
"\r\n",
" `fx workspace certify`\r\n",
" \r\n",
"**Aggregator Setup and Workspace Export**\r\n",
"1. Run the aggregator certificate creation command:\r\n",
"\r\n",
" `fx aggregator generate-cert-request`\r\n",
"\r\n",
" `fx aggregator certify`\r\n",
"2. Export the workspace for collaboration:\r\n",
"\r\n",
" `fx workspace export`\r\n",
" \r\n",
"**Collaborator Node Setup**\r\n",
"\r\n",
"***On the Collaborator Node:***\r\n",
"\r\n",
"1. Copy the workspace archive from the aggregator node to the collaborator nodes. Import the workspace archive:\r\n",
"\r\n",
" `fx workspace import --archive WORKSPACE.zip`\r\n",
" \r\n",
" `cd workspace_path`\r\n",
"3. Generate a collaborator certificate request:\r\n",
"\r\n",
" `fx collaborator generate-cert-request -n {COL_LABEL}`\r\n",
"\r\n",
"***On the Aggregator Node (Certificate Authority):***\r\n",
"\r\n",
"3. Sign the Collaborator Certificate Signing Request (CSR) Package from collaborator nodes:\r\n",
"\r\n",
" `fx collaborator certify --request-pkg /PATH/TO/col_{COL_LABEL}_to_agg_cert_request.zip`\r\n",
"\r\n",
"***On the Collaborator Node:***\r\n",
"\r\n",
"4. Import the signed certificate and certificate chain into the workspace:\r\n",
"\r\n",
" `fx collaborator certify --import /PATH/TO/agg_to_col_{COL_LABEL}_signed_cert.zip`\r\n",
" \r\n",
"**Final Workspace Activation**\r\n",
"***On the Aggregator Node:***\r\n",
"\r\n",
"1. Start the Aggregator:\r\n",
"\r\n",
" `fx aggregator start`\r\n",
" \r\n",
" The Aggregator is now running and waiting for Collaborators to connect.\r\n",
"\r\n",
"***On the Collaborator Nodes:***\r\n",
"\r\n",
"2. Run the Collaborator:\r\n",
"\r\n",
" `fx collaborator start -n {COL_LABEL}`\r\n",
"\r\n",
"**Workspace Deactivation**\r\n",
"1. To deactivate the experimental aggregator-based workflow and switch back to original aggregator-based workflow:\r\n",
"\r\n",
" `fx experimental deactivate`\r\n",
"\r\n",
" This will remove the 'experimental' directory under ~/.openfl/\r\n"
"**Workspace Activation and Creation**\n",
"1. Activate the experimental aggregator-based workflow:\n",
"\n",
" `fx experimental activate`\n",
"\n",
" This will create an 'experimental' directory under ~/.openfl/\n",
"3. Create a workspace using the custom template:\n",
"\n",
" `fx workspace create --prefix workspace_path --custom_template /home/$USER/generated-workspace`\n",
"4. Change to the workspace directory:\n",
"\n",
" `cd workspace_path`\n",
"\n",
"**Workspace Initialization and Certification**\n",
"1. Initialize the FL plan and auto-populate the fully qualified domain name (FQDN) of the aggregator node:\n",
"\n",
" `fx plan initialize`\n",
"2. Certify the workspace:\n",
"\n",
" `fx workspace certify`\n",
" \n",
"**Aggregator Setup and Workspace Export**\n",
"1. Run the aggregator certificate creation command:\n",
"\n",
" `fx aggregator generate-cert-request`\n",
"\n",
" `fx aggregator certify`\n",
"2. Export the workspace for collaboration:\n",
"\n",
" `fx workspace export`\n",
" \n",
"**Collaborator Node Setup**\n",
"\n",
"***On the Collaborator Node:***\n",
"\n",
"1. Copy the workspace archive from the aggregator node to the collaborator nodes. Import the workspace archive:\n",
"\n",
" `fx workspace import --archive WORKSPACE.zip`\n",
" \n",
" `cd workspace_path`\n",
"3. Generate a collaborator certificate request:\n",
"\n",
" `fx collaborator generate-cert-request -n {COL_LABEL}`\n",
"\n",
"***On the Aggregator Node (Certificate Authority):***\n",
"\n",
"3. Sign the Collaborator Certificate Signing Request (CSR) Package from collaborator nodes:\n",
"\n",
" `fx collaborator certify --request-pkg /PATH/TO/col_{COL_LABEL}_to_agg_cert_request.zip`\n",
"\n",
"***On the Collaborator Node:***\n",
"\n",
"4. Import the signed certificate and certificate chain into the workspace:\n",
"\n",
" `fx collaborator certify --import /PATH/TO/agg_to_col_{COL_LABEL}_signed_cert.zip`\n",
" \n",
"**Final Workspace Activation**\n",
"***On the Aggregator Node:***\n",
"\n",
"1. Start the Aggregator:\n",
"\n",
" `fx aggregator start`\n",
" \n",
" The Aggregator is now running and waiting for Collaborators to connect.\n",
"\n",
"***On the Collaborator Nodes:***\n",
"\n",
"2. Run the Collaborator:\n",
"\n",
" `fx collaborator start -n {COL_LABEL}`\n",
"\n",
"**Workspace Deactivation**\n",
"1. To deactivate the experimental aggregator-based workflow and switch back to original aggregator-based workflow:\n",
"\n",
" `fx experimental deactivate`\n",
"\n",
" This will remove the 'experimental' directory under ~/.openfl/\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "v_o",
"display_name": "openfl-wip",
"language": "python",
"name": "v_o"
"name": "python3"
},
"language_info": {
"codemirror_mode": {
Expand All @@ -997,7 +1079,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
"version": "3.8.19"
}
},
"nbformat": 4,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"source": [
"!pip install git+https://github.com/intel/openfl.git\n",
"!pip install -r requirements_workflow_interface.txt\n",
"!pip install tensorflow==2.7.0\n",
"!pip install tensorflow\n",
"\n",
"# Uncomment this if running in Google Colab\n",
"# !pip install -r https://raw.githubusercontent.com/intel/openfl/develop/openfl-tutorials/experimental/requirements_workflow_interface.txt\n",
Expand Down Expand Up @@ -349,7 +349,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.18"
"version": "3.8.19"
},
"orig_nbformat": 4
},
Expand Down
Loading

0 comments on commit aa2692b

Please sign in to comment.