diff --git a/docs/workflow_interface.rst b/docs/workflow_interface.rst
index 7ccbdb092e1..0886aa3fc96 100644
--- a/docs/workflow_interface.rst
+++ b/docs/workflow_interface.rst
@@ -181,9 +181,9 @@ A :code:`Runtime` defines where the flow will be executed, who the participants
 
     local_runtime = LocalRuntime(aggregator=aggregator_, collaborators=collaborators)
 
-Let's break this down, starting with the :code:`Aggregator` and :code:`Collaborator` components. These components represent the *Participants* in a Federated Learning experiment. Each participant has its own set of *private attributes* that represent the information / data specific to its role or requirements. As the name suggests these *private attributes* are accessible only to the particular participant, and are appropriately inserted into or filtered out of current Flow state when transferring from between Participants. For e.g. Collaborator private attributes are inserted into :code:`self` when transitioning from Aggregator to Collaborator and are filtered out when transitioning from Collaborator to Aggregator
+Let's break this down, starting with the :code:`Aggregator` and :code:`Collaborator` components. These components represent the *Participants* in a Federated Learning experiment. Each participant has its own set of *private attributes* that represent the information or data specific to its role or requirements. As the name suggests, these *private attributes* are accessible only to that particular participant, and are inserted into or filtered out of the current Flow state when execution transfers between Participants. For example, Collaborator private attributes are inserted into :code:`flow` when transitioning from Aggregator to Collaborator, and are filtered out when transitioning from Collaborator to Aggregator.
 
-In the above :code:`FederatedFlow`, each collaborator accesses train and test datasets via *private attributes* :code:`train_loader` and :code:`test_loader`. These *private attributes* need to be set using a (user defined) callback function while instantiating the participant. Participant *private attributes* are returned by the callback function in form of a dictionary, where the key is the name of the attribute and the value is the object
+In the above :code:`FederatedFlow`, each collaborator accesses its train and test datasets via the *private attributes* :code:`train_loader` and :code:`test_loader`. These *private attributes* need to be set using a (user-defined) callback function while instantiating the participant. Participant *private attributes* are returned by the callback function in the form of a dictionary, where the key is the name of the attribute and the value is the object. In this example, the callback function :code:`callable_to_initialize_collaborator_private_attributes()` returns the collaborator private attributes :code:`train_loader` and :code:`test_loader`, which are accessed by the collaborator steps (:code:`aggregated_model_validation`, :code:`train`, and :code:`local_model_validation`).
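+
+For reference, such a callback might look like the following minimal sketch. The MNIST dataset, the equal-sharding scheme, and the :code:`batch_size` default here are illustrative assumptions rather than requirements of the interface; the only contract is that the callback returns a dictionary mapping attribute names to objects:
+
+.. code-block:: python
+
+    import torch
+    from torchvision import datasets, transforms
+
+    def callable_to_initialize_collaborator_private_attributes(
+            index, n_collaborators, batch_size=32):
+        # Load the full datasets once; each collaborator keeps only its shard
+        train_dataset = datasets.MNIST('files/', train=True, download=True,
+                                       transform=transforms.ToTensor())
+        test_dataset = datasets.MNIST('files/', train=False, download=True,
+                                      transform=transforms.ToTensor())
+
+        # Shard the data evenly across collaborators using this participant's index
+        local_train = torch.utils.data.Subset(
+            train_dataset, range(index, len(train_dataset), n_collaborators))
+        local_test = torch.utils.data.Subset(
+            test_dataset, range(index, len(test_dataset), n_collaborators))
+
+        # The returned keys become private attributes on the flow object
+        # during collaborator steps
+        return {
+            'train_loader': torch.utils.data.DataLoader(
+                local_train, batch_size=batch_size, shuffle=True),
+            'test_loader': torch.utils.data.DataLoader(
+                local_test, batch_size=batch_size, shuffle=False),
+        }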
 
 Some important points to remember while creating callback function and private attributes are:
 
@@ -221,7 +221,7 @@ For users with large servers or multiple GPUs they wish to take advantage of, we
 
 .. code-block:: python
 
     # Aggregator
-    aggregator_ = Aggregator()
+    aggregator_ = Aggregator(num_gpus=0.2)
 
     collaborator_names = ["Portland", "Seattle", "Chandler", "Bangalore"]
 
@@ -234,7 +234,7 @@ For users with large servers or multiple GPUs they wish to take advantage of, we
         collaborators.append(
             Collaborator(
                 name=collaborator_name,
-                num_gpus=0.25, # Number of the GPU allocated to Participant
+                num_gpus=0.2,  # Fraction of a GPU allocated to this Participant
                 private_attributes_callable=callable_to_initialize_collaborator_private_attributes,
                 index=idx,
                 n_collaborators=len(collaborator_names),
@@ -247,7 +247,9 @@ For users with large servers or multiple GPUs they wish to take advantage of, we
 
     # The Ray Backend will now be used for local execution
     local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='ray')
 
-In the above example, we have used :code:`num_gpus=0.25` while instantiating Collaborator to specify that each participant shall use 1/4th of GPU - this results in one GPU being dedicated for a total of 4 collaborators. Users can tune these arguments based on their Federated Learning requirements and available hardware resources. Configurations where one Participant is shared across GPUs is not supported. For e.g. trying to run 5 participants on 2 GPU hardware with :code:`num_gpus=0.4` will not work since 80% of each GPU is allocated to 2 collaborators and 5th collaborator does not have any available GPU remaining for use
+In the above example, we have used :code:`num_gpus=0.2` while instantiating the Aggregator and the Collaborators to specify that each participant shall use 1/5th of a GPU; this results in one GPU being shared by a total of 4 collaborators and 1 aggregator. Users can tune these arguments based on their Federated Learning requirements and available hardware resources. Configurations where one Participant is spread across multiple GPUs are not supported. For example, trying to run 5 participants on 2-GPU hardware with :code:`num_gpus=0.4` will not work: 80% of each GPU would be allocated to 4 of the participants, leaving no single GPU with enough capacity for the 5th.
+
+**Note:** It is not necessary for ALL participants to use GPUs. For example, only the Collaborators may be allocated GPUs. In this scenario, the user should ensure that artifacts returned by Collaborators to the Aggregator (e.g. the locally trained model object) are moved back to the CPU before exiting the collaborator step (i.e. before the join step), as sketched below. Since TensorFlow manages device placement automatically, this step is needed only for PyTorch.
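+
+A minimal sketch of this pattern for PyTorch is shown below. It assumes the :code:`FederatedFlow` steps from earlier in this document, where :code:`inference` is the user-defined validation helper:
+
+.. code-block:: python
+
+    @collaborator
+    def local_model_validation(self):
+        self.local_validation_score = inference(self.model, self.test_loader)
+        # Move the locally trained model back to the CPU so that a CPU-only
+        # aggregator can receive it in the join step (nn.Module.to() moves the
+        # module in place). TensorFlow places tensors automatically, so no
+        # equivalent step is needed there.
+        self.model.to('cpu')
+        self.next(self.join)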
 
 Debugging with the Metaflow Client
 ==================================
 
@@ -266,19 +268,38 @@ After the flow has started running, you can use the Metaflow Client to get inter
 
 .. code-block:: python
 
-    from metaflow import Flow, Run, Task, Step
-
+    from metaflow import Metaflow, Flow, Step, Task
+
+    # Initialize the Metaflow object and obtain the list of executed flows:
+    m = Metaflow()
+    list(m)
+    > [Flow('FederatedFlow'), Flow('AggregatorValidationFlow'), Flow('FederatedFlow_MNIST_Watermarking')]
+
     # The name of the flow is the name of the class
-    flow = Flow('FederatedFlow')
-    run = flow.latest_run
+    flow_name = 'FederatedFlow'
+
+    # List all instances of FederatedFlow executed under distinct run IDs
+    flow = Flow(flow_name)
+    list(flow)
+    > [Run('FederatedFlow/1692946840822001'),
+       Run('FederatedFlow/1692946796234386'),
+       Run('FederatedFlow/1692902602941163'),
+       Run('FederatedFlow/1692902559123920')]
+
+    # Retrieve the latest run of FederatedFlow
+    run = Flow(flow_name).latest_run
+    print(run)
+    > Run('FederatedFlow/1692946840822001')
+
     list(run)
-    > [Step('FederatedFlow/1671152854447797/end'),
-       Step('FederatedFlow/1671152854447797/join'),
-       Step('FederatedFlow/1671152854447797/local_model_validation'),
-       Step('FederatedFlow/1671152854447797/train'),
-       Step('FederatedFlow/1671152854447797/aggregated_model_validation'),
-       Step('FederatedFlow/1671152854447797/start')]
-    step = Step('FederatedFlow/1671152854447797/aggregated_model_validation')
+    > [Step('FederatedFlow/1692946840822001/end'),
+       Step('FederatedFlow/1692946840822001/join'),
+       Step('FederatedFlow/1692946840822001/local_model_validation'),
+       Step('FederatedFlow/1692946840822001/train'),
+       Step('FederatedFlow/1692946840822001/aggregated_model_validation'),
+       Step('FederatedFlow/1692946840822001/start')]
+    step = Step('FederatedFlow/1692946840822001/aggregated_model_validation')
     for task in step:
         if task.data.input == 'Portland':
             print(task.data)
 
@@ -308,6 +329,37 @@ And if we wanted to get log or error message for that task, you can just run:
 
     print(portland_task.stderr)
     > [No output]
 
+Also, if we want to get the best model and the last model, we can just run:
+
+.. code-block:: python
+
+    # Choose the specific step containing the desired models (e.g., the 'join' step):
+    step = Step('FederatedFlow/1692946840822001/join')
+    list(step)
+    > [Task('FederatedFlow/1692946840822001/join/12'),  # --> Round 3
+       Task('FederatedFlow/1692946840822001/join/9'),   # --> Round 2
+       Task('FederatedFlow/1692946840822001/join/6'),   # --> Round 1
+       Task('FederatedFlow/1692946840822001/join/3')]   # --> Round 0
+
+    # The sequence of tasks lists the rounds in reverse chronological order: the
+    # most recent task corresponds to the final round, and the preceding tasks to
+    # the earlier rounds. To determine the best model, analyze the command line
+    # logs and model accuracy for each round, then use the task ID associated
+    # with that round.
+    task = Task('FederatedFlow/1692946840822001/join/9')
+
+    # Access the best model and its associated data
+    best_model = task.data.model
+    best_local_model_accuracy = task.data.local_model_accuracy
+    best_aggregated_model_accuracy = task.data.aggregated_model_accuracy
+
+    # To retrieve the last model, select the most recent Task, i.e. the last round.
+    task = Task('FederatedFlow/1692946840822001/join/12')
+    last_model = task.data.model
+
+    # Save the chosen models using a suitable framework (e.g., PyTorch in this example),
+    # using distinct paths so the two checkpoints do not overwrite each other:
+    import torch
+    torch.save(last_model.state_dict(), last_model_path)
+    torch.save(best_model.state_dict(), best_model_path)
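+
+Rather than reading the logs by hand, the best round can also be selected programmatically. The following is a minimal sketch, assuming (as in the flow above) that every join task stores an :code:`aggregated_model_accuracy` attribute:
+
+.. code-block:: python
+
+    # Iterate over the join tasks and keep the one with the highest
+    # aggregated model accuracy; Step objects are iterable over their Tasks.
+    best_task = max(
+        Step('FederatedFlow/1692946840822001/join'),
+        key=lambda t: t.data.aggregated_model_accuracy,
+    )
+    best_model = best_task.data.model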
 
 While this information is useful for debugging, depending on your workflow it may require significant disk space. For this reason, `checkpoint` is disabled by default.
 
 Runtimes: Future Plans
@@ -327,4 +379,3 @@ Our goal is to make it a one line change to configure where and how a flow is ex
     federated_runtime = FederatedRuntime(...)
     flow.runtime = federated_runtime
     flow.run()
-