Skip to content

Commit

Permalink
Experimental Aggregator Workflow: code update and Localruntime:Partic…
Browse files Browse the repository at this point in the history
…ipant refactor

Signed-off-by: refai06 <[email protected]>
  • Loading branch information
refai06 committed Oct 16, 2024
1 parent abdd735 commit 8323ed9
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 113 deletions.
39 changes: 20 additions & 19 deletions openfl/experimental/component/aggregator/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ def __initialize_private_attributes(self, kwargs: Dict) -> None:
self.__private_attrs."""
self.__private_attrs = self.__private_attrs_callable(**kwargs)

def __set_attributes_to_clone(self, clone: Any) -> None:
def __set_private_attrs_to_clone(self, clone: Any) -> None:
"""Set private_attrs to clone as attributes."""
if len(self.__private_attrs) > 0:
for name, attr in self.__private_attrs.items():
setattr(clone, name, attr)

def __delete_agg_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None:
def __delete_private_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None:
"""
Remove aggregator private attributes from FLSpec clone before
transition from Aggregator step to collaborator steps.
Expand Down Expand Up @@ -301,7 +301,7 @@ def do_task(self, f_name: str) -> Any:
string / None: Next collaborator function or None end of the flow.
"""
# Set aggregator private attributes to flow object
self.__set_attributes_to_clone(self.flow)
self.__set_private_attrs_to_clone(self.flow)

not_at_transition_point = True
# Run a loop to execute flow steps until not_at_transition_point
Expand All @@ -314,9 +314,11 @@ def do_task(self, f_name: str) -> Any:
if f.__name__ == "end":
f()
# Take the checkpoint of "end" step
self.__delete_agg_attrs_from_clone(self.flow, "Private attributes: Not Available.")
self.__delete_private_attrs_from_clone(
self.flow, "Private attributes: Not Available."
)
self.call_checkpoint(self.flow, f)
self.__set_attributes_to_clone(self.flow)
self.__set_private_attrs_to_clone(self.flow)
# Check if all rounds of external loop is executed
if self.current_round is self.rounds_to_train:
# All rounds execute, it is time to quit
Expand Down Expand Up @@ -351,10 +353,10 @@ def do_task(self, f_name: str) -> Any:
# clones are arguments
f(*selected_clones)

self.__delete_agg_attrs_from_clone(self.flow, "Private attributes: Not Available.")
self.__delete_private_attrs_from_clone(self.flow, "Private attributes: Not Available.")
# Take the checkpoint of executed step
self.call_checkpoint(self.flow, f)
self.__set_attributes_to_clone(self.flow)
self.__set_private_attrs_to_clone(self.flow)

# Next function in the flow
_, f, parent_func = self.flow.execute_task_args[:3]
Expand All @@ -364,21 +366,20 @@ def do_task(self, f_name: str) -> Any:

# Transition check
if aggregator_to_collaborator(f, parent_func):
# Delete aggregator private attribute from flow object
self.__delete_agg_attrs_from_clone(self.flow)
# Unpack execute_task_args - clones_dict, instance snapshot and kwargs
self.clones_dict, self.instance_snapshot, self.kwargs = self.flow.execute_task_args[
3:
]
if "foreach" in self.kwargs:
self.flow.filter_exclude_include(f, **self.kwargs)
self.selected_collaborators = getattr(self.flow, self.kwargs["foreach"])
else:
self.kwargs = self.flow.execute_task_args[-1]

# Transition encountered, break the loop
not_at_transition_point = False

# Delete aggregator private attribute from flow object
self.__delete_private_attrs_from_clone(self.flow)

# Unpack execute_task_args - clones_dict, instance snapshot and kwargs
self.clones_dict, self.instance_snapshot, self.kwargs = self.flow.execute_task_args[3:]
if "foreach" in self.kwargs:
self.flow.filter_exclude_include(f, **self.kwargs)
self.selected_collaborators = getattr(self.flow, self.kwargs["foreach"])
else:
self.kwargs = self.flow.execute_task_args[-1]

return f_name if f_name != "end" else None

def send_task_results(
Expand Down
12 changes: 6 additions & 6 deletions openfl/experimental/component/collaborator/collaborator.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def __initialize_private_attributes(self, kwargs: Dict) -> None:
"""
self.__private_attrs = self.__private_attrs_callable(**kwargs)

