Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve Visibility of Aggregator Private Attributes in Experimental Workflow #1084

Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
544cdd2
Experimental Aggregator Workflow: Fixed private attributes issue and …
refai06 Sep 27, 2024
49eac51
Experimental Aggregator Workflow: Fixed private attributes issue and …
refai06 Oct 1, 2024
a827774
Experimental Aggregator Workflow: Updated the code and optimzed
refai06 Oct 4, 2024
5d308cd
Experimental Aggregator Workflow: Updated the code and optimzed
refai06 Oct 4, 2024
dbe8b1e
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Oct 7, 2024
461247d
Experimental Aggregator Workflow: Comments update
refai06 Oct 7, 2024
58b5d16
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Oct 8, 2024
abdd735
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Oct 15, 2024
8323ed9
Experimental Aggregator Workflow: code update and Localruntime:Partic…
refai06 Oct 16, 2024
fbeb05b
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Oct 17, 2024
188adff
comments update
refai06 Oct 17, 2024
44ab058
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Oct 17, 2024
547bfca
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Oct 23, 2024
a8596bb
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Oct 24, 2024
0e96228
Optimize code: attribute check
refai06 Oct 23, 2024
4c06768
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Oct 25, 2024
2d0a1c9
Merge branch 'securefederatedai:develop' into exp_agg_workflow_privat…
refai06 Nov 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 27 additions & 23 deletions openfl/experimental/component/aggregator/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,18 @@ def __initialize_private_attributes(self, kwargs: Dict) -> None:
self.__private_attrs."""
self.__private_attrs = self.__private_attrs_callable(**kwargs)

def __set_attributes_to_clone(self, clone: Any) -> None:
"""Set private_attrs to clone as attributes."""
def __set_private_attrs_to_clone(self, clone: Any) -> None:
"""Set private_attrs of Aggregator as attributes of FLSpec clone"""
if len(self.__private_attrs) > 0:
for name, attr in self.__private_attrs.items():
setattr(clone, name, attr)

def __delete_agg_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None:
def __delete_private_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None:
"""
Remove aggregator private attributes from FLSpec clone before
transition from Aggregator step to collaborator steps.
Remove aggregator private attributes from FLSpec clone
before transition from aggregator step to collaborator steps.
Instead of removing private attributes this method can also replace
private attributes with a string (required in checkpointing)
"""
# Update aggregator private attributes by taking latest
# parameters from clone, then delete attributes from clone.
Expand Down Expand Up @@ -152,6 +154,9 @@ def run_flow(self) -> None:
"""Start the execution and run flow until transition."""
# Start function will be the first step if any flow
f_name = "start"
# Creating a clones from the flow object
FLSpec._reset_clones()
FLSpec._create_clones(self.flow, self.flow.runtime.collaborators)

self.logger.info(f"Starting round {self.current_round}...")
while True:
Expand Down Expand Up @@ -298,7 +303,7 @@ def do_task(self, f_name: str) -> Any:
string / None: Next collaborator function or None end of the flow.
"""
# Set aggregator private attributes to flow object
self.__set_attributes_to_clone(self.flow)
self.__set_private_attrs_to_clone(self.flow)

not_at_transition_point = True
# Run a loop to execute flow steps until not_at_transition_point
Expand All @@ -311,9 +316,11 @@ def do_task(self, f_name: str) -> Any:
if f.__name__ == "end":
f()
# Take the checkpoint of "end" step
self.__delete_agg_attrs_from_clone(self.flow, "Private attributes: Not Available.")
self.__delete_private_attrs_from_clone(
self.flow, "Private attributes: Not Available."
)
self.call_checkpoint(self.flow, f)
self.__set_attributes_to_clone(self.flow)
self.__set_private_attrs_to_clone(self.flow)
# Check if all rounds of external loop is executed
if self.current_round is self.rounds_to_train:
# All rounds execute, it is time to quit
Expand Down Expand Up @@ -348,34 +355,31 @@ def do_task(self, f_name: str) -> Any:
# clones are arguments
f(*selected_clones)

self.__delete_agg_attrs_from_clone(self.flow, "Private attributes: Not Available.")
self.__delete_private_attrs_from_clone(self.flow, "Private attributes: Not Available.")
# Take the checkpoint of executed step
self.call_checkpoint(self.flow, f)
self.__set_attributes_to_clone(self.flow)
self.__set_private_attrs_to_clone(self.flow)

