Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve Visibility of Aggregator Private Attributes in Experimental Workflow #1084

Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
544cdd2
Experimental Aggregator Workflow: Fixed private attributes issue and …
refai06 Sep 27, 2024
49eac51
Experimental Aggregator Workflow: Fixed private attributes issue and …
refai06 Oct 1, 2024
a827774
Experimental Aggregator Workflow: Updated the code and optimzed
refai06 Oct 4, 2024
5d308cd
Experimental Aggregator Workflow: Updated the code and optimzed
refai06 Oct 4, 2024
dbe8b1e
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Oct 7, 2024
461247d
Experimental Aggregator Workflow: Comments update
refai06 Oct 7, 2024
58b5d16
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Oct 8, 2024
abdd735
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Oct 15, 2024
8323ed9
Experimental Aggregator Workflow: code update and Localruntime:Partic…
refai06 Oct 16, 2024
fbeb05b
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Oct 17, 2024
188adff
comments update
refai06 Oct 17, 2024
44ab058
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Oct 17, 2024
547bfca
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Oct 23, 2024
a8596bb
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Oct 24, 2024
0e96228
Optimize code: attribute check
refai06 Oct 23, 2024
4c06768
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Oct 25, 2024
2d0a1c9
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Nov 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions openfl/experimental/component/aggregator/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ def run_flow(self) -> None:
"""Start the execution and run flow until transition."""
# Start function will be the first step if any flow
f_name = "start"
# Creating a clones from the flow object
FLSpec._reset_clones()
FLSpec._create_clones(self.flow, self.flow.runtime.collaborators)

self.logger.info(f"Starting round {self.current_round}...")
while True:
Expand Down Expand Up @@ -363,22 +366,21 @@ def do_task(self, f_name: str) -> Any:

# Transition check
if aggregator_to_collaborator(f, parent_func):
teoparvanov marked this conversation as resolved.
Show resolved Hide resolved
# Extract clones, instance snapshot and kwargs when reached
# foreach loop first time
if len(self.flow.execute_task_args) > 4:
temp = self.flow.execute_task_args[3:]
self.clones_dict, self.instance_snapshot, self.kwargs = temp

# Delete aggregator private attribute from flow object
self.__delete_agg_attrs_from_clone(self.flow)
# Unpack execute_task_args - clones_dict, instance snapshot and kwargs
self.clones_dict, self.instance_snapshot, self.kwargs = self.flow.execute_task_args[
3:
]
if "foreach" in self.kwargs:
self.flow.filter_exclude_include(f, **self.kwargs)
self.selected_collaborators = getattr(self.flow, self.kwargs["foreach"])
else:
self.kwargs = self.flow.execute_task_args[3]
self.kwargs = self.flow.execute_task_args[-1]

# Transition encountered, break the loop
not_at_transition_point = False

# Delete private attributes from flow object
self.__delete_agg_attrs_from_clone(self.flow)
teoparvanov marked this conversation as resolved.
Show resolved Hide resolved

return f_name if f_name != "end" else None