def __set_attributes_to_clone(self, clone: Any) -> None:
def __set_private_attrs_to_clone(self, clone: Any) -> None:
"""Set private_attrs to clone as attributes.
Args:
Expand All @@ -80,7 +80,7 @@ def __set_attributes_to_clone(self, clone: Any) -> None:
for name, attr in self.__private_attrs.items():
setattr(clone, name, attr)

def __delete_agg_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None:
def __delete_private_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None:
"""
Remove aggregator private attributes from FLSpec clone before
transition from Aggregator step to collaborator steps
Expand Down Expand Up @@ -190,17 +190,17 @@ def do_task(self, f_name: str, ctx: Any) -> Tuple:
Tuple(str, FLSpec): Next aggregator function, and updated context.
"""
# Set private attributes to context
self.__set_attributes_to_clone(ctx)
self.__set_private_attrs_to_clone(ctx)

# Loop control variable
not_at_transition_point = True
while not_at_transition_point:
f = getattr(ctx, f_name)
f()
# Checkpoint the function
self.__delete_agg_attrs_from_clone(ctx, "Private attributes: Not Available.")
self.__delete_private_attrs_from_clone(ctx, "Private attributes: Not Available.")
self.call_checkpoint(ctx, f, f._stream_buffer)
self.__set_attributes_to_clone(ctx)
self.__set_private_attrs_to_clone(ctx)

_, f, parent_func = ctx.execute_task_args[:3]
# Display transition logs if transition
Expand All @@ -214,6 +214,6 @@ def do_task(self, f_name: str, ctx: Any) -> Tuple:
f_name = f.__name__

# Reomve private attributes from context
self.__delete_agg_attrs_from_clone(ctx)
self.__delete_private_attrs_from_clone(ctx)

return f_name, ctx
133 changes: 45 additions & 88 deletions openfl/experimental/interface/participants.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,47 @@ def name(self, name: str):
"""
self._name = name

def get_name(self) -> str:
"""Gets the name of the aggregator/collaborator.
Returns:
str: The name of the aggregator/collaborator.
"""
return self._name

def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> None:
"""Initialize private attributes of aggregator/collaborator by invoking the
callable specified by user."""
if self.private_attributes_callable is not None:
self.private_attributes = self.private_attributes_callable(**self.kwargs)
elif private_attrs:
self.private_attributes = private_attrs

def _set_private_attrs_to_clone(self, clone: Any) -> None:
"""Set private attributes to FLSpec clone before
transitioning from Aggregator step to collaborator steps.
Args:
clone (Any): The clone to set attributes to.
"""
# set private attributes as clone attributes
for name, attr in self.private_attributes.items():
setattr(clone, name, attr)

def _delete_private_attrs_from_clone(self, clone: Any) -> None:
"""Remove private attributes from FLSpec clone before
transition
Args:
clone (Any): The clone to remove attributes from.
"""
# Update private attributes by taking latest
# parameters from clone, then delete attributes from clone.
for attr_name in self.private_attributes:
if hasattr(clone, attr_name):
self.private_attributes.update({attr_name: getattr(clone, attr_name)})
delattr(clone, attr_name)

