Skip to content

Commit

Permalink
Incorporated Internal review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ishant Thakare <[email protected]>
  • Loading branch information
ishant162 committed May 20, 2024
1 parent 4d86169 commit 151efa6
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 29 deletions.
47 changes: 37 additions & 10 deletions docs/about/features_index/workflowinterface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -169,22 +169,49 @@ A :code:`Runtime` defines where the flow will be executed, who the participants
Let's break this down, starting with the :code:`Aggregator` and :code:`Collaborator` components. These components represent the *Participants* in a Federated Learning experiment. Each participant has its own set of *private attributes* that represent the information / data specific to its role or requirements. As the name suggests these *private attributes* are accessible only to the particular participant, and are appropriately inserted into or filtered out of current Flow state when transferring from between Participants. For e.g. Collaborator private attributes are inserted into :code:`flow` when transitioning from Aggregator to Collaborator and are filtered out when transitioning from Collaborator to Aggregator.

In the above :code:`FederatedFlow`, each collaborator accesses train and test datasets via *private attributes* :code:`train_loader` and :code:`test_loader`. These *private attributes* need to be set using a (user defined) callback function while instantiating the participant. Participant *private attributes* are returned by the callback function in form of a dictionary, where the key is the name of the attribute and the value is the object. In this example callback function :code:`callable_to_initialize_collaborator_private_attributes()` returns the collaborator private attributes :code:`train_loader` and :code:`test_loader` that are accessed by collaborator steps (:code:`aggregated_model_validation`, :code:`train` and :code:`local_model_validation`).
In the above :code:`FederatedFlow`, each collaborator accesses train and test datasets via *private attributes* :code:`train_loader` and :code:`test_loader`. These *private attributes* need to be set in form of a dictionary(user defined), where the key is the name of the attribute and the value is the object. In this example :code:`collaborator.private_attributes` sets the collaborator *private attributes* :code:`train_loader` and :code:`test_loader` that are accessed by collaborator steps (:code:`aggregated_model_validation`, :code:`train` and :code:`local_model_validation`).

There is another way of initializing private attributes in which *private attributes* need to be set in form of a dictionary(user defined), where the key is the name of the attribute and the value is the object.
There is another way of initializing private attributes in which *private attributes* need to be set using a (user defined) callback function while instantiating the participant.

.. code-block:: python
collaborator_names = ['Portland', 'Seattle', 'Chandler','Bangalore']
for collaborator_name in collaborator_names:
collab = Collaborator(name=collaborator_name)
collab.private_attributes = {
'train_loader': torch.utils.data.DataLoader(local_train,batch_size=batch_size_train, shuffle=True),
'test_loader': torch.utils.data.DataLoader(local_test,batch_size=batch_size_train, shuffle=True)
# Aggregator
aggregator_ = Aggregator()
collaborator_names = ["Portland", "Seattle", "Chandler", "Bangalore"]
def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, batch_size, train_dataset, test_dataset):
train = deepcopy(train_dataset)
test = deepcopy(test_dataset)
train.data = train_dataset.data[index::n_collaborators]
train.targets = train_dataset.targets[index::n_collaborators]
test.data = test_dataset.data[index::n_collaborators]
test.targets = test_dataset.targets[index::n_collaborators]
return {
"train_loader": torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True),
"test_loader": torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=True),
}
collaborators.append(collab)
In this example :code:`collaborator.private_attributes` sets the collaborator private attributes :code:`train_loader` and :code:`test_loader`.
# Setup collaborators private attributes via callable function
collaborators = []
for idx, collaborator_name in enumerate(collaborator_names):
collaborators.append(
Collaborator(
name=collaborator_name,
private_attributes_callable=callable_to_initialize_collaborator_private_attributes,
index=idx,
n_collaborators=len(collaborator_names),
train_dataset=mnist_train,
test_dataset=mnist_test,
batch_size=64
)
)
local_runtime = LocalRuntime(aggregator=aggregator_, collaborators=collaborators)
Participant *private attributes* are returned by the callback function in form of a dictionary, where the key is the name of the attribute and the value is the object. In this example callback function :code:`callable_to_initialize_collaborator_private_attributes()` returns :code:`train_loader` and :code:`test_loader` in the form of a dictionary.

