diff --git a/docs/about/features_index/workflowinterface.rst b/docs/about/features_index/workflowinterface.rst index d09655f96c..942a6c97be 100644 --- a/docs/about/features_index/workflowinterface.rst +++ b/docs/about/features_index/workflowinterface.rst @@ -142,10 +142,40 @@ The workflow interface formulates the experiment as a series of tasks, or a flow Runtimes ======== -A :code:`Runtime` defines where the flow will be executed, who the participants are in the experiment, and the private information that each participant has access to. In this experimental release, single node execution is supported using the :code:`LocalRuntime`. Let's see how a :code:`LocalRuntime` is created: +A :code:`Runtime` defines where the flow will be executed, who the participants are in the experiment, and the private information that each participant has access to. In this experimental release, single node execution is supported using the :code:`LocalRuntime`. Let's see how a :code:`LocalRuntime` is created. .. 
code-block:: python + + # Setup participants + aggregator = Aggregator() + aggregator.private_attributes = {} + + # Setup collaborators with private attributes + collaborator_names = ['Portland', 'Seattle', 'Chandler','Bangalore'] + collaborators = [Collaborator(name=name) for name in collaborator_names] + for idx, collaborator in enumerate(collaborators): + local_train = deepcopy(mnist_train) + local_test = deepcopy(mnist_test) + local_train.data = mnist_train.data[idx::len(collaborators)] + local_train.targets = mnist_train.targets[idx::len(collaborators)] + local_test.data = mnist_test.data[idx::len(collaborators)] + local_test.targets = mnist_test.targets[idx::len(collaborators)] + collaborator.private_attributes = { + 'train_loader': torch.utils.data.DataLoader(local_train,batch_size=batch_size_train, shuffle=True), + 'test_loader': torch.utils.data.DataLoader(local_test,batch_size=batch_size_train, shuffle=True) + } + + local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='single_process') + +Let's break this down, starting with the :code:`Aggregator` and :code:`Collaborator` components. These components represent the *Participants* in a Federated Learning experiment. Each participant has its own set of *private attributes*. As the name suggests, these *private attributes* represent private information that a participant does not want to share with others, and they are filtered out whenever there is a transition from the aggregator to the collaborator or vice versa. In the example above, each collaborator has its own train and test data loaders, which are only available while that collaborator is performing its tasks, via `self.train_loader` and `self.test_loader`. Once those collaborators transition to a task at the aggregator, this private information is filtered out and the remaining collaborator state can safely be sent back to the aggregator. 
+ +These *private attributes* need to be set in the form of a (user-defined) dictionary, where the key is the name of the attribute and the value is the object. In this example, :code:`collaborator.private_attributes` sets the collaborator *private attributes* :code:`train_loader` and :code:`test_loader` that are accessed by collaborator steps (:code:`aggregated_model_validation`, :code:`train` and :code:`local_model_validation`). +While setting *private attributes* directly through a dictionary is the preferred method, this requires an object to be initialized before the flow begins execution. +In rare cases this can be a problem because certain Python objects cannot be serialized. To compensate for these cases, users can delay the initialization of the *private attributes* object by using a callback: + +.. code-block:: python + # Aggregator aggregator_ = Aggregator() @@ -181,25 +211,25 @@ A :code:`Runtime` defines where the flow will be executed, who the participants local_runtime = LocalRuntime(aggregator=aggregator_, collaborators=collaborators) -Let's break this down, starting with the :code:`Aggregator` and :code:`Collaborator` components. These components represent the *Participants* in a Federated Learning experiment. Each participant has its own set of *private attributes* that represent the information / data specific to its role or requirements. As the name suggests these *private attributes* are accessible only to the particular participant, and are appropriately inserted into or filtered out of current Flow state when transferring from between Participants. For e.g. Collaborator private attributes are inserted into :code:`flow` when transitioning from Aggregator to Collaborator and are filtered out when transitioning from Collaborator to Aggregator. -In the above :code:`FederatedFlow`, each collaborator accesses train and test datasets via *private attributes* :code:`train_loader` and :code:`test_loader`. 
These *private attributes* need to be set using a (user defined) callback function while instantiating the participant. Participant *private attributes* are returned by the callback function in form of a dictionary, where the key is the name of the attribute and the value is the object. +Participant *private attributes* are returned by the callback function in the form of a dictionary, where the key is the name of the attribute and the value is the object. In this example, the callback function :code:`callable_to_initialize_collaborator_private_attributes()` returns :code:`train_loader` and :code:`test_loader` in the form of a dictionary. -In this example callback function :code:`callable_to_initialize_collaborator_private_attributes()` returns the collaborator private attributes :code:`train_loader` and :code:`test_loader` that are accessed by collaborator steps (:code:`aggregated_model_validation`, :code:`train` and :code:`local_model_validation`). Some important points to remember while creating callback function and private attributes are: +**Note:** If both callable and private attributes are provided, the initialization will prioritize the private attributes through the :code:`callable` function. - - Callback Function needs to be defined by the user and should return the *private attributes* required by the participant in form of a key/value pair - - In above example multiple collaborators have the same callback function. Depending on the Federated Learning requirements, user can specify unique callback functions for each Participant - - If no Callback Function is specified then the Participant shall not have any *private attributes* - - Callback function can be provided with any parameters required as arguments. 
In this example, parameters essential for the callback function are supplied with corresponding values bearing *same names* during the instantiation of the Collaborator +Some important points to remember while creating callback function and private attributes are: + + - Callback Function needs to be defined by the user and should return the *private attributes* required by the participant in form of a key/value pair + - Callback function can be provided with any parameters required as arguments. In this example, parameters essential for the callback function are supplied with corresponding values bearing *same names* during the instantiation of the Collaborator * :code:`index`: Index of the particular collaborator needed to shard the dataset * :code:`n_collaborators`: Total number of collaborators in which the dataset is sharded * :code:`batch_size`: For the train and test loaders * :code:`train_dataset`: Train Dataset to be sharded between n_collaborators * :code:`test_dataset`: Test Dataset to be sharded between n_collaborators - - - Callback function needs to be specified by user while instantiating the participant. Callback function is invoked by the OpenFL runtime at the time participant is created and once created these attributes cannot be modified - - Private attributes are accessible only in the Participant steps + - Callback function needs to be specified by user while instantiating the participant. Callback function is invoked by the OpenFL runtime at the time participant is created and once created these attributes cannot be modified + - If no Callback Function or private attributes is specified then the Participant shall not have any *private attributes* + - In above example multiple collaborators have the same callback function or private attributes. 
Depending on the Federated Learning requirements, the user can specify a unique callback function or private attributes for each Participant + - *Private attributes* need to be set after instantiating the participant. Now let's see how the runtime for a flow is assigned, and the flow gets run: @@ -378,4 +408,4 @@ Our goal is to make it a one line change to configure where and how a flow is ex # A future example of how the same flow could be run on distributed infrastructure federated_runtime = FederatedRuntime(...) flow.runtime = federated_runtime - flow.run() \ No newline at end of file + flow.run() diff --git a/openfl-tutorials/experimental/Workflow_Interface_1001_Workspace_Creation_from_JupyterNotebook.ipynb b/openfl-tutorials/experimental/Workflow_Interface_1001_Workspace_Creation_from_JupyterNotebook.ipynb index dd3926f52e..09ebd7351c 100644 --- a/openfl-tutorials/experimental/Workflow_Interface_1001_Workspace_Creation_from_JupyterNotebook.ipynb +++ b/openfl-tutorials/experimental/Workflow_Interface_1001_Workspace_Creation_from_JupyterNotebook.ipynb @@ -738,69 +738,151 @@ "watermark_retrain_learning_rate = 5e-3" ] }, + { + "cell_type": "markdown", + "id": "5e4ac3ad", + "metadata": {}, + "source": [ + "You'll notice in the `FederatedFlow_MNIST_Watermarking` definition above that there were certain attributes that the flow was not initialized with, namely the `train_loader` and `test_loader` for each of the collaborators. These are **private_attributes** of the particular participant and (as the name suggests) are accessible ONLY to that particular participant through its tasks. 
Additionally these private attributes are always filtered out of the current state when transferring from collaborator to aggregator, and vice versa.\n", + "\n", + "Users can directly specify a collaborator's private attributes via `collaborator.private_attributes` which is a dictionary where key is name of the attribute and value is the object that is made accessible to collaborator.\n", + "\n", + "For more detailed information on specifying these private attributes, please refer to the first quick start [notebook](https://github.com/intel/openfl/blob/develop/openfl-tutorials/experimental/Workflow_Interface_101_MNIST.ipynb)" + ] + }, { "cell_type": "code", "execution_count": null, - "id": "c5f6e104", + "id": "e5f10d5d", "metadata": {}, "outputs": [], "source": [ "#| export\n", "\n", - "def callable_to_initialize_aggregator_private_attributes(watermark_data, batch_size):\n", - " return {\n", - " \"watermark_data_loader\": torch.utils.data.DataLoader(\n", - " watermark_data, batch_size=batch_size, shuffle=True\n", + "# Setup Aggregator with private attributes\n", + "aggregator = Aggregator()\n", + "aggregator.private_attributes = {\n", + " \"watermark_data_loader\": torch.utils.data.DataLoader(\n", + " watermark_data, batch_size=batch_size_watermark, shuffle=True\n", + " ),\n", + " \"pretrain_epochs\": 25,\n", + " \"retrain_epochs\": 25,\n", + " \"watermark_acc_threshold\": 0.98,\n", + " \"watermark_pretraining_completed\": False,\n", + "}\n", + "\n", + "# Setup Collaborators with private attributes\n", + "collaborator_names = ['Portland', 'Seattle', 'Chandler','Bangalore']\n", + "print(f\"Creating collaborators {collaborator_names}\")\n", + "collaborators = [Collaborator(name=name) for name in collaborator_names]\n", + "\n", + "for idx, collaborator in enumerate(collaborators):\n", + " local_train = deepcopy(mnist_train)\n", + " local_test = deepcopy(mnist_test)\n", + " local_train.data = mnist_train.data[idx :: len(collaborators)]\n", + " local_train.targets = 
mnist_train.targets[idx :: len(collaborators)]\n", + " local_test.data = mnist_test.data[idx :: len(collaborators)]\n", + " local_test.targets = mnist_test.targets[idx :: len(collaborators)]\n", + " collaborator.private_attributes = {\n", + " \"train_loader\": torch.utils.data.DataLoader(\n", + " local_train, batch_size=batch_size_train, shuffle=True\n", + " ),\n", + " \"test_loader\": torch.utils.data.DataLoader(\n", + " local_test, batch_size=batch_size_train, shuffle=True\n", " ),\n", - " \"pretrain_epochs\": 25,\n", - " \"retrain_epochs\": 25,\n", - " \"watermark_acc_threshold\": 0.98,\n", " }\n", "\n", - "# Setup Aggregator private attributes via callable function\n", - "aggregator = Aggregator(\n", - " name=\"agg\",\n", - " private_attributes_callable=callable_to_initialize_aggregator_private_attributes,\n", - " watermark_data=watermark_data,\n", - " batch_size=batch_size_watermark,\n", - " )\n", + "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators)\n", + "print(f\"Local runtime collaborators = {local_runtime.collaborators}\")" + ] + }, + { + "cell_type": "markdown", + "id": "5177f137", + "metadata": {}, + "source": [ + "### Alternate method to specify private attributes\n", "\n", - "collaborator_names = [\n", - " \"Portland\",\n", - " \"Seattle\",\n", - " \"Chandler\",\n", - " \"Bangalore\",\n", - " \"New Delhi\",\n", - "]\n", - "n_collaborators = len(collaborator_names)\n", - "\n", - "def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, batch_size, train_dataset, test_dataset):\n", - " train = deepcopy(train_dataset)\n", - " test = deepcopy(test_dataset)\n", - " train.data = train_dataset.data[index::n_collaborators]\n", - " train.targets = train_dataset.targets[index::n_collaborators]\n", - " test.data = test_dataset.data[index::n_collaborators]\n", - " test.targets = test_dataset.targets[index::n_collaborators]\n", - "\n", - " return {\n", - " \"train_loader\": torch.utils.data.DataLoader(train, 
batch_size=batch_size, shuffle=True),\n", - " \"test_loader\": torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=True),\n", - " }\n", + "There is another way users can initialize private attributes in which private attributes need to be set using a (user defined) callback function while instantiating the participant. The callback function returns the private attributes (`train_loader` & `test_loader`) in form of a dictionary where the key is the attribute name, and the value is the object that will be made accessible to that participant's task.\n", "\n", - "# Setup Collaborators private attributes via callable function\n", - "collaborators = []\n", - "for idx, collaborator_name in enumerate(collaborator_names):\n", - " collaborators.append(\n", - " Collaborator(\n", - " name=collaborator_name, num_cpus=0, num_gpus=0,\n", - " private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n", - " index=idx, n_collaborators=n_collaborators,\n", - " train_dataset=mnist_train, test_dataset=mnist_test, batch_size=64\n", - " )\n", - " )\n", + "In the following example callback function, `callable_to_initialize_collaborator_private_attributes`, returns the private attributes `train_loader` and `test_loader` of the collaborator.\n", "\n", - "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend=\"ray\")\n", - "print(f\"Local runtime collaborators = {local_runtime.collaborators}\")" + "Detailed information on specifying private attributes using callback function is provided in this [documentation](https://github.com/intel/openfl/blob/develop/docs/about/features_index/workflowinterface.rst). \n", + "\n", + ">To use this method, uncomment the following cell and comment out the previous cell which initializes private attributes directly." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c5f6e104", + "metadata": {}, + "outputs": [], + "source": [ + "# #| export\n", + "\n", + "# def callable_to_initialize_aggregator_private_attributes(watermark_data, batch_size):\n", + "# return {\n", + "# \"watermark_data_loader\": torch.utils.data.DataLoader(\n", + "# watermark_data, batch_size=batch_size, shuffle=True\n", + "# ),\n", + "# \"pretrain_epochs\": 25,\n", + "# \"retrain_epochs\": 25,\n", + "# \"watermark_acc_threshold\": 0.98,\n", + "# }\n", + "\n", + "# # Setup Aggregator private attributes via callable function\n", + "# aggregator = Aggregator(\n", + "# name=\"agg\",\n", + "# private_attributes_callable=callable_to_initialize_aggregator_private_attributes,\n", + "# watermark_data=watermark_data,\n", + "# batch_size=batch_size_watermark,\n", + "# )\n", + "\n", + "# collaborator_names = [\n", + "# \"Portland\",\n", + "# \"Seattle\",\n", + "# \"Chandler\",\n", + "# \"Bangalore\",\n", + "# \"New Delhi\",\n", + "# ]\n", + "# n_collaborators = len(collaborator_names)\n", + "\n", + "# def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, batch_size, train_dataset, test_dataset):\n", + "# train = deepcopy(train_dataset)\n", + "# test = deepcopy(test_dataset)\n", + "# train.data = train_dataset.data[index::n_collaborators]\n", + "# train.targets = train_dataset.targets[index::n_collaborators]\n", + "# test.data = test_dataset.data[index::n_collaborators]\n", + "# test.targets = test_dataset.targets[index::n_collaborators]\n", + "\n", + "# return {\n", + "# \"train_loader\": torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True),\n", + "# \"test_loader\": torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=True),\n", + "# }\n", + "\n", + "# # Setup Collaborators private attributes via callable function\n", + "# collaborators = []\n", + "# for idx, collaborator_name in enumerate(collaborator_names):\n", + "# 
collaborators.append(\n", + "# Collaborator(\n", + "# name=collaborator_name, num_cpus=0, num_gpus=0,\n", + "# private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n", + "# index=idx, n_collaborators=n_collaborators,\n", + "# train_dataset=mnist_train, test_dataset=mnist_test, batch_size=64\n", + "# )\n", + "# )\n", + "\n", + "# local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend=\"ray\")\n", + "# print(f\"Local runtime collaborators = {local_runtime.collaborators}\")" + ] + }, + { + "cell_type": "markdown", + "id": "2a4ec6cc", + "metadata": {}, + "source": [ + ">NOTE: If both methods for specifying private attributes are used, the private attributes will only be set by the latter method. Additionally, the code for both methods will be included in your generated workspace." ] }, { @@ -892,7 +974,7 @@ "source": [ "## Workspace Usage\n", "\n", - "The workspace crated above can be used by the Aggregator based workflow by using the `fx` commands in the following manner" + "The workspace created above can be used by the Aggregator based workflow by using the `fx` commands in the following manner" ] }, { @@ -901,91 +983,91 @@ "id": "ff55808c-c340-476b-a543-58d43451c54e", "metadata": {}, "source": [ - "**Workspace Activation and Creation**\r\n", - "1. Activate the experimental aggregator-based workflow:\r\n", - "\r\n", - " `fx experimental activate`\r\n", - "\r\n", - " This will create an 'experimental' directory under ~/.openfl/\r\n", - "3. Create a workspace using the custom template:\r\n", - "\r\n", - " `fx workspace create --prefix workspace_path --custom_template /home/$USER/generated-workspace`\r\n", - "4. Change to the workspace directory:\r\n", - "\r\n", - " `cd workspace_path`\r\n", - "\r\n", - "**Workspace Initialization and Certification**\r\n", - "1. 
Initialize the FL plan and auto-populate the fully qualified domain name (FQDN) of the aggregator node:\r\n", - "\r\n", - " `fx plan initialize`\r\n", - "2. Certify the workspace:\r\n", - "\r\n", - " `fx workspace certify`\r\n", - " \r\n", - "**Aggregator Setup and Workspace Export**\r\n", - "1. Run the aggregator certificate creation command:\r\n", - "\r\n", - " `fx aggregator generate-cert-request`\r\n", - "\r\n", - " `fx aggregator certify`\r\n", - "2. Export the workspace for collaboration:\r\n", - "\r\n", - " `fx workspace export`\r\n", - " \r\n", - "**Collaborator Node Setup**\r\n", - "\r\n", - "***On the Collaborator Node:***\r\n", - "\r\n", - "1. Copy the workspace archive from the aggregator node to the collaborator nodes. Import the workspace archive:\r\n", - "\r\n", - " `fx workspace import --archive WORKSPACE.zip`\r\n", - " \r\n", - " `cd workspace_path`\r\n", - "3. Generate a collaborator certificate request:\r\n", - "\r\n", - " `fx collaborator generate-cert-request -n {COL_LABEL}`\r\n", - "\r\n", - "***On the Aggregator Node (Certificate Authority):***\r\n", - "\r\n", - "3. Sign the Collaborator Certificate Signing Request (CSR) Package from collaborator nodes:\r\n", - "\r\n", - " `fx collaborator certify --request-pkg /PATH/TO/col_{COL_LABEL}_to_agg_cert_request.zip`\r\n", - "\r\n", - "***On the Collaborator Node:***\r\n", - "\r\n", - "4. Import the signed certificate and certificate chain into the workspace:\r\n", - "\r\n", - " `fx collaborator certify --import /PATH/TO/agg_to_col_{COL_LABEL}_signed_cert.zip`\r\n", - " \r\n", - "**Final Workspace Activation**\r\n", - "***On the Aggregator Node:***\r\n", - "\r\n", - "1. Start the Aggregator:\r\n", - "\r\n", - " `fx aggregator start`\r\n", - " \r\n", - " The Aggregator is now running and waiting for Collaborators to connect.\r\n", - "\r\n", - "***On the Collaborator Nodes:***\r\n", - "\r\n", - "2. 
Run the Collaborator:\r\n", - "\r\n", - " `fx collaborator start -n {COL_LABEL}`\r\n", - "\r\n", - "**Workspace Deactivation**\r\n", - "1. To deactivate the experimental aggregator-based workflow and switch back to original aggregator-based workflow:\r\n", - "\r\n", - " `fx experimental deactivate`\r\n", - "\r\n", - " This will remove the 'experimental' directory under ~/.openfl/\r\n" + "**Workspace Activation and Creation**\n", + "1. Activate the experimental aggregator-based workflow:\n", + "\n", + " `fx experimental activate`\n", + "\n", + " This will create an 'experimental' directory under ~/.openfl/\n", + "3. Create a workspace using the custom template:\n", + "\n", + " `fx workspace create --prefix workspace_path --custom_template /home/$USER/generated-workspace`\n", + "4. Change to the workspace directory:\n", + "\n", + " `cd workspace_path`\n", + "\n", + "**Workspace Initialization and Certification**\n", + "1. Initialize the FL plan and auto-populate the fully qualified domain name (FQDN) of the aggregator node:\n", + "\n", + " `fx plan initialize`\n", + "2. Certify the workspace:\n", + "\n", + " `fx workspace certify`\n", + " \n", + "**Aggregator Setup and Workspace Export**\n", + "1. Run the aggregator certificate creation command:\n", + "\n", + " `fx aggregator generate-cert-request`\n", + "\n", + " `fx aggregator certify`\n", + "2. Export the workspace for collaboration:\n", + "\n", + " `fx workspace export`\n", + " \n", + "**Collaborator Node Setup**\n", + "\n", + "***On the Collaborator Node:***\n", + "\n", + "1. Copy the workspace archive from the aggregator node to the collaborator nodes. Import the workspace archive:\n", + "\n", + " `fx workspace import --archive WORKSPACE.zip`\n", + " \n", + " `cd workspace_path`\n", + "3. Generate a collaborator certificate request:\n", + "\n", + " `fx collaborator generate-cert-request -n {COL_LABEL}`\n", + "\n", + "***On the Aggregator Node (Certificate Authority):***\n", + "\n", + "3. 
Sign the Collaborator Certificate Signing Request (CSR) Package from collaborator nodes:\n", + "\n", + " `fx collaborator certify --request-pkg /PATH/TO/col_{COL_LABEL}_to_agg_cert_request.zip`\n", + "\n", + "***On the Collaborator Node:***\n", + "\n", + "4. Import the signed certificate and certificate chain into the workspace:\n", + "\n", + " `fx collaborator certify --import /PATH/TO/agg_to_col_{COL_LABEL}_signed_cert.zip`\n", + " \n", + "**Final Workspace Activation**\n", + "***On the Aggregator Node:***\n", + "\n", + "1. Start the Aggregator:\n", + "\n", + " `fx aggregator start`\n", + " \n", + " The Aggregator is now running and waiting for Collaborators to connect.\n", + "\n", + "***On the Collaborator Nodes:***\n", + "\n", + "2. Run the Collaborator:\n", + "\n", + " `fx collaborator start -n {COL_LABEL}`\n", + "\n", + "**Workspace Deactivation**\n", + "1. To deactivate the experimental aggregator-based workflow and switch back to original aggregator-based workflow:\n", + "\n", + " `fx experimental deactivate`\n", + "\n", + " This will remove the 'experimental' directory under ~/.openfl/\n" ] } ], "metadata": { "kernelspec": { - "display_name": "v_o", + "display_name": "openfl-wip", "language": "python", - "name": "v_o" + "name": "python3" }, "language_info": { "codemirror_mode": { @@ -997,7 +1079,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.10" + "version": "3.8.19" } }, "nbformat": 4, diff --git a/openfl-tutorials/experimental/Workflow_Interface_101_MNIST.ipynb b/openfl-tutorials/experimental/Workflow_Interface_101_MNIST.ipynb index 4c61b6427e..3c246f0289 100644 --- a/openfl-tutorials/experimental/Workflow_Interface_101_MNIST.ipynb +++ b/openfl-tutorials/experimental/Workflow_Interface_101_MNIST.ipynb @@ -322,9 +322,13 @@ "id": "2aabf61e", "metadata": {}, "source": [ - "Note that the private attributes are flexible, and you can choose to pass in a completely different type of object to any of the 
collaborators or aggregator (with an arbitrary name). These private attributes will always be filtered out of the current state when transferring from collaborator to aggregator, or vice versa. \n", - "\n", - "Private attributes can be set using callback function while instantiating the participant. Parameters required by the callback function are specified as arguments while instantiating the participant. In this example callback function, `callable_to_initialize_collaborator_private_attributes`, returns the private attributes `train_loader` and `test_loader` of the collaborator. Parameters required by the callback function `index`, `n_collaborators`, `batch_size`, `train_dataset`, `test_dataset` are passed appropriate values with the same names in the Collaborator constructor." + "You'll notice in the `FederatedFlow` definition above that there were certain attributes that the flow was not initialized with, namely the `train_loader` and `test_loader` for each of the collaborators. These are **private_attributes** of the particular participant and (as the name suggests) are accessible ONLY to that particular participant through its tasks. Additionally, these private attributes are always filtered out of the current state when transferring from collaborator to aggregator, and vice versa.\n", + " \n", + "Users can directly specify a collaborator's private attributes via `collaborator.private_attributes`, which is a dictionary where the key is the name of the attribute and the value is the object that is made accessible to the collaborator. In this example, we segment shards of the MNIST dataset for four collaborators: `Portland`, `Seattle`, `Chandler` and `Bangalore`. 
Each shard / slice of the dataset is assigned to collaborator's private_attribute.\n", + " \n", + "Note that the private attributes are flexible, and user can choose to pass in a completely different type of object to any of the collaborators or aggregator (with an arbitrary name).\n", + " \n", + "Subsequent tutorials shall show examples to assign private_attributes for aggregator and another methodology of specifying private attributes via a callable." ] }, { @@ -334,41 +338,26 @@ "metadata": {}, "outputs": [], "source": [ - "# Aggregator\n", - "aggregator_ = Aggregator()\n", - "\n", - "collaborator_names = [\"Portland\", \"Seattle\", \"Chandler\", \"Bangalore\"]\n", + "# Setup participants\n", + "aggregator = Aggregator()\n", + "aggregator.private_attributes = {}\n", "\n", - "def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, batch_size, train_dataset, test_dataset):\n", - " train = deepcopy(train_dataset)\n", - " test = deepcopy(test_dataset)\n", - " train.data = train_dataset.data[index::n_collaborators]\n", - " train.targets = train_dataset.targets[index::n_collaborators]\n", - " test.data = test_dataset.data[index::n_collaborators]\n", - " test.targets = test_dataset.targets[index::n_collaborators]\n", - "\n", - " return {\n", - " \"train_loader\": torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True),\n", - " \"test_loader\": torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=True),\n", + "# Setup collaborators with private attributes\n", + "collaborator_names = ['Portland', 'Seattle', 'Chandler','Bangalore']\n", + "collaborators = [Collaborator(name=name) for name in collaborator_names]\n", + "for idx, collaborator in enumerate(collaborators):\n", + " local_train = deepcopy(mnist_train)\n", + " local_test = deepcopy(mnist_test)\n", + " local_train.data = mnist_train.data[idx::len(collaborators)]\n", + " local_train.targets = mnist_train.targets[idx::len(collaborators)]\n", + " local_test.data = 
mnist_test.data[idx::len(collaborators)]\n", + " local_test.targets = mnist_test.targets[idx::len(collaborators)]\n", + " collaborator.private_attributes = {\n", + " 'train_loader': torch.utils.data.DataLoader(local_train,batch_size=batch_size_train, shuffle=True),\n", + " 'test_loader': torch.utils.data.DataLoader(local_test,batch_size=batch_size_train, shuffle=True)\n", " }\n", "\n", - "# Setup collaborators private attributes via callable function\n", - "collaborators = []\n", - "for idx, collaborator_name in enumerate(collaborator_names):\n", - " collaborators.append(\n", - " Collaborator(\n", - " name=collaborator_name,\n", - " private_attributes_callable=callable_to_initialize_collaborator_private_attributes,\n", - " index=idx, \n", - " n_collaborators=len(collaborator_names),\n", - " train_dataset=mnist_train, \n", - " test_dataset=mnist_test, \n", - " batch_size=64\n", - " )\n", - " )\n", - "\n", - "local_runtime = LocalRuntime(aggregator=aggregator_, collaborators=collaborators,\n", - " backend=\"ray\")\n", + "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='single_process')\n", "print(f'Local runtime collaborators = {local_runtime.collaborators}')" ] }, @@ -391,7 +380,7 @@ "model = None\n", "best_model = None\n", "optimizer = None\n", - "flflow = FederatedFlow(model, optimizer, checkpoint=True)\n", + "flflow = FederatedFlow(model, optimizer, rounds=2, checkpoint=True)\n", "flflow.runtime = local_runtime\n", "flflow.run()" ] @@ -435,7 +424,7 @@ "metadata": {}, "outputs": [], "source": [ - "flflow2 = FederatedFlow(model=flflow.model, optimizer=flflow.optimizer, checkpoint=True)\n", + "flflow2 = FederatedFlow(model=flflow.model, optimizer=flflow.optimizer, rounds=2, checkpoint=True)\n", "flflow2.runtime = local_runtime\n", "flflow2.run()" ] diff --git a/openfl-tutorials/experimental/Workflow_Interface_104_Keras_MNIST_with_GPU.ipynb b/openfl-tutorials/experimental/Workflow_Interface_104_Keras_MNIST_with_GPU.ipynb index 
0845647d67..a677e8b8c8 100644 --- a/openfl-tutorials/experimental/Workflow_Interface_104_Keras_MNIST_with_GPU.ipynb +++ b/openfl-tutorials/experimental/Workflow_Interface_104_Keras_MNIST_with_GPU.ipynb @@ -37,7 +37,7 @@ "source": [ "!pip install git+https://github.com/intel/openfl.git\n", "!pip install -r requirements_workflow_interface.txt\n", - "!pip install tensorflow==2.7.0\n", + "!pip install tensorflow\n", "\n", "# Uncomment this if running in Google Colab\n", "# !pip install -r https://raw.githubusercontent.com/intel/openfl/develop/openfl-tutorials/experimental/requirements_workflow_interface.txt\n", @@ -349,7 +349,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.18" + "version": "3.8.19" }, "orig_nbformat": 4 }, diff --git a/openfl-tutorials/experimental/Workflow_Interface_401_MNIST_Aggregator_Validation_Ray_Watermarking.ipynb b/openfl-tutorials/experimental/Workflow_Interface_401_MNIST_Aggregator_Validation_Ray_Watermarking.ipynb index f8f782576f..8fdcb0e435 100644 --- a/openfl-tutorials/experimental/Workflow_Interface_401_MNIST_Aggregator_Validation_Ray_Watermarking.ipynb +++ b/openfl-tutorials/experimental/Workflow_Interface_401_MNIST_Aggregator_Validation_Ray_Watermarking.ipynb @@ -426,7 +426,9 @@ " state_dicts = [model.state_dict() for model in models]\n", " state_dict = new_model.state_dict()\n", " for key in models[1].state_dict():\n", - " state_dict[key] = np.average([state[key] for state in state_dicts],axis=0,weights=weights)\n", + " state_dict[key] = torch.from_numpy(np.average([state[key].numpy() for state in state_dicts],\n", + " axis=0, \n", + " weights=weights))\n", " new_model.load_state_dict(state_dict)\n", " return new_model" ] @@ -558,7 +560,7 @@ " exclude=[\"watermark_pretrain_optimizer\", \"watermark_retrain_optimizer\"],\n", " )\n", "\n", - " @collaborator(num_gpus=1)\n", + " @collaborator\n", " def aggregated_model_validation(self):\n", " \"\"\"\n", " Perform Aggregated Model 
validation on Collaborators.\n", @@ -570,7 +572,7 @@ "\n", " self.next(self.train)\n", "\n", - " @collaborator(num_gpus=1)\n", + " @collaborator\n", " def train(self):\n", " \"\"\"\n", " Train model on Local collab dataset.\n", @@ -594,7 +596,7 @@ " self.next(self.local_model_validation)\n", "\n", "\n", - " @collaborator(num_gpus=1)\n", + " @collaborator\n", " def local_model_validation(self):\n", " \"\"\"\n", " Validate locally trained model.\n", @@ -765,7 +767,7 @@ "outputs": [], "source": [ "# Setup Aggregator with private attributes\n", - "aggregator = Aggregator()\n", + "aggregator = Aggregator(num_gpus=0.0)\n", "\n", "# Setup Collaborators with private attributes\n", "collaborator_names = [\n", @@ -776,7 +778,7 @@ " \"New Delhi\",\n", "]\n", "print(f\"Creating collaborators {collaborator_names}\")\n", - "collaborators = [Collaborator(name=name) for name in collaborator_names]\n", + "collaborators = [Collaborator(name=name, num_gpus=0.0) for name in collaborator_names]\n", "\n", "aggregator_test = deepcopy(mnist_test)\n", "aggregator_test.targets = mnist_test.targets[len(collaborators)::len(collaborators)+1]\n", @@ -900,7 +902,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.6" + "version": "3.8.19" }, "vscode": { "interpreter": { diff --git a/openfl-workspace/experimental/101_torch_cnn_mnist/plan/data.yaml b/openfl-workspace/experimental/101_torch_cnn_mnist/plan/data.yaml index 0950198725..b4d312447d 100644 --- a/openfl-workspace/experimental/101_torch_cnn_mnist/plan/data.yaml +++ b/openfl-workspace/experimental/101_torch_cnn_mnist/plan/data.yaml @@ -7,21 +7,7 @@ # collaborator_name ,data_directory_path col1: - callable_func: - settings: - batch_size: 32 - index: 0 - n_collaborators: 2 - train_dataset: src.collaborator_private_attrs.train_dataset - test_dataset: src.collaborator_private_attrs.test_dataset - template: src.collaborator_private_attrs.collaborator_private_attrs + private_attributes: 
src.collaborator_private_attrs.col1_private_attributes col2: - callable_func: - settings: - batch_size: 32 - index: 1 - n_collaborators: 2 - train_dataset: src.collaborator_private_attrs.train_dataset - test_dataset: src.collaborator_private_attrs.test_dataset - template: src.collaborator_private_attrs.collaborator_private_attrs + private_attributes: src.collaborator_private_attrs.col2_private_attributes diff --git a/openfl-workspace/experimental/101_torch_cnn_mnist/src/collaborator_private_attrs.py b/openfl-workspace/experimental/101_torch_cnn_mnist/src/collaborator_private_attrs.py index 097c81634b..2be4a4b820 100644 --- a/openfl-workspace/experimental/101_torch_cnn_mnist/src/collaborator_private_attrs.py +++ b/openfl-workspace/experimental/101_torch_cnn_mnist/src/collaborator_private_attrs.py @@ -31,25 +31,37 @@ ), ) -train_dataset = mnist_train -test_dataset = mnist_test - - -def collaborator_private_attrs( - index, n_collaborators, batch_size, train_dataset, test_dataset -): - train = deepcopy(train_dataset) - test = deepcopy(test_dataset) - train.data = train_dataset.data[index::n_collaborators] - train.targets = train_dataset.targets[index::n_collaborators] - test.data = test_dataset.data[index::n_collaborators] - test.targets = test_dataset.targets[index::n_collaborators] - - return { - "train_loader": torch.utils.data.DataLoader( - train, batch_size=batch_size, shuffle=True - ), - "test_loader": torch.utils.data.DataLoader( - test, batch_size=batch_size, shuffle=True - ), - } + +n_collaborators = 2 +batch_size = 32 + +train = deepcopy(mnist_train) +test = deepcopy(mnist_test) + +train.data = mnist_train.data[0::n_collaborators] +train.targets = mnist_train.targets[0::n_collaborators] +test.data = mnist_test.data[0::n_collaborators] +test.targets = mnist_test.targets[0::n_collaborators] + +col1_private_attributes = { + "train_loader": torch.utils.data.DataLoader( + train, batch_size=batch_size, shuffle=True + ), + "test_loader": torch.utils.data.DataLoader( 
+ test, batch_size=batch_size, shuffle=True + ), +} +train, test = deepcopy(mnist_train), deepcopy(mnist_test) # fresh copies for col2 +train.data = mnist_train.data[1::n_collaborators] +train.targets = mnist_train.targets[1::n_collaborators] +test.data = mnist_test.data[1::n_collaborators] +test.targets = mnist_test.targets[1::n_collaborators] + +col2_private_attributes = { + "train_loader": torch.utils.data.DataLoader( + train, batch_size=batch_size, shuffle=True + ), + "test_loader": torch.utils.data.DataLoader( + test, batch_size=batch_size, shuffle=True + ), +} diff --git a/openfl-workspace/experimental/104_keras_mnist/requirements.txt b/openfl-workspace/experimental/104_keras_mnist/requirements.txt index 8e961eac43..0f57144081 100644 --- a/openfl-workspace/experimental/104_keras_mnist/requirements.txt +++ b/openfl-workspace/experimental/104_keras_mnist/requirements.txt @@ -1 +1 @@ -tensorflow==2.11.1 +tensorflow diff --git a/openfl/experimental/component/aggregator/__init__.py b/openfl/experimental/component/aggregator/__init__.py index 6686ce37b8..c2af4cc2ac 100644 --- a/openfl/experimental/component/aggregator/__init__.py +++ b/openfl/experimental/component/aggregator/__init__.py @@ -3,4 +3,4 @@ """openfl.experimental.component.aggregator package.""" # FIXME: Too much recursion. 
-from openfl.experimental.component.aggregator import Aggregator +from openfl.experimental.component.aggregator.aggregator import Aggregator diff --git a/openfl/experimental/component/aggregator/aggregator.py b/openfl/experimental/component/aggregator/aggregator.py index 116748903b..af44cdd6d1 100644 --- a/openfl/experimental/component/aggregator/aggregator.py +++ b/openfl/experimental/component/aggregator/aggregator.py @@ -43,6 +43,7 @@ def __init__( checkpoint: bool = False, private_attributes_callable: Callable = None, private_attributes_kwargs: Dict = {}, + private_attributes: Dict = {}, single_col_cert_common_name: str = None, log_metric_callback: Callable = None, **kwargs, @@ -93,7 +94,7 @@ def __init__( self.flow.runtime.collaborators = self.authorized_cols self.__private_attrs_callable = private_attributes_callable - self.__private_attrs = {} + self.__private_attrs = private_attributes self.connected_collaborators = [] self.tasks_sent_to_collaborators = 0 self.collaborator_results_received = [] diff --git a/openfl/experimental/component/collaborator/collaborator.py b/openfl/experimental/component/collaborator/collaborator.py index 8dc1b3c8a8..be84ffe2e8 100644 --- a/openfl/experimental/component/collaborator/collaborator.py +++ b/openfl/experimental/component/collaborator/collaborator.py @@ -34,6 +34,7 @@ def __init__( client: Any, private_attributes_callable: Any = None, private_attributes_kwargs: Dict = {}, + private_attributes: Dict = {}, **kwargs, ) -> None: @@ -47,12 +48,12 @@ def __init__( self.__private_attrs_callable = private_attributes_callable - self.__private_attrs = {} + self.__private_attrs = private_attributes if self.__private_attrs_callable is not None: self.logger.info("Initializing collaborator.") self.__initialize_private_attributes(private_attributes_kwargs) - def __initialize_private_attributes(self, kwrags: Dict) -> None: + def __initialize_private_attributes(self, kwargs: Dict) -> None: """ Call private_attrs_callable function set 
attributes to self.__private_attrs @@ -63,7 +64,7 @@ def __initialize_private_attributes(self, kwrags: Dict) -> None: Returns: None """ - self.__private_attrs = self.__private_attrs_callable(**kwrags) + self.__private_attrs = self.__private_attrs_callable(**kwargs) def __set_attributes_to_clone(self, clone: Any) -> None: """ diff --git a/openfl/experimental/federated/plan/plan.py b/openfl/experimental/federated/plan/plan.py index 4b378ef02e..f67346223b 100644 --- a/openfl/experimental/federated/plan/plan.py +++ b/openfl/experimental/federated/plan/plan.py @@ -275,13 +275,14 @@ def get_aggregator(self): defaults[SETTINGS]["federation_uuid"] = self.federation_uuid defaults[SETTINGS]["authorized_cols"] = self.authorized_cols - private_attrs_callable, private_attrs_kwargs = self.get_private_attr( - "aggregator" + private_attrs_callable, private_attrs_kwargs, private_attributes = ( + self.get_private_attr( + "aggregator" + ) ) - defaults[SETTINGS][ - "private_attributes_callable" - ] = private_attrs_callable + defaults[SETTINGS]["private_attributes_callable"] = private_attrs_callable defaults[SETTINGS]["private_attributes_kwargs"] = private_attrs_kwargs + defaults[SETTINGS]["private_attributes"] = private_attributes defaults[SETTINGS]["flow"] = self.get_flow() checkpoint = self.config.get("federated_flow", False) @@ -323,14 +324,14 @@ def get_collaborator( defaults[SETTINGS]["aggregator_uuid"] = self.aggregator_uuid defaults[SETTINGS]["federation_uuid"] = self.federation_uuid - private_attrs_callable, private_attrs_kwargs = self.get_private_attr( - collaborator_name + private_attrs_callable, private_attrs_kwargs, private_attributes = ( + self.get_private_attr( + collaborator_name + ) ) - - defaults[SETTINGS][ - "private_attributes_callable" - ] = private_attrs_callable + defaults[SETTINGS]["private_attributes_callable"] = private_attrs_callable defaults[SETTINGS]["private_attributes_kwargs"] = private_attrs_kwargs + defaults[SETTINGS]["private_attributes"] = 
private_attributes if client is not None: defaults[SETTINGS]["client"] = client @@ -456,6 +457,7 @@ def import_nested_settings(settings): def get_private_attr(self, private_attr_name=None): private_attrs_callable = None private_attrs_kwargs = {} + private_attributes = {} import os from pathlib import Path @@ -468,24 +470,45 @@ def get_private_attr(self, private_attr_name=None): d = Plan.load(Path(data_yaml).absolute()) if d.get(private_attr_name, None): - private_attrs_callable = { - "template": d.get(private_attr_name)["callable_func"][ - "template" - ] - } - - private_attrs_kwargs = self.import_kwargs_modules( - d.get(private_attr_name)["callable_func"] - )["settings"] - - if isinstance(private_attrs_callable, dict): - private_attrs_callable = Plan.import_( - **private_attrs_callable + callable_func = d.get(private_attr_name, {}).get( + "callable_func" + ) + private_attributes = d.get(private_attr_name, {}).get( + "private_attributes" + ) + if callable_func and private_attributes: + logger = getLogger(__name__) + logger.warning( + f'Warning: {private_attr_name} private attributes ' + 'will be initialized via callable and ' + 'attributes directly specified ' + 'will be ignored' + ) + + if callable_func is not None: + private_attrs_callable = { + "template": d.get(private_attr_name)["callable_func"][ + "template" + ] + } + + private_attrs_kwargs = self.import_kwargs_modules( + d.get(private_attr_name)["callable_func"] + )["settings"] + + if isinstance(private_attrs_callable, dict): + private_attrs_callable = Plan.import_( + **private_attrs_callable + ) + elif private_attributes: + private_attributes = Plan.import_( + d.get(private_attr_name)["private_attributes"] ) elif not callable(private_attrs_callable): raise TypeError( f"private_attrs_callable should be callable object " f"or be import from code part, get {private_attrs_callable}" ) - return private_attrs_callable, private_attrs_kwargs - return None, None + + return private_attrs_callable, 
private_attrs_kwargs, private_attributes + return None, None, {} diff --git a/openfl/experimental/interface/cli/workspace.py b/openfl/experimental/interface/cli/workspace.py index 7d21f2566b..2aff2498bb 100644 --- a/openfl/experimental/interface/cli/workspace.py +++ b/openfl/experimental/interface/cli/workspace.py @@ -138,7 +138,7 @@ def create_(prefix, custom_template, template, notebook, template_output_dir): output_workspace=template_output_dir, ) - create(prefix, template_output_dir) + create(prefix, Path(template_output_dir).resolve()) logger.warning( "The user should review the generated workspace for completeness " diff --git a/openfl/experimental/workspace_export/export.py b/openfl/experimental/workspace_export/export.py index 16490324d4..4370cfe21e 100644 --- a/openfl/experimental/workspace_export/export.py +++ b/openfl/experimental/workspace_export/export.py @@ -56,6 +56,11 @@ def __init__(self, notebook_path: str, output_workspace: str) -> None: self.logger.info("Converting jupter notebook to python script...") export_filename = self.__get_exp_name() + if export_filename is None: + raise NameError( + "Please include `#| default_exp ` in " + "the first cell of the notebook." 
+ ) self.script_path = Path( self.__convert_to_python( self.notebook_path, @@ -114,10 +119,12 @@ def __change_runtime(self): with open(self.script_path, "r") as f: data = f.read() - if data.find("backend='ray'") != -1: - data = data.replace("backend='ray'", "backend='single_process'") - elif data.find('backend="ray"') != -1: - data = data.replace('backend="ray"', 'backend="single_process"') + if "backend='ray'" in data or 'backend="ray"' in data: + data = data.replace( + "backend='ray'", "backend='single_process'" + ).replace( + 'backend="ray"', 'backend="single_process"' + ) with open(self.script_path, "w") as f: f.write(data) @@ -140,7 +147,7 @@ def __get_class_arguments(self, class_name): # If class not found if "cls" not in locals(): - raise Exception(f"{class_name} not found.") + raise NameError(f"{class_name} not found.") if inspect.isclass(cls): # Check if the class has an __init__ method @@ -216,8 +223,11 @@ def __extract_class_initializing_args(self, class_name): if value.startswith("[") and "," not in value: value = value.lstrip("[").rstrip("]") try: + # Evaluate the value to convert it from a string + # representation into its corresponding python object. 
value = ast.literal_eval(value) - except Exception: + except ValueError: + # ValueError is ignored because we want the value as a string pass instantiation_args["kwargs"][kwarg.arg] = value @@ -379,18 +389,19 @@ def generate_data_yaml(self): ) # Find federated_flow._runtime and federated_flow._runtime.collaborators for t in self.available_modules_in_exported_script: + tempstring = t t = getattr(self.exported_script_module, t) if isinstance(t, federated_flow_class): + flow_name = tempstring if not hasattr(t, "_runtime"): - raise Exception( + raise AttributeError( "Unable to locate LocalRuntime instantiation" ) runtime = t._runtime if not hasattr(runtime, "collaborators"): - raise Exception( + raise AttributeError( "LocalRuntime instance does not have collaborators" ) - collaborators_names = runtime.collaborators break data_yaml = self.created_workspace_path.joinpath( @@ -402,7 +413,11 @@ def generate_data_yaml(self): # Find aggregator details aggregator = runtime._aggregator + runtime_name = 'runtime_local' + runtime_created = False private_attrs_callable = aggregator.private_attributes_callable + aggregator_private_attributes = aggregator.private_attributes + if private_attrs_callable is not None: data["aggregator"] = { "callable_func": { @@ -422,27 +437,65 @@ def generate_data_yaml(self): arg = arguments_passed_to_initialize[key] value = f"src.{self.script_name}.{arg}" data["aggregator"]["callable_func"]["settings"][key] = value + elif aggregator_private_attributes: + runtime_created = True + with open(self.script_path, 'a') as f: + f.write(f"\n{runtime_name} = {flow_name}._runtime\n") + f.write( + f"\naggregator_private_attributes = " + f"{runtime_name}._aggregator.private_attributes\n" + ) + data["aggregator"] = { + "private_attributes": f"src.{self.script_name}.aggregator_private_attributes" + } + # Get runtime collaborators + collaborators = runtime._LocalRuntime__collaborators # Find arguments expected by Collaborator arguments_passed_to_initialize = 
self.__extract_class_initializing_args( "Collaborator" )["kwargs"] - for collab_name in collaborators_names: - if collab_name not in data: + runtime_collab_created = False + for collab in collaborators.values(): + collab_name = collab.get_name() + callable_func = collab.private_attributes_callable + private_attributes = collab.private_attributes + + if callable_func: + if collab_name not in data: + data[collab_name] = { + "callable_func": {"settings": {}, "template": None} + } + # Find collaborator private_attributes callable details + kw_args = runtime.get_collaborator_kwargs(collab_name) + for key, value in kw_args.items(): + if key == "private_attributes_callable": + value = f"src.{self.script_name}.{value}" + data[collab_name]["callable_func"]["template"] = value + elif isinstance(value, (int, str, bool)): + data[collab_name]["callable_func"]["settings"][key] = value + else: + arg = arguments_passed_to_initialize[key] + value = f"src.{self.script_name}.{arg}" + data[collab_name]["callable_func"]["settings"][key] = value + elif private_attributes: + with open(self.script_path, 'a') as f: + if not runtime_created: + f.write(f"\n{runtime_name} = {flow_name}._runtime\n") + runtime_created = True + if not runtime_collab_created: + f.write( + f"\nruntime_collaborators = " + f"{runtime_name}._LocalRuntime__collaborators" + ) + runtime_collab_created = True + f.write( + f"\n{collab_name}_private_attributes = " + f"runtime_collaborators['{collab_name}'].private_attributes" + ) data[collab_name] = { - "callable_func": {"settings": {}, "template": None} + "private_attributes": f"src." 
+ f"{self.script_name}.{collab_name}_private_attributes" } - # Find collaborator details - kw_args = runtime.get_collaborator_kwargs(collab_name) - for key, value in kw_args.items(): - if key == "private_attributes_callable": - value = f"src.{self.script_name}.{value}" - data[collab_name]["callable_func"]["template"] = value - elif isinstance(value, (int, str, bool)): - data[collab_name]["callable_func"]["settings"][key] = value - else: - arg = arguments_passed_to_initialize[key] - value = f"src.{self.script_name}.{arg}" - data[collab_name]["callable_func"]["settings"][key] = value self.__write_yaml(data_yaml, data) diff --git a/tests/github/experimental/testflow_privateattributes_initialization_with_both_options.py b/tests/github/experimental/testflow_privateattributes_initialization_with_both_options.py new file mode 100644 index 0000000000..d14a8863b8 --- /dev/null +++ b/tests/github/experimental/testflow_privateattributes_initialization_with_both_options.py @@ -0,0 +1,258 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import sys +import numpy as np +from openfl.experimental.interface import FLSpec, Aggregator, Collaborator +from openfl.experimental.runtime import LocalRuntime +from openfl.experimental.placement import aggregator, collaborator + + +class bcolors: # NOQA: N801 + HEADER = "\033[95m" + OKBLUE = "\033[94m" + OKCYAN = "\033[96m" + OKGREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + + +class TestFlowPrivateAttributes(FLSpec): + """ + Testflow to validate Aggregator private attributes are not accessible to collaborators + and vice versa + """ + + error_list = [] + + @aggregator + def start(self): + """ + Flow start. 
+ """ + print( + f"{bcolors.OKBLUE}Testing FederatedFlow - Starting Test for accessibility of private " + + f"attributes {bcolors.ENDC}" + ) + self.collaborators = self.runtime.collaborators + + validate_collab_private_attr(self, "test_loader_via_callable", "start") + + self.exclude_agg_to_agg = 10 + self.include_agg_to_agg = 100 + self.next(self.aggregator_step, exclude=["exclude_agg_to_agg"]) + + @aggregator + def aggregator_step(self): + """ + Testing whether Agg private attributes are accessible in next agg step. + Collab private attributes should not be accessible here + """ + validate_collab_private_attr(self, "test_loader_via_callable", "aggregator_step") + + self.include_agg_to_collab = 42 + self.exclude_agg_to_collab = 40 + self.next( + self.collaborator_step_a, + foreach="collaborators", + exclude=["exclude_agg_to_collab"], + ) + + @collaborator + def collaborator_step_a(self): + """ + Testing whether Collab private attributes are accessible in collab step + Aggregator private attributes should not be accessible here + """ + validate_agg_private_attrs( + self, "train_loader_via_callable", "test_loader_via_callable", "collaborator_step_a" + ) + + self.exclude_collab_to_collab = 2 + self.include_collab_to_collab = 22 + self.next(self.collaborator_step_b, exclude=["exclude_collab_to_collab"]) + + @collaborator + def collaborator_step_b(self): + """ + Testing whether Collab private attributes are accessible in collab step + Aggregator private attributes should not be accessible here + """ + + validate_agg_private_attrs( + self, "train_loader_via_callable", "test_loader_via_callable", "collaborator_step_b" + ) + self.exclude_collab_to_agg = 10 + self.include_collab_to_agg = 12 + self.next(self.join, exclude=["exclude_collab_to_agg"]) + + @aggregator + def join(self, inputs): + """ + Testing whether attributes are excluded from collab to agg + """ + # Aggregator should only be able to access its own attributes + if hasattr(self, "test_loader_via_callable") is 
False: + TestFlowPrivateAttributes.error_list.append( + "aggregator_join_aggregator_attributes_missing" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in join - aggregator private attributes" + + f" not accessible {bcolors.ENDC}" + ) + + for idx, collab in enumerate(inputs): + if ( + hasattr(collab, "train_loader_via_callable") is True + or hasattr(collab, "test_loader_via_callable") is True + ): + # Error - we are able to access collaborator attributes + TestFlowPrivateAttributes.error_list.append( + "join_collaborator_attributes_found" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in Join - Collaborator: {collab}" + + f" private attributes accessible {bcolors.ENDC}" + ) + + self.next(self.end) + + @aggregator + def end(self): + """ + This is the 'end' step. All flows must have an 'end' step, which is the + last step in the flow. + + """ + print( + f"{bcolors.OKBLUE}Testing FederatedFlow - Ending Test for accessibility of private " + + f"attributes {bcolors.ENDC}" + ) + + if TestFlowPrivateAttributes.error_list: + raise ( + AssertionError( + f"{bcolors.FAIL}\n ...Test case failed ... {bcolors.ENDC}" + ) + ) + else: + print(f"{bcolors.OKGREEN}\n ...Test case passed ... 
{bcolors.ENDC}") + + TestFlowPrivateAttributes.error_list = [] + + +def validate_collab_private_attr(self, private_attr, step_name): + # Aggregator should only be able to access its own attributes + if hasattr(self, private_attr) is False: + TestFlowPrivateAttributes.error_list.append( + step_name + "_aggregator_attributes_missing" + ) + print( + f"{bcolors.FAIL} ...Failed in {step_name} - aggregator private attributes not " + + f"accessible {bcolors.ENDC}" + ) + + for idx, collab in enumerate(self.collaborators): + # Collaborator private attributes should not be accessible + if ( + type(self.collaborators[idx]) is not str + or hasattr(self.runtime, "_collaborators") is True + or hasattr(self.runtime, "__collaborators") is True + ): + # Error - we are able to access collaborator attributes + TestFlowPrivateAttributes.error_list.append( + step_name + "_collaborator_attributes_found" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - collaborator {collab} " + + f"private attributes accessible {bcolors.ENDC}" + ) + + +def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): + # Collaborator should only be able to access its own attributes + if hasattr(self, private_attr_1) is False or hasattr(self, private_attr_2) is False: + TestFlowPrivateAttributes.error_list.append( + step_name + "collab_attributes_not_found" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Collab " + + f"private attributes not accessible {bcolors.ENDC}" + ) + + if hasattr(self.runtime, "_aggregator") is True: + # Error - we are able to access aggregator attributes + TestFlowPrivateAttributes.error_list.append( + step_name + "_aggregator_attributes_found" + ) + print( + f"{bcolors.FAIL} ... 
Attribute test failed in {step_name} - Aggregator" + + f" private attributes accessible {bcolors.ENDC}" + ) + + +if __name__ == "__main__": + # Setup Aggregator with private attributes via callable function + def callable_to_initialize_aggregator_private_attributes(): + return {"test_loader_via_callable": np.random.rand(10, 28, 28)} # Random data + + aggregator = Aggregator( + name="agg", + private_attributes_callable=callable_to_initialize_aggregator_private_attributes, + ) + # Setup aggregator private attributes + aggregator.private_attributes = {"test_loader": np.random.rand(10, 28, 28)} + + # Setup collaborators with private attributes via callable function + collaborator_names = [ + "Portland", + "Seattle", + "Chandler", + "Bangalore", + "Delhi", + "Paris", + "New York", + "Tel Aviv", + "Beijing", + "Tokyo", + ] + + def callable_to_initialize_collaborator_private_attributes(index): + return { + "train_loader_via_callable": np.random.rand(index * 50, 28, 28), + "test_loader_via_callable": np.random.rand(index * 10, 28, 28), + } + + collaborators = [] + for idx, collaborator_name in enumerate(collaborator_names): + collab = Collaborator( + name=collaborator_name, + private_attributes_callable=callable_to_initialize_collaborator_private_attributes, + index=idx, + ) + # Setup collaborator private attributes + collab.private_attributes = { + "train_loader": np.random.rand(idx * 50, 28, 28), + "test_loader": np.random.rand(idx * 10, 28, 28), + } + collaborators.append(collab) + + backend = "single_process" + if len(sys.argv) > 1 and sys.argv[1] == "ray": + backend = "ray" + + local_runtime = LocalRuntime( + aggregator=aggregator, collaborators=collaborators, backend=backend + ) + print(f"Local runtime collaborators = {local_runtime.collaborators}") + + flflow = TestFlowPrivateAttributes(checkpoint=True) + flflow.runtime = local_runtime + for i in range(5): + print(f"Starting round {i}...") + flflow.run() + + print(f"{bcolors.OKBLUE}End of Testing FederatedFlow 
{bcolors.ENDC}") diff --git a/tests/github/experimental/testflow_privateattributes_initialization_without_callable.py b/tests/github/experimental/testflow_privateattributes_initialization_without_callable.py new file mode 100644 index 0000000000..e91f94cd20 --- /dev/null +++ b/tests/github/experimental/testflow_privateattributes_initialization_without_callable.py @@ -0,0 +1,239 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import sys +import numpy as np +from openfl.experimental.interface import FLSpec, Aggregator, Collaborator +from openfl.experimental.runtime import LocalRuntime +from openfl.experimental.placement import aggregator, collaborator + + +class bcolors: # NOQA: N801 + HEADER = "\033[95m" + OKBLUE = "\033[94m" + OKCYAN = "\033[96m" + OKGREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + + +class TestFlowPrivateAttributes(FLSpec): + """ + Testflow to validate Aggregator private attributes are not accessible to collaborators + and vice versa + """ + + error_list = [] + + @aggregator + def start(self): + """ + Flow start. + """ + print( + f"{bcolors.OKBLUE}Testing FederatedFlow - Starting Test for accessibility of private " + + f"attributes {bcolors.ENDC}" + ) + self.collaborators = self.runtime.collaborators + + validate_collab_private_attr(self, "test_loader", "start") + + self.exclude_agg_to_agg = 10 + self.include_agg_to_agg = 100 + self.next(self.aggregator_step, exclude=["exclude_agg_to_agg"]) + + @aggregator + def aggregator_step(self): + """ + Testing whether Agg private attributes are accessible in next agg step. 
+ Collab private attributes should not be accessible here + """ + validate_collab_private_attr(self, "test_loader", "aggregator_step") + + self.include_agg_to_collab = 42 + self.exclude_agg_to_collab = 40 + self.next( + self.collaborator_step_a, + foreach="collaborators", + exclude=["exclude_agg_to_collab"], + ) + + @collaborator + def collaborator_step_a(self): + """ + Testing whether Collab private attributes are accessible in collab step + Aggregator private attributes should not be accessible here + """ + validate_agg_private_attrs( + self, "train_loader", "test_loader", "collaborator_step_a" + ) + + self.exclude_collab_to_collab = 2 + self.include_collab_to_collab = 22 + self.next(self.collaborator_step_b, exclude=["exclude_collab_to_collab"]) + + @collaborator + def collaborator_step_b(self): + """ + Testing whether Collab private attributes are accessible in collab step + Aggregator private attributes should not be accessible here + """ + + validate_agg_private_attrs( + self, "train_loader", "test_loader", "collaborator_step_b" + ) + self.exclude_collab_to_agg = 10 + self.include_collab_to_agg = 12 + self.next(self.join, exclude=["exclude_collab_to_agg"]) + + @aggregator + def join(self, inputs): + """ + Testing whether attributes are excluded from collab to agg + """ + # Aggregator should only be able to access its own attributes + if hasattr(self, "test_loader") is False: + TestFlowPrivateAttributes.error_list.append( + "aggregator_join_aggregator_attributes_missing" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in join - aggregator private attributes" + + f" not accessible {bcolors.ENDC}" + ) + + for idx, collab in enumerate(inputs): + if ( + hasattr(collab, "train_loader") is True + or hasattr(collab, "test_loader") is True + ): + # Error - we are able to access collaborator attributes + TestFlowPrivateAttributes.error_list.append( + "join_collaborator_attributes_found" + ) + print( + f"{bcolors.FAIL} ... 
Attribute test failed in Join - Collaborator: {collab}" + + f" private attributes accessible {bcolors.ENDC}" + ) + + self.next(self.end) + + @aggregator + def end(self): + """ + This is the 'end' step. All flows must have an 'end' step, which is the + last step in the flow. + + """ + print( + f"{bcolors.OKBLUE}Testing FederatedFlow - Ending Test for accessibility of private " + + f"attributes {bcolors.ENDC}" + ) + + if TestFlowPrivateAttributes.error_list: + raise ( + AssertionError( + f"{bcolors.FAIL}\n ...Test case failed ... {bcolors.ENDC}" + ) + ) + else: + print(f"{bcolors.OKGREEN}\n ...Test case passed ... {bcolors.ENDC}") + + TestFlowPrivateAttributes.error_list = [] + + +def validate_collab_private_attr(self, private_attr, step_name): + # Aggregator should only be able to access its own attributes + if hasattr(self, private_attr) is False: + TestFlowPrivateAttributes.error_list.append( + step_name + "_aggregator_attributes_missing" + ) + print( + f"{bcolors.FAIL} ...Failed in {step_name} - aggregator private attributes not " + + f"accessible {bcolors.ENDC}" + ) + + for idx, collab in enumerate(self.collaborators): + # Collaborator private attributes should not be accessible + if ( + type(self.collaborators[idx]) is not str + or hasattr(self.runtime, "_collaborators") is True + or hasattr(self.runtime, "__collaborators") is True + ): + # Error - we are able to access collaborator attributes + TestFlowPrivateAttributes.error_list.append( + step_name + "_collaborator_attributes_found" + ) + print( + f"{bcolors.FAIL} ... 
Attribute test failed in {step_name} - collaborator {collab} " + + f"private attributes accessible {bcolors.ENDC}" + ) + + +def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): + # Collaborator should only be able to access its own attributes + if hasattr(self, private_attr_1) is False or hasattr(self, private_attr_2) is False: + TestFlowPrivateAttributes.error_list.append( + step_name + "collab_attributes_not_found" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Collab " + + f"private attributes not accessible {bcolors.ENDC}" + ) + + if hasattr(self.runtime, "_aggregator") is True: + # Error - we are able to access aggregator attributes + TestFlowPrivateAttributes.error_list.append( + step_name + "_aggregator_attributes_found" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Aggregator" + + f" private attributes accessible {bcolors.ENDC}" + ) + + +if __name__ == "__main__": + # Setup aggregator private attributes + aggregator = Aggregator() + aggregator.private_attributes = { + "test_loader": np.random.rand(10, 28, 28) # Random data + } + + # Setup collaborators with private attributes + collaborator_names = [ + "Portland", + "Seattle", + "Chandler", + "Bangalore", + "Delhi", + "Paris", + "New York", + "Tel Aviv", + "Beijing", + "Tokyo", + ] + collaborators = [Collaborator(name=name) for name in collaborator_names] + for idx, collab in enumerate(collaborators): + collab.private_attributes = { + "train_loader": np.random.rand(idx * 50, 28, 28), + "test_loader": np.random.rand(idx * 10, 28, 28), + } + + backend = "single_process" + if len(sys.argv) > 1 and sys.argv[1] == "ray": + backend = "ray" + + local_runtime = LocalRuntime( + aggregator=aggregator, collaborators=collaborators, backend=backend + ) + print(f"Local runtime collaborators = {local_runtime.collaborators}") + + flflow = TestFlowPrivateAttributes(checkpoint=True) + flflow.runtime = local_runtime + for i in range(5): + 
print(f"Starting round {i}...") + flflow.run() + + print(f"{bcolors.OKBLUE}End of Testing FederatedFlow {bcolors.ENDC}") diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/.workspace b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/.workspace new file mode 100644 index 0000000000..3c2c5d08b4 --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/.workspace @@ -0,0 +1,2 @@ +current_plan_name: default + diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/plan/cols.yaml b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/plan/cols.yaml new file mode 100644 index 0000000000..59d4f60bce --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/plan/cols.yaml @@ -0,0 +1,4 @@ +# Copyright (C) 2020-2023 Intel Corporation +# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you. + +collaborators: \ No newline at end of file diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/plan/data.yaml b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/plan/data.yaml new file mode 100644 index 0000000000..df38715392 --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/plan/data.yaml @@ -0,0 +1,26 @@ +## Copyright (C) 2020-2023 Intel Corporation +# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you. + +# Each key under 'collaborators' corresponds to a specific collaborator name; the corresponding dictionary has data_name, data_path pairs. 
+# Note that in the mnist case we do not store the data locally, and the data_path is used to pass an integer that helps the data object +# construct the shard of the mnist dataset to be used for this collaborator. + +col1: + callable_func: + settings: + index: 1 + template: src.collaborator_private_attrs.collaborator_private_attrs + private_attributes: src.collaborator_private_attrs.collaborator_private_attributes + +col2: + callable_func: + settings: + index: 2 + template: src.collaborator_private_attrs.collaborator_private_attrs + private_attributes: src.collaborator_private_attrs.collaborator_private_attributes + +aggregator: + callable_func: + settings: {} + template: src.aggregator_private_attrs.aggregator_private_attrs + private_attributes: src.aggregator_private_attrs.aggregator_private_attributes \ No newline at end of file diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/plan/defaults b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/plan/defaults new file mode 100644 index 0000000000..fb82f9c5b6 --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/plan/defaults @@ -0,0 +1,2 @@ +../../workspace/plan/defaults + diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/plan/plan.yaml b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/plan/plan.yaml new file mode 100644 index 0000000000..b5ab688b84 --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/plan/plan.yaml @@ -0,0 +1,26 @@ +# Copyright (C) 2020-2023 Intel Corporation +# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you. 
+ +aggregator : + defaults : plan/defaults/aggregator.yaml + template : openfl.experimental.component.aggregator.Aggregator + settings : + rounds_to_train : 10 + log_metric_callback : + template : src.utils.write_metric + + +collaborator : + defaults : plan/defaults/collaborator.yaml + template : openfl.experimental.component.collaborator.Collaborator + settings : {} + + +federated_flow: + template: src.testflow_privateattributes.TestFlowPrivateAttributes + settings: + checkpoint: true + + +network : + defaults : plan/defaults/network.yaml \ No newline at end of file diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/requirements.txt b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/requirements.txt new file mode 100644 index 0000000000..16b349007c --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/requirements.txt @@ -0,0 +1,4 @@ +torch==1.13.1 +torchvision==0.14.1 +tensorboard +wheel>=0.38.0 # not directly required, pinned by Snyk to avoid a vulnerability diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/__init__.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/__init__.py new file mode 100644 index 0000000000..6e02c1c951 --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/__init__.py @@ -0,0 +1,2 @@ +# Copyright (C) 2020-2021 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/aggregator_private_attrs.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/aggregator_private_attrs.py new file mode 100644 index 0000000000..c8ed45d384 --- 
/dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/aggregator_private_attrs.py @@ -0,0 +1,10 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +import numpy as np + + +def aggregator_private_attrs(): + return {"test_loader_via_callable": np.random.rand(10, 28, 28)} # Random data + + +aggregator_private_attributes = {"test_loader": np.random.rand(10, 28, 28)} # Random data diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/collaborator_private_attrs.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/collaborator_private_attrs.py new file mode 100644 index 0000000000..7b7f437d33 --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/collaborator_private_attrs.py @@ -0,0 +1,16 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +import numpy as np + + +def collaborator_private_attrs(index): + return { + "train_loader_via_callable": np.random.rand(index * 50, 28, 28), + "test_loader_via_callable": np.random.rand(index * 10, 28, 28), + } + + +collaborator_private_attributes = { + "train_loader": np.random.rand(1 * 50, 28, 28), + "test_loader": np.random.rand(1 * 10, 28, 28), +} diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/testflow_privateattributes.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/testflow_privateattributes.py new file mode 100644 index 0000000000..579ba2820e --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/testflow_privateattributes.py @@ -0,0 +1,193 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from 
openfl.experimental.component import Aggregator +from openfl.experimental.interface import FLSpec +from openfl.experimental.placement import aggregator, collaborator + + +class bcolors: # NOQA: N801 + HEADER = "\033[95m" + OKBLUE = "\033[94m" + OKCYAN = "\033[96m" + OKGREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + + +class TestFlowPrivateAttributes(FLSpec): + """ + Testflow to validate Aggregator private attributes are not accessible to collaborators + and vice versa + """ + + ERROR_LIST = [] + + @aggregator + def start(self): + """ + Flow start. + """ + print( + f"{bcolors.OKBLUE}Testing FederatedFlow - Starting Test for accessibility of private " + + f"attributes {bcolors.ENDC}" + ) + self.collaborators = self.runtime.collaborators + + validate_collab_private_attr(self, "test_loader_via_callable", "start") + + self.exclude_agg_to_agg = 10 + self.include_agg_to_agg = 100 + self.next(self.aggregator_step, exclude=["exclude_agg_to_agg"]) + + @aggregator + def aggregator_step(self): + """ + Testing whether Agg private attributes are accessible in next agg step. 
+ Collab private attributes should not be accessible here + """ + validate_collab_private_attr(self, "test_loader_via_callable", "aggregator_step") + + self.include_agg_to_collab = 42 + self.exclude_agg_to_collab = 40 + self.next( + self.collaborator_step_a, + foreach="collaborators", + exclude=["exclude_agg_to_collab"], + ) + + @collaborator + def collaborator_step_a(self): + """ + Testing whether Collab private attributes are accessible in collab step + Aggregator private attributes should not be accessible here + """ + validate_agg_private_attrs( + self, "train_loader_via_callable", "test_loader_via_callable", "collaborator_step_a" + ) + + self.exclude_collab_to_collab = 2 + self.include_collab_to_collab = 22 + self.next(self.collaborator_step_b, exclude=["exclude_collab_to_collab"]) + + @collaborator + def collaborator_step_b(self): + """ + Testing whether Collab private attributes are accessible in collab step + Aggregator private attributes should not be accessible here + """ + + validate_agg_private_attrs( + self, "train_loader_via_callable", "test_loader_via_callable", "collaborator_step_b" + ) + self.exclude_collab_to_agg = 10 + self.include_collab_to_agg = 12 + self.next(self.join, exclude=["exclude_collab_to_agg"]) + + @aggregator + def join(self, inputs): + """ + Testing whether attributes are excluded from collab to agg + """ + # Aggregator should only be able to access its own attributes + if hasattr(self, "test_loader_via_callable") is False: + TestFlowPrivateAttributes.ERROR_LIST.append( + "aggregator_join_aggregator_attributes_missing" + ) + print( + f"{bcolors.FAIL} ... 
Attribute test failed in join - aggregator private attributes" + + f" not accessible {bcolors.ENDC}" + ) + + for collab_input in inputs: + collab = collab_input.input + if ( + hasattr(collab_input, "train_loader_via_callable") is True + or hasattr(collab_input, "test_loader_via_callable") is True + ): + # Error - we are able to access collaborator attributes + TestFlowPrivateAttributes.ERROR_LIST.append( + "join_collaborator_attributes_found" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in Join - Collaborator: {collab}" + + f" private attributes accessible {bcolors.ENDC}" + ) + + self.next(self.end) + + @aggregator + def end(self): + """ + This is the 'end' step. All flows must have an 'end' step, which is the + last step in the flow. + + """ + print( + f"{bcolors.OKBLUE}Testing FederatedFlow - Ending Test for accessibility of private " + + f"attributes {bcolors.ENDC}" + ) + + if TestFlowPrivateAttributes.ERROR_LIST: + raise ( + AssertionError( + f"{bcolors.FAIL}\n ...Test case failed ... {bcolors.ENDC}" + ) + ) + else: + print(f"{bcolors.OKGREEN}\n ...Test case passed ... 
{bcolors.ENDC}") + + TestFlowPrivateAttributes.ERROR_LIST = [] + + +def validate_collab_private_attr(self, private_attr, step_name): + # Aggregator should only be able to access its own attributes + if hasattr(self, private_attr) is False: + TestFlowPrivateAttributes.ERROR_LIST.append( + step_name + "_aggregator_attributes_missing" + ) + print( + f"{bcolors.FAIL} ...Failed in {step_name} - aggregator private attributes not " + + f"accessible {bcolors.ENDC}" + ) + + for idx, collab in enumerate(self.collaborators): + # Collaborator private attributes should not be accessible + if ( + type(self.collaborators[idx]) is not str + or hasattr(self.runtime, "_collaborators") is True + or hasattr(self.runtime, "__collaborators") is True + ): + # Error - we are able to access collaborator attributes + TestFlowPrivateAttributes.ERROR_LIST.append( + step_name + "_collaborator_attributes_found" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - collaborator {collab} " + + f"private attributes accessible {bcolors.ENDC}" + ) + + +def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): + # Collaborator should only be able to access its own attributes + if not hasattr(self, private_attr_1) or not hasattr(self, private_attr_2): + TestFlowPrivateAttributes.ERROR_LIST.append( + step_name + "collab_attributes_not_found" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Collab " + + f"private attributes not accessible {bcolors.ENDC}" + ) + + if hasattr(self.runtime, "_aggregator") and isinstance(self.runtime._aggregator, Aggregator): + # Error - we are able to access aggregator attributes + TestFlowPrivateAttributes.ERROR_LIST.append( + step_name + "_aggregator_attributes_found" + ) + print( + f"{bcolors.FAIL} ... 
Attribute test failed in {step_name} - Aggregator" + + f" private attributes accessible {bcolors.ENDC}" + ) diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/utils.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/utils.py new file mode 100644 index 0000000000..1e56f3e68d --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/utils.py @@ -0,0 +1,20 @@ +# Copyright (C) 2020-2021 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from torch.utils.tensorboard import SummaryWriter + + +writer = None + + +def get_writer(): + """Create global writer object.""" + global writer + if not writer: + writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) + + +def write_metric(node_name, task_name, metric_name, metric, round_number): + """Write metric callback.""" + get_writer() + writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/.workspace b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/.workspace new file mode 100644 index 0000000000..3c2c5d08b4 --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/.workspace @@ -0,0 +1,2 @@ +current_plan_name: default + diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/plan/cols.yaml b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/plan/cols.yaml new file mode 100644 index 0000000000..59d4f60bce --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/plan/cols.yaml @@ -0,0 +1,4 @@ +# Copyright (C) 2020-2023 Intel Corporation +# 
Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you. + +collaborators: \ No newline at end of file diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/plan/data.yaml b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/plan/data.yaml new file mode 100644 index 0000000000..07999de4fc --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/plan/data.yaml @@ -0,0 +1,15 @@ +## Copyright (C) 2020-2023 Intel Corporation +# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you. + +# Each key under 'collaborators' corresponds to a specific collaborator name; the corresponding dictionary has data_name, data_path pairs. +# Note that in the mnist case we do not store the data locally, and the data_path is used to pass an integer that helps the data object +# construct the shard of the mnist dataset to be used for this collaborator. 
+ +col1: + private_attributes: src.collaborator_private_attrs.collaborator_private_attributes + +col2: + private_attributes: src.collaborator_private_attrs.collaborator_private_attributes + +aggregator: + private_attributes: src.aggregator_private_attrs.aggregator_private_attributes \ No newline at end of file diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/plan/defaults b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/plan/defaults new file mode 100644 index 0000000000..fb82f9c5b6 --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/plan/defaults @@ -0,0 +1,2 @@ +../../workspace/plan/defaults + diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/plan/plan.yaml b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/plan/plan.yaml new file mode 100644 index 0000000000..b5ab688b84 --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/plan/plan.yaml @@ -0,0 +1,26 @@ +# Copyright (C) 2020-2023 Intel Corporation +# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you. 
+ +aggregator : + defaults : plan/defaults/aggregator.yaml + template : openfl.experimental.component.aggregator.Aggregator + settings : + rounds_to_train : 10 + log_metric_callback : + template : src.utils.write_metric + + +collaborator : + defaults : plan/defaults/collaborator.yaml + template : openfl.experimental.component.collaborator.Collaborator + settings : {} + + +federated_flow: + template: src.testflow_privateattributes.TestFlowPrivateAttributes + settings: + checkpoint: true + + +network : + defaults : plan/defaults/network.yaml \ No newline at end of file diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/requirements.txt b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/requirements.txt new file mode 100644 index 0000000000..16b349007c --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/requirements.txt @@ -0,0 +1,4 @@ +torch==1.13.1 +torchvision==0.14.1 +tensorboard +wheel>=0.38.0 # not directly required, pinned by Snyk to avoid a vulnerability diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/__init__.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/__init__.py new file mode 100644 index 0000000000..6e02c1c951 --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/__init__.py @@ -0,0 +1,2 @@ +# Copyright (C) 2020-2021 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/aggregator_private_attrs.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/aggregator_private_attrs.py new file mode 100644 index 0000000000..d04ce7f74a --- /dev/null 
+++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/aggregator_private_attrs.py @@ -0,0 +1,5 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +import numpy as np + +aggregator_private_attributes = {"test_loader": np.random.rand(10, 28, 28)} # Random data diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/collaborator_private_attrs.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/collaborator_private_attrs.py new file mode 100644 index 0000000000..70cca6a48e --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/collaborator_private_attrs.py @@ -0,0 +1,8 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +import numpy as np + +collaborator_private_attributes = { + "train_loader": np.random.rand(1 * 50, 28, 28), + "test_loader": np.random.rand(1 * 10, 28, 28), +} diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/testflow_privateattributes.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/testflow_privateattributes.py new file mode 100644 index 0000000000..3f19ed71c7 --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/testflow_privateattributes.py @@ -0,0 +1,193 @@ +# Copyright (C) 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from openfl.experimental.component import Aggregator +from openfl.experimental.interface import FLSpec +from openfl.experimental.placement import aggregator, collaborator + + +class bcolors: # NOQA: N801 + HEADER = "\033[95m" + OKBLUE = "\033[94m" + OKCYAN = "\033[96m" + OKGREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" 
+ ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + + +class TestFlowPrivateAttributes(FLSpec): + """ + Testflow to validate Aggregator private attributes are not accessible to collaborators + and vice versa + """ + + ERROR_LIST = [] + + @aggregator + def start(self): + """ + Flow start. + """ + print( + f"{bcolors.OKBLUE}Testing FederatedFlow - Starting Test for accessibility of private " + + f"attributes {bcolors.ENDC}" + ) + self.collaborators = self.runtime.collaborators + + validate_collab_private_attr(self, "test_loader", "start") + + self.exclude_agg_to_agg = 10 + self.include_agg_to_agg = 100 + self.next(self.aggregator_step, exclude=["exclude_agg_to_agg"]) + + @aggregator + def aggregator_step(self): + """ + Testing whether Agg private attributes are accessible in next agg step. + Collab private attributes should not be accessible here + """ + validate_collab_private_attr(self, "test_loader", "aggregator_step") + + self.include_agg_to_collab = 42 + self.exclude_agg_to_collab = 40 + self.next( + self.collaborator_step_a, + foreach="collaborators", + exclude=["exclude_agg_to_collab"], + ) + + @collaborator + def collaborator_step_a(self): + """ + Testing whether Collab private attributes are accessible in collab step + Aggregator private attributes should not be accessible here + """ + validate_agg_private_attrs( + self, "train_loader", "test_loader", "collaborator_step_a" + ) + + self.exclude_collab_to_collab = 2 + self.include_collab_to_collab = 22 + self.next(self.collaborator_step_b, exclude=["exclude_collab_to_collab"]) + + @collaborator + def collaborator_step_b(self): + """ + Testing whether Collab private attributes are accessible in collab step + Aggregator private attributes should not be accessible here + """ + + validate_agg_private_attrs( + self, "train_loader", "test_loader", "collaborator_step_b" + ) + self.exclude_collab_to_agg = 10 + self.include_collab_to_agg = 12 + self.next(self.join, exclude=["exclude_collab_to_agg"]) + + 
@aggregator + def join(self, inputs): + """ + Testing whether attributes are excluded from collab to agg + """ + # Aggregator should only be able to access its own attributes + if hasattr(self, "test_loader") is False: + TestFlowPrivateAttributes.ERROR_LIST.append( + "aggregator_join_aggregator_attributes_missing" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in join - aggregator private attributes" + + f" not accessible {bcolors.ENDC}" + ) + + for collab_input in inputs: + collab = collab_input.input + if ( + hasattr(collab_input, "train_loader") is True + or hasattr(collab_input, "test_loader") is True + ): + # Error - we are able to access collaborator attributes + TestFlowPrivateAttributes.ERROR_LIST.append( + "join_collaborator_attributes_found" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in Join - Collaborator: {collab}" + + f" private attributes accessible {bcolors.ENDC}" + ) + + self.next(self.end) + + @aggregator + def end(self): + """ + This is the 'end' step. All flows must have an 'end' step, which is the + last step in the flow. + + """ + print( + f"{bcolors.OKBLUE}Testing FederatedFlow - Ending Test for accessibility of private " + + f"attributes {bcolors.ENDC}" + ) + + if TestFlowPrivateAttributes.ERROR_LIST: + raise ( + AssertionError( + f"{bcolors.FAIL}\n ...Test case failed ... {bcolors.ENDC}" + ) + ) + else: + print(f"{bcolors.OKGREEN}\n ...Test case passed ... 
{bcolors.ENDC}") + + TestFlowPrivateAttributes.ERROR_LIST = [] + + +def validate_collab_private_attr(self, private_attr, step_name): + # Aggregator should only be able to access its own attributes + if hasattr(self, private_attr) is False: + TestFlowPrivateAttributes.ERROR_LIST.append( + step_name + "_aggregator_attributes_missing" + ) + print( + f"{bcolors.FAIL} ...Failed in {step_name} - aggregator private attributes not " + + f"accessible {bcolors.ENDC}" + ) + + for idx, collab in enumerate(self.collaborators): + # Collaborator private attributes should not be accessible + if ( + type(self.collaborators[idx]) is not str + or hasattr(self.runtime, "_collaborators") is True + or hasattr(self.runtime, "__collaborators") is True + ): + # Error - we are able to access collaborator attributes + TestFlowPrivateAttributes.ERROR_LIST.append( + step_name + "_collaborator_attributes_found" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - collaborator {collab} " + + f"private attributes accessible {bcolors.ENDC}" + ) + + +def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): + # Collaborator should only be able to access its own attributes + if not hasattr(self, private_attr_1) or not hasattr(self, private_attr_2): + TestFlowPrivateAttributes.ERROR_LIST.append( + step_name + "collab_attributes_not_found" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Collab " + + f"private attributes not accessible {bcolors.ENDC}" + ) + + if hasattr(self.runtime, "_aggregator") and isinstance(self.runtime._aggregator, Aggregator): + # Error - we are able to access aggregator attributes + TestFlowPrivateAttributes.ERROR_LIST.append( + step_name + "_aggregator_attributes_found" + ) + print( + f"{bcolors.FAIL} ... 
Attribute test failed in {step_name} - Aggregator" + + f" private attributes accessible {bcolors.ENDC}" + ) diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/utils.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/utils.py new file mode 100644 index 0000000000..1e56f3e68d --- /dev/null +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/utils.py @@ -0,0 +1,20 @@ +# Copyright (C) 2020-2021 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from torch.utils.tensorboard import SummaryWriter + + +writer = None + + +def get_writer(): + """Create global writer object.""" + global writer + if not writer: + writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) + + +def write_metric(node_name, task_name, metric_name, metric, round_number): + """Write metric callback.""" + get_writer() + writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number)
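
Review note: the `join` steps in these test flows are meant to confirm that collaborator private attributes are not visible on the inputs the aggregator receives. Below is a minimal sketch of that check in isolation, assuming only that each input exposes its collaborator name as `.input` (as the flows above do). The helper name `find_leaked_private_attrs` and the `SimpleNamespace` stand-ins are hypothetical and not part of OpenFL; the real inputs are flow objects handed to `join`. The check has to run `hasattr` on the input objects themselves, since the `(index, input)` tuples produced by `enumerate()` never carry these attribute names.

```python
from types import SimpleNamespace


def find_leaked_private_attrs(inputs, private_attr_names):
    """Return (collaborator, attribute) pairs for private attributes found on join inputs.

    Hypothetical helper for illustration; not part of OpenFL.
    """
    leaks = []
    # Iterate over the input objects directly so hasattr() inspects the
    # object that would actually carry a leaked attribute.
    for collab_input in inputs:
        for attr in private_attr_names:
            if hasattr(collab_input, attr):
                leaks.append((collab_input.input, attr))
    return leaks


# Stand-ins for the per-collaborator objects passed to join() (assumption:
# only the `.input` name and plain attributes matter for this check).
clean = SimpleNamespace(input="col1", include_collab_to_agg=12)
leaky = SimpleNamespace(input="col2", include_collab_to_agg=12, train_loader=[1, 2, 3])

private_names = ["train_loader", "test_loader"]
print(find_leaked_private_attrs([clean, leaky], private_names))

# hasattr() on the tuples from enumerate() cannot see the leaked attribute.
print(any(hasattr(pair, "train_loader") for pair in enumerate([leaky])))
```

Returning the leaks as a list, rather than appending to a class-level error list, keeps the check easy to assert on in a unit test; the flows above could still extend `ERROR_LIST` from the returned pairs.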