def send_task_results(
Expand Down
41 changes: 8 additions & 33 deletions openfl/experimental/interface/fl_spec.py
teoparvanov marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -228,22 +228,6 @@ def restore_instance_snapshot(self, ctx: FLSpec, instance_snapshot: List[FLSpec]
if not hasattr(ctx, name):
setattr(ctx, name, attr)

def get_clones(self, kwargs):
"""Create, and prepare clones."""
FLSpec._reset_clones()
FLSpec._create_clones(self, self.runtime.collaborators)
selected_collaborators = self.__getattribute__(kwargs["foreach"])

for col in selected_collaborators:
clone = FLSpec._clones[col]
clone.input = col
artifacts_iter, _ = generate_artifacts(ctx=clone)
attributes = artifacts_iter()
for name, attr in attributes:
setattr(clone, name, deepcopy(attr))
clone._foreach_methods = self._foreach_methods
clone._metaflow_interface = self._metaflow_interface

def next(self, f, **kwargs):
"""Specifies the next task in the flow to execute.

Expand All @@ -264,30 +248,21 @@ def next(self, f, **kwargs):
if aggregator_to_collaborator(f, parent_func):
agg_to_collab_ss = self._capture_instance_snapshot(kwargs=kwargs)

if str(self._runtime) == "FederatedRuntime":
if len(FLSpec._clones) == 0:
self.get_clones(kwargs)

# Remove included / excluded attributes from next task
filter_attributes(self, f, **kwargs)

if str(self._runtime) == "FederatedRuntime":
if f.collaborator_step and not f.aggregator_step:
self._foreach_methods.append(f.__name__)

if "foreach" in kwargs:
self.filter_exclude_include(f, **kwargs)
# if "foreach" in kwargs:
self.execute_task_args = (
self,
f,
parent_func,
FLSpec._clones,
agg_to_collab_ss,
kwargs,
)
else:
self.execute_task_args = (self, f, parent_func, kwargs)
self.execute_task_args = (
self,
f,
parent_func,
FLSpec._clones,
agg_to_collab_ss,
kwargs,
)
Comment on lines +258 to +265
Copy link
Collaborator

@teoparvanov teoparvanov Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we remove the private attribute filtering from here, then who is responsible for doing it, especially if we execute the FLSpec via the run() method, rather than through the export feature?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Private attribute filtering is required while transitioning from aggregator to collab (and vice-versa). For LocalRuntime it is done inside Participants (openfl/experimental/interface/participants.py) and in FederatedRuntime it is done inside Component (openfl/experimental/component/aggregator/aggregator.py and openfl/experimental/component/collaborator/collaborator.py)

Copy link
Collaborator

@teoparvanov teoparvanov Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks for the explanation, @refai06 - the distinction is indeed not obvious for the "uninitiated" reader...

As a follow-up PR, I'd suggest re-organizing a little bit the code, to make it easier to navigate and review. Right now, everything related to Workflow API resides directly under openfl/experimental - thus implicitly "disallowing" other experimental features. Instead, it may be better to have a dedicated openfl/experimental/workflow package for example. Additionally, under openfl/experimental/workflow, we could imagine separate sub-packages for local, federated and export for example.

WDYT?


elif str(self._runtime) == "LocalRuntime":
# update parameters required to execute execute_task function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@


def aggregator_private_attrs():
return {"test_loader": np.random.rand(10, 28, 28)} # Random data
return {"test_loader_agg": np.random.rand(10, 28, 28)} # Random data
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def start(self):
)
self.collaborators = self.runtime.collaborators

validate_collab_private_attr(self, "test_loader", "start")
validate_agg_private_attr(self, "start", aggr = ["test_loader_agg"], collabs =["train_loader", "test_loader"])

self.exclude_agg_to_agg = 10
self.include_agg_to_agg = 100
Expand All @@ -49,7 +49,7 @@ def aggregator_step(self):
Testing whether Agg private attributes are accessible in next agg step.
Collab private attributes should not be accessible here
"""
validate_collab_private_attr(self, "test_loader", "aggregator_step")
validate_agg_private_attr(self, "aggregator_step", aggr = ["test_loader_agg"], collabs =["train_loader", "test_loader"])

self.include_agg_to_collab = 42
self.exclude_agg_to_collab = 40
Expand All @@ -65,8 +65,8 @@ def collaborator_step_a(self):
Testing whether Collab private attributes are accessible in collab step
Aggregator private attributes should not be accessible here
"""
validate_agg_private_attrs(
self, "train_loader", "test_loader", "collaborator_step_a"
validate_collab_private_attrs(
self, "collaborator_step_a", aggr = ["test_loader_agg"], collabs =["train_loader", "test_loader"]
)

self.exclude_collab_to_collab = 2
Expand All @@ -80,8 +80,8 @@ def collaborator_step_b(self):
Aggregator private attributes should not be accessible here
"""

validate_agg_private_attrs(
self, "train_loader", "test_loader", "collaborator_step_b"
validate_collab_private_attrs(
self, "collaborator_step_b", aggr = ["test_loader_agg"], collabs =["train_loader", "test_loader"]
)
self.exclude_collab_to_agg = 10
self.include_collab_to_agg = 12
Expand All @@ -93,7 +93,7 @@ def join(self, inputs):
Testing whether attributes are excluded from collab to agg
"""
# Aggregator should only be able to access its own attributes
if hasattr(self, "test_loader") is False:
if hasattr(self, "test_loader_agg") is False:
TestFlowPrivateAttributes.ERROR_LIST.append(
"aggregator_join_aggregator_attributes_missing"
)
Expand Down Expand Up @@ -143,19 +143,45 @@ def end(self):
TestFlowPrivateAttributes.ERROR_LIST = []


def validate_collab_private_attr(self, private_attr, step_name):
def validate_agg_private_attr(self, step_name, **private_attrs_kwargs):

"""
Validate that aggregator can only access their own attributes

Args:
step_name: Name of the step being validated
private_attr_kwargs: Keyword arguments specifying the names of private attributes for the aggregator and collaborators.
"""
agg_attrs = private_attrs_kwargs.get('aggr',[])
collab_attrs = private_attrs_kwargs.get('collabs', [])
# Aggregator should only be able to access its own attributes
if hasattr(self, private_attr) is False:

# check for missing aggregator attributes
inaccessible_agg_attrs = [attr for attr in agg_attrs if not hasattr(self, attr)]
if inaccessible_agg_attrs:
TestFlowPrivateAttributes.ERROR_LIST.append(
step_name + "_aggregator_attributes_missing"
step_name + " aggregator_attributes_missing"
)
print(
f"{bcolors.FAIL} ...Failed in {step_name} - aggregator private attributes not "
+ f"accessible {bcolors.ENDC}"
)

# check for collaborator private attributes that should not be accessible
breached_collab_attrs = [attr for attr in collab_attrs if hasattr(self,attr) is True]
if breached_collab_attrs:
TestFlowPrivateAttributes.ERROR_LIST.append(
step_name + "_collaborator_attributes_found"
)
print(
f"{bcolors.FAIL} ... Attribute test failed in {step_name} - collaborator"
+ f"private attributes accessible:{','.join(breached_collab_attrs)} {bcolors.ENDC}"
)



for idx, collab in enumerate(self.collaborators):
# Collaborator private attributes should not be accessible
# Collaborator attributes should not be accessible in aggregator step
if (
type(self.collaborators[idx]) is not str
or hasattr(self.runtime, "_collaborators") is True
Expand All @@ -171,17 +197,44 @@ def validate_collab_private_attr(self, private_attr, step_name):
)


def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name):
def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs):

"""
Validate that collaborators can only access their own attributes

Args:
step_name: Name of the step being validated
private_attr_kwargs: Keyword arguments specifying the names of private attributes for the aggregator and collaborators.
"""
agg_attrs = private_attrs_kwargs.get('aggr',[])
collab_attrs = private_attrs_kwargs.get('collabs', [])

# Collaborator should only be able to access its own attributes
if not hasattr(self, private_attr_1) or not hasattr(self, private_attr_2):

# check for missing collaborators attributes
inaccessible_collab_attrs = [attr for attr in collab_attrs if not hasattr(self,attr)]

if inaccessible_collab_attrs:
TestFlowPrivateAttributes.ERROR_LIST.append(
step_name + "collab_attributes_not_found"
step_name + " collab_attributes_not_found"
)
print(
f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Collab "
+ f"private attributes not accessible {bcolors.ENDC}"
)
# check for aggregator private attributes that should not be accessible
breached_agg_attr = [attr for attr in agg_attrs if hasattr(self, attr) is True]
if breached_agg_attr:
TestFlowPrivateAttributes.ERROR_LIST.append(
step_name + "_aggregator_attributes_found"
)

print(
f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Aggregator"
+ f" private attributes accessible: {','.join(breached_agg_attr)} {bcolors.ENDC}"
)

# Aggregator attributes should not be accessible in collaborator step
if hasattr(self.runtime, "_aggregator") and isinstance(self.runtime._aggregator, Aggregator):
# Error - we are able to access aggregator attributes
TestFlowPrivateAttributes.ERROR_LIST.append(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


def aggregator_private_attrs():
return {"test_loader_via_callable": np.random.rand(10, 28, 28)} # Random data
return {"test_loader_agg_via_callable": np.random.rand(10, 28, 28)} # Random data


aggregator_private_attributes = {"test_loader": np.random.rand(10, 28, 28)} # Random data
aggregator_private_attributes = {"test_loader_agg": np.random.rand(10, 28, 28)} # Random data
Loading
Loading