**Note:**If both callable and private attributes are provided, the initialization will prioritize the private attributes through the :code:`callable` function.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,13 @@
"id": "2aabf61e",
"metadata": {},
"source": [
"You'll notice in the `FederatedFlow` definition above that there were certain attributes that the flow was not initialized with, namely the `train_loader` and `test_loader` for each of the collaborators\n",
"You'll notice in the `FederatedFlow` definition above that there were certain attributes that the flow was not initialized with, namely the `train_loader` and `test_loader` for each of the collaborators. These are **private_attributes** of the particular participant and (as the name suggests) are accessible ONLY to the particular participant's through its task. Additionally these private attributes are always filtered out of the current state when transferring from collaborator to aggregator, and vice versa.\n",
" \n",
"These are **private_attributes** of the particular participant: a dictionary where key is name of the attribute, and value is the object that is made accessible through that participant's task. In this example, we segment shards of the MNIST dataset for **four collaborators**: Portland, Seattle, Chandler, and Portland. Each collaborator has their own slice of the dataset that's accessible via the `train_loader` or `test_loader` attribute. As shown below, `collaborator.private_attributes`, sets the private attributes `train_loader` and `test_loader` of the collaborator\n",
"Users can directly specify a collaborator's private attributes via `collaborator.private_attributes` which is a dictionary where key is name of the attribute and value is the object that is made accessible to collaborator. In this example, we segment shards of the MNIST dataset for four collaborators: `Portland`, `Seattle`, `Chandler` and `Bangalore`. Each shard / slice of the dataset is assigned to collaborator's private_attribute.\n",
" \n",
"Note that the private attributes are flexible, and user can choose to pass in a completely different type of object to any of the collaborators or aggregator (with an arbitrary name). These private attributes will always be filtered out of the current state when transfering from collaborator to aggregator, or vice versa"
"Note that the private attributes are flexible, and user can choose to pass in a completely different type of object to any of the collaborators or aggregator (with an arbitrary name).\n",
" \n",
"Subsequent tutorials shall show examples to assign private_attributes for aggregator and another methodology of specifying private attributes via a callable."
]
},
{
Expand Down
12 changes: 6 additions & 6 deletions openfl/experimental/interface/participants.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) ->
initialize private attributes of Collaborator object by invoking
the callable or by passing private_attrs argument
"""
if private_attrs:
self.private_attributes = private_attrs
elif self.private_attributes_callable is not None:
if self.private_attributes_callable is not None:
self.private_attributes = self.private_attributes_callable(**self.kwargs)
elif private_attrs:
self.private_attributes = private_attrs

def __set_collaborator_attrs_to_clone(self, clone: Any) -> None:
"""
Expand Down Expand Up @@ -195,10 +195,10 @@ def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) ->
initialize private attributes of Aggregator object by invoking
the callable or by passing private_attrs argument
"""
if private_attrs:
self.private_attributes = private_attrs
elif self.private_attributes_callable is not None:
if self.private_attributes_callable is not None:
self.private_attributes = self.private_attributes_callable(**self.kwargs)
elif private_attrs:
self.private_attributes = private_attrs

def __set_agg_attrs_to_clone(self, clone: Any) -> None:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def start(self):
)
self.collaborators = self.runtime.collaborators

validate_collab_private_attr(self, "test_loader", "start")
validate_collab_private_attr(self, "test_loader_via_callable", "start")

self.exclude_agg_to_agg = 10
self.include_agg_to_agg = 100
Expand All @@ -51,7 +51,7 @@ def aggregator_step(self):
Testing whether Agg private attributes are accessible in next agg step.
Collab private attributes should not be accessible here
"""
validate_collab_private_attr(self, "test_loader", "aggregator_step")
validate_collab_private_attr(self, "test_loader_via_callable", "aggregator_step")

self.include_agg_to_collab = 42
self.exclude_agg_to_collab = 40
Expand All @@ -68,7 +68,7 @@ def collaborator_step_a(self):
Aggregator private attributes should not be accessible here
"""
validate_agg_private_attrs(
self, "train_loader", "test_loader", "collaborator_step_a"
self, "train_loader_via_callable", "test_loader_via_callable", "collaborator_step_a"
)

self.exclude_collab_to_collab = 2
Expand All @@ -83,7 +83,7 @@ def collaborator_step_b(self):
"""

validate_agg_private_attrs(
self, "train_loader", "test_loader", "collaborator_step_b"
self, "train_loader_via_callable", "test_loader_via_callable", "collaborator_step_b"
)
self.exclude_collab_to_agg = 10
self.include_collab_to_agg = 12
Expand All @@ -95,7 +95,7 @@ def join(self, inputs):
Testing whether attributes are excluded from collab to agg
"""
# Aggregator should only be able to access its own attributes
if hasattr(self, "test_loader") is False:
if hasattr(self, "test_loader_via_callable") is False:
TestFlowPrivateAttributes.error_list.append(
"aggregator_join_aggregator_attributes_missing"
)
Expand All @@ -106,8 +106,8 @@ def join(self, inputs):

for idx, collab in enumerate(inputs):
if (
hasattr(collab, "train_loader") is True
or hasattr(collab, "test_loader") is True
hasattr(collab, "train_loader_via_callable") is True
or hasattr(collab, "test_loader_via_callable") is True
):
# Error - we are able to access collaborator attributes
TestFlowPrivateAttributes.error_list.append(
Expand Down Expand Up @@ -197,7 +197,7 @@ def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name):
if __name__ == "__main__":
# Setup Aggregator with private attributes via callable function
def callable_to_initialize_aggregator_private_attributes():
return {"test_loader": np.random.rand(10, 28, 28)} # Random data
return {"test_loader_via_callable": np.random.rand(10, 28, 28)} # Random data

aggregator = Aggregator(
name="agg",
Expand All @@ -222,8 +222,8 @@ def callable_to_initialize_aggregator_private_attributes():

def callable_to_initialize_collaborator_private_attributes(index):
return {
"train_loader": np.random.rand(idx * 50, 28, 28),
"test_loader": np.random.rand(idx * 10, 28, 28),
"train_loader_via_callable": np.random.rand(idx * 50, 28, 28),
"test_loader_via_callable": np.random.rand(idx * 10, 28, 28),
}

collaborators = []
Expand Down

0 comments on commit 151efa6

Please sign in to comment.