def private_attributes(self, attrs: Dict[str, Any]) -> None:
"""Set the private attributes of the participant. These attributes will
only be available within the tasks performed by the participants and
Expand Down Expand Up @@ -119,48 +160,6 @@ def __init__(
else:
self.private_attributes_callable = private_attributes_callable

def get_name(self) -> str:
"""Gets the name of the collaborator.
Returns:
str: The name of the collaborator.
"""
return self._name

def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> None:
"""Initialize private attributes of Collaborator object by invoking the
callable specified by user."""
if self.private_attributes_callable is not None:
self.private_attributes = self.private_attributes_callable(**self.kwargs)
elif private_attrs:
self.private_attributes = private_attrs

def __set_collaborator_attrs_to_clone(self, clone: Any) -> None:
"""Set collaborator private attributes to FLSpec clone before
transitioning from Aggregator step to collaborator steps.
Args:
clone (Any): The clone to set attributes to.
"""
# set collaborator private attributes as
# clone attributes
for name, attr in self.private_attributes.items():
setattr(clone, name, attr)

def __delete_collab_attrs_from_clone(self, clone: Any) -> None:
"""Remove collaborator private attributes from FLSpec clone before
transitioning from Collaborator step to Aggregator step.
Args:
clone (Any): The clone to remove attributes from.
"""
# Update collaborator private attributes by taking latest
# parameters from clone, then delete attributes from clone.
for attr_name in self.private_attributes:
if hasattr(clone, attr_name):
self.private_attributes.update({attr_name: getattr(clone, attr_name)})
delattr(clone, attr_name)

def execute_func(self, ctx: Any, f_name: str, callback: Callable) -> Any:
"""Execute remote function f.
Expand All @@ -172,11 +171,11 @@ def execute_func(self, ctx: Any, f_name: str, callback: Callable) -> Any:
Returns:
Any: The result of the function execution.
"""
self.__set_collaborator_attrs_to_clone(ctx)
self._set_private_attrs_to_clone(ctx)

callback(ctx, f_name)

self.__delete_collab_attrs_from_clone(ctx)
self._delete_private_attrs_from_clone(ctx)

return ctx

Expand Down Expand Up @@ -227,48 +226,6 @@ def __init__(
else:
self.private_attributes_callable = private_attributes_callable

def get_name(self) -> str:
"""Gets the name of the aggregator.
Returns:
str: The name of the aggregator.
"""
return self.name

def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> None:
"""Initialize private attributes of Aggregator object by invoking the
callable specified by user."""
if self.private_attributes_callable is not None:
self.private_attributes = self.private_attributes_callable(**self.kwargs)
elif private_attrs:
self.private_attributes = private_attrs

def __set_agg_attrs_to_clone(self, clone: Any) -> None:
"""Set aggregator private attributes to FLSpec clone before transition
from Aggregator step to collaborator steps.
Args:
clone (Any): The clone to set attributes to.
"""
# set aggregator private attributes as
# clone attributes
for name, attr in self.private_attributes.items():
setattr(clone, name, attr)

def __delete_agg_attrs_from_clone(self, clone: Any) -> None:
"""Remove aggregator private attributes from FLSpec clone before
transition from Aggregator step to collaborator steps.
Args:
clone (Any): The clone to remove attributes from.
"""
# Update aggregator private attributes by taking latest
# parameters from clone, then delete attributes from clone.
for attr_name in self.private_attributes:
if hasattr(clone, attr_name):
self.private_attributes.update({attr_name: getattr(clone, attr_name)})
delattr(clone, attr_name)

def execute_func(
self, ctx: Any, f_name: str, callback: Callable, clones: Optional[Any] = None
) -> Any:
Expand All @@ -284,13 +241,13 @@ def execute_func(
Returns:
Any: The result of the function execution.
"""
self.__set_agg_attrs_to_clone(ctx)
self._set_private_attrs_to_clone(ctx)

if clones is not None:
callback(ctx, f_name, clones)
else:
callback(ctx, f_name)

self.__delete_agg_attrs_from_clone(ctx)
self._delete_private_attrs_from_clone(ctx)

return ctx

0 comments on commit 8323ed9

Please sign in to comment.