# Next function in the flow
_, f, parent_func = self.flow.execute_task_args[:3]
f_name = f.__name__

self.flow._display_transition_logs(f, parent_func)

# Transition check
if aggregator_to_collaborator(f, parent_func):
teoparvanov marked this conversation as resolved.
Show resolved Hide resolved
# Extract clones, instance snapshot and kwargs when reached
# foreach loop first time
if len(self.flow.execute_task_args) > 4:
temp = self.flow.execute_task_args[3:]
self.clones_dict, self.instance_snapshot, self.kwargs = temp

self.selected_collaborators = getattr(self.flow, self.kwargs["foreach"])
else:
self.kwargs = self.flow.execute_task_args[3]

# Transition encountered, break the loop
not_at_transition_point = False

# Delete private attributes from flow object
self.__delete_agg_attrs_from_clone(self.flow)
teoparvanov marked this conversation as resolved.
Show resolved Hide resolved
# Delete aggregator private attribute from flow object
self.__delete_private_attrs_from_clone(self.flow)

# Unpack execute_task_args - clones_dict, instance snapshot and kwargs
self.clones_dict, self.instance_snapshot, self.kwargs = self.flow.execute_task_args[3:]
if "foreach" in self.kwargs:
self.flow.filter_exclude_include(f, **self.kwargs)
self.selected_collaborators = getattr(self.flow, self.kwargs["foreach"])
else:
self.kwargs = self.flow.execute_task_args[-1]

return f_name if f_name != "end" else None

Expand Down
20 changes: 11 additions & 9 deletions openfl/experimental/component/collaborator/collaborator.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ def __initialize_private_attributes(self, kwargs: Dict) -> None:
"""
self.__private_attrs = self.__private_attrs_callable(**kwargs)

def __set_attributes_to_clone(self, clone: Any) -> None:
"""Set private_attrs to clone as attributes.
def __set_private_attrs_to_clone(self, clone: Any) -> None:
"""Set private_attrs of Collaborator as attributes of FLSpec clone.

Args:
clone (FLSpec): Clone to which private attributes are to be
Expand All @@ -80,10 +80,12 @@ def __set_attributes_to_clone(self, clone: Any) -> None:
for name, attr in self.__private_attrs.items():
setattr(clone, name, attr)

def __delete_agg_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None:
def __delete_private_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None:
"""
Remove aggregator private attributes from FLSpec clone before
transition from Aggregator step to collaborator steps
Remove collaborator private attributes from FLSpec clone
before transition from Collaborator step to aggregator steps.
Instead of removing private attributes this method can also replace
private attributes with a string (required in checkpointing)

Args:
clone (FLSpec): Clone from which private attributes are to be
Expand Down Expand Up @@ -190,17 +192,17 @@ def do_task(self, f_name: str, ctx: Any) -> Tuple:
Tuple(str, FLSpec): Next aggregator function, and updated context.
"""
# Set private attributes to context
self.__set_attributes_to_clone(ctx)
self.__set_private_attrs_to_clone(ctx)

# Loop control variable
not_at_transition_point = True
while not_at_transition_point:
f = getattr(ctx, f_name)
f()
# Checkpoint the function
self.__delete_agg_attrs_from_clone(ctx, "Private attributes: Not Available.")
self.__delete_private_attrs_from_clone(ctx, "Private attributes: Not Available.")
self.call_checkpoint(ctx, f, f._stream_buffer)
self.__set_attributes_to_clone(ctx)
self.__set_private_attrs_to_clone(ctx)

_, f, parent_func = ctx.execute_task_args[:3]
# Display transition logs if transition
Expand All @@ -214,6 +216,6 @@ def do_task(self, f_name: str, ctx: Any) -> Tuple:
f_name = f.__name__

# Reomve private attributes from context
self.__delete_agg_attrs_from_clone(ctx)
self.__delete_private_attrs_from_clone(ctx)

return f_name, ctx
41 changes: 8 additions & 33 deletions openfl/experimental/interface/fl_spec.py
teoparvanov marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -228,22 +228,6 @@ def restore_instance_snapshot(self, ctx: FLSpec, instance_snapshot: List[FLSpec]
if not hasattr(ctx, name):
setattr(ctx, name, attr)

def get_clones(self, kwargs):
"""Create, and prepare clones."""
FLSpec._reset_clones()
FLSpec._create_clones(self, self.runtime.collaborators)
selected_collaborators = self.__getattribute__(kwargs["foreach"])

for col in selected_collaborators:
clone = FLSpec._clones[col]
clone.input = col
artifacts_iter, _ = generate_artifacts(ctx=clone)
attributes = artifacts_iter()
for name, attr in attributes:
setattr(clone, name, deepcopy(attr))
clone._foreach_methods = self._foreach_methods
clone._metaflow_interface = self._metaflow_interface

def next(self, f, **kwargs):
"""Specifies the next task in the flow to execute.

Expand All @@ -264,30 +248,21 @@ def next(self, f, **kwargs):
if aggregator_to_collaborator(f, parent_func):
agg_to_collab_ss = self._capture_instance_snapshot(kwargs=kwargs)

if str(self._runtime) == "FederatedRuntime":
if len(FLSpec._clones) == 0:
self.get_clones(kwargs)

# Remove included / excluded attributes from next task
filter_attributes(self, f, **kwargs)

if str(self._runtime) == "FederatedRuntime":
if f.collaborator_step and not f.aggregator_step:
self._foreach_methods.append(f.__name__)

if "foreach" in kwargs:
self.filter_exclude_include(f, **kwargs)
# if "foreach" in kwargs:
self.execute_task_args = (
self,
f,
parent_func,
FLSpec._clones,
agg_to_collab_ss,
kwargs,
)
else:
self.execute_task_args = (self, f, parent_func, kwargs)
self.execute_task_args = (
self,
f,
parent_func,
FLSpec._clones,
agg_to_collab_ss,
kwargs,
)
Comment on lines +258 to +265
Copy link
Collaborator

@teoparvanov teoparvanov Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we remove the private attribute filtering from here, then who is responsible for doing it, especially if we execute the FLSpec via the run() method, rather than through the export feature?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Private attribute filtering is required while transitioning from aggregator to collab (and vice-versa). For LocalRuntime it is done inside Participants (openfl/experimental/interface/participants.py) and in FederatedRuntime it is done inside Component (openfl/experimental/component/aggregator/aggregator.py and openfl/experimental/component/collaborator/collaborator.py)

Copy link
Collaborator

@teoparvanov teoparvanov Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks for the explanation, @refai06 - the distinction is indeed not obvious for the "uninitiated" reader...

As a follow-up PR, I'd suggest re-organizing a little bit the code, to make it easier to navigate and review. Right now, everything related to Workflow API resides directly under openfl/experimental - thus implicitly "disallowing" other experimental features. Instead, it may be better to have a dedicated openfl/experimental/workflow package for example. Additionally, under openfl/experimental/workflow, we could imagine separate sub-packages for local, federated and export for example.

WDYT?


elif str(self._runtime) == "LocalRuntime":
# update parameters required to execute execute_task function
Expand Down
133 changes: 45 additions & 88 deletions openfl/experimental/interface/participants.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,47 @@ def name(self, name: str):
"""
self._name = name

def get_name(self) -> str:
"""Gets the name of Participant (aggregator or collaborator).

Returns:
str: The name of the aggregator or collaborator.
"""
return self._name

def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> None:
"""Initialize private attributes of Participant (aggregator or collaborator)
by invoking the callable specified by user."""
if self.private_attributes_callable is not None:
self.private_attributes = self.private_attributes_callable(**self.kwargs)
elif private_attrs:
self.private_attributes = private_attrs

def _set_private_attrs_to_clone(self, clone: Any) -> None:
"""Set private attributes to FLSpec clone before
transitioning from aggregator to collaborator or vice versa

Args:
clone (Any): The clone to set attributes to.
"""
# set private attributes as clone attributes
for name, attr in self.private_attributes.items():
setattr(clone, name, attr)

def _delete_private_attrs_from_clone(self, clone: Any) -> None:
"""Remove private attributes from FLSpec clone before
transitioning from collaborator to aggregator or vice versa

Args:
clone (Any): The clone to remove attributes from.
"""
# Update private attributes by taking latest
# parameters from clone, then delete attributes from clone.
for attr_name in self.private_attributes:
if hasattr(clone, attr_name):
self.private_attributes.update({attr_name: getattr(clone, attr_name)})
delattr(clone, attr_name)

def private_attributes(self, attrs: Dict[str, Any]) -> None:
"""Set the private attributes of the participant. These attributes will
only be available within the tasks performed by the participants and
Expand Down Expand Up @@ -119,48 +160,6 @@ def __init__(
else:
self.private_attributes_callable = private_attributes_callable

def get_name(self) -> str:
"""Gets the name of the collaborator.

Returns:
str: The name of the collaborator.
"""
return self._name

def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> None:
"""Initialize private attributes of Collaborator object by invoking the
callable specified by user."""
if self.private_attributes_callable is not None:
self.private_attributes = self.private_attributes_callable(**self.kwargs)
elif private_attrs:
self.private_attributes = private_attrs

def __set_collaborator_attrs_to_clone(self, clone: Any) -> None:
"""Set collaborator private attributes to FLSpec clone before
transitioning from Aggregator step to collaborator steps.

Args:
clone (Any): The clone to set attributes to.
"""
# set collaborator private attributes as
# clone attributes
for name, attr in self.private_attributes.items():
setattr(clone, name, attr)

def __delete_collab_attrs_from_clone(self, clone: Any) -> None:
"""Remove collaborator private attributes from FLSpec clone before
transitioning from Collaborator step to Aggregator step.

Args:
clone (Any): The clone to remove attributes from.
"""
# Update collaborator private attributes by taking latest
# parameters from clone, then delete attributes from clone.
for attr_name in self.private_attributes:
if hasattr(clone, attr_name):
self.private_attributes.update({attr_name: getattr(clone, attr_name)})
delattr(clone, attr_name)

def execute_func(self, ctx: Any, f_name: str, callback: Callable) -> Any:
"""Execute remote function f.

Expand All @@ -172,11 +171,11 @@ def execute_func(self, ctx: Any, f_name: str, callback: Callable) -> Any:
Returns:
Any: The result of the function execution.
"""
self.__set_collaborator_attrs_to_clone(ctx)
self._set_private_attrs_to_clone(ctx)

callback(ctx, f_name)

self.__delete_collab_attrs_from_clone(ctx)
self._delete_private_attrs_from_clone(ctx)

return ctx

Expand Down Expand Up @@ -227,48 +226,6 @@ def __init__(
else:
self.private_attributes_callable = private_attributes_callable

def get_name(self) -> str:
"""Gets the name of the aggregator.

Returns:
str: The name of the aggregator.
"""
return self.name

def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> None:
"""Initialize private attributes of Aggregator object by invoking the
callable specified by user."""
if self.private_attributes_callable is not None:
self.private_attributes = self.private_attributes_callable(**self.kwargs)
elif private_attrs:
self.private_attributes = private_attrs

def __set_agg_attrs_to_clone(self, clone: Any) -> None:
"""Set aggregator private attributes to FLSpec clone before transition
from Aggregator step to collaborator steps.

Args:
clone (Any): The clone to set attributes to.
"""
# set aggregator private attributes as
# clone attributes
for name, attr in self.private_attributes.items():
setattr(clone, name, attr)

def __delete_agg_attrs_from_clone(self, clone: Any) -> None:
"""Remove aggregator private attributes from FLSpec clone before
transition from Aggregator step to collaborator steps.

Args:
clone (Any): The clone to remove attributes from.
"""
# Update aggregator private attributes by taking latest
# parameters from clone, then delete attributes from clone.
for attr_name in self.private_attributes:
if hasattr(clone, attr_name):
self.private_attributes.update({attr_name: getattr(clone, attr_name)})
delattr(clone, attr_name)

def execute_func(
self, ctx: Any, f_name: str, callback: Callable, clones: Optional[Any] = None
) -> Any:
Expand All @@ -284,13 +241,13 @@ def execute_func(
Returns:
Any: The result of the function execution.
"""
self.__set_agg_attrs_to_clone(ctx)
self._set_private_attrs_to_clone(ctx)

if clones is not None:
callback(ctx, f_name, clones)
else:
callback(ctx, f_name)

self.__delete_agg_attrs_from_clone(ctx)
self._delete_private_attrs_from_clone(ctx)

return ctx
Loading
Loading