Skip to content

Commit

Permalink
Merge branch 'fix-Filesystem-path-manipulation-issue' of https://gith…
Browse files Browse the repository at this point in the history
…ub.com/rajithkrishnegowda/openfl into fix-Filesystem-path-manipulation-issue
  • Loading branch information
rajithkrishnegowda committed Nov 4, 2024
2 parents d09d08d + 71c6d2d commit 7a61439
Show file tree
Hide file tree
Showing 14 changed files with 322 additions and 227 deletions.
50 changes: 27 additions & 23 deletions openfl/experimental/component/aggregator/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,18 @@ def __initialize_private_attributes(self, kwargs: Dict) -> None:
self.__private_attrs."""
self.__private_attrs = self.__private_attrs_callable(**kwargs)

def __set_attributes_to_clone(self, clone: Any) -> None:
"""Set private_attrs to clone as attributes."""
def __set_private_attrs_to_clone(self, clone: Any) -> None:
"""Set private_attrs of Aggregator as attributes of FLSpec clone"""
if len(self.__private_attrs) > 0:
for name, attr in self.__private_attrs.items():
setattr(clone, name, attr)

def __delete_agg_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None:
def __delete_private_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None:
"""
Remove aggregator private attributes from FLSpec clone before
transition from Aggregator step to collaborator steps.
Remove aggregator private attributes from FLSpec clone
before transition from aggregator step to collaborator steps.
Instead of removing private attributes this method can also replace
private attributes with a string (required in checkpointing)
"""
# Update aggregator private attributes by taking latest
# parameters from clone, then delete attributes from clone.
Expand Down Expand Up @@ -152,6 +154,9 @@ def run_flow(self) -> None:
"""Start the execution and run flow until transition."""
# Start function will be the first step if any flow
f_name = "start"
# Creating a clones from the flow object
FLSpec._reset_clones()
FLSpec._create_clones(self.flow, self.flow.runtime.collaborators)

self.logger.info(f"Starting round {self.current_round}...")
while True:
Expand Down Expand Up @@ -298,7 +303,7 @@ def do_task(self, f_name: str) -> Any:
string / None: Next collaborator function or None end of the flow.
"""
# Set aggregator private attributes to flow object
self.__set_attributes_to_clone(self.flow)
self.__set_private_attrs_to_clone(self.flow)

not_at_transition_point = True
# Run a loop to execute flow steps until not_at_transition_point
Expand All @@ -311,9 +316,11 @@ def do_task(self, f_name: str) -> Any:
if f.__name__ == "end":
f()
# Take the checkpoint of "end" step
self.__delete_agg_attrs_from_clone(self.flow, "Private attributes: Not Available.")
self.__delete_private_attrs_from_clone(
self.flow, "Private attributes: Not Available."
)
self.call_checkpoint(self.flow, f)
self.__set_attributes_to_clone(self.flow)
self.__set_private_attrs_to_clone(self.flow)
# Check if all rounds of external loop is executed
if self.current_round is self.rounds_to_train:
# All rounds execute, it is time to quit
Expand Down Expand Up @@ -348,34 +355,31 @@ def do_task(self, f_name: str) -> Any:
# clones are arguments
f(*selected_clones)

self.__delete_agg_attrs_from_clone(self.flow, "Private attributes: Not Available.")
self.__delete_private_attrs_from_clone(self.flow, "Private attributes: Not Available.")
# Take the checkpoint of executed step
self.call_checkpoint(self.flow, f)
self.__set_attributes_to_clone(self.flow)
self.__set_private_attrs_to_clone(self.flow)

# Next function in the flow
_, f, parent_func = self.flow.execute_task_args[:3]
f_name = f.__name__

self.flow._display_transition_logs(f, parent_func)

# Transition check
if aggregator_to_collaborator(f, parent_func):
# Extract clones, instance snapshot and kwargs when reached
# foreach loop first time
if len(self.flow.execute_task_args) > 4:
temp = self.flow.execute_task_args[3:]
self.clones_dict, self.instance_snapshot, self.kwargs = temp

self.selected_collaborators = getattr(self.flow, self.kwargs["foreach"])
else:
self.kwargs = self.flow.execute_task_args[3]

# Transition encountered, break the loop
not_at_transition_point = False

# Delete private attributes from flow object
self.__delete_agg_attrs_from_clone(self.flow)
# Delete aggregator private attribute from flow object
self.__delete_private_attrs_from_clone(self.flow)

# Unpack execute_task_args - clones_dict, instance snapshot and kwargs
self.clones_dict, self.instance_snapshot, self.kwargs = self.flow.execute_task_args[3:]
if "foreach" in self.kwargs:
self.flow.filter_exclude_include(f, **self.kwargs)
self.selected_collaborators = getattr(self.flow, self.kwargs["foreach"])
else:
self.kwargs = self.flow.execute_task_args[-1]

return f_name if f_name != "end" else None

Expand Down
20 changes: 11 additions & 9 deletions openfl/experimental/component/collaborator/collaborator.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ def __initialize_private_attributes(self, kwargs: Dict) -> None:
"""
self.__private_attrs = self.__private_attrs_callable(**kwargs)

def __set_attributes_to_clone(self, clone: Any) -> None:
"""Set private_attrs to clone as attributes.
def __set_private_attrs_to_clone(self, clone: Any) -> None:
"""Set private_attrs of Collaborator as attributes of FLSpec clone.
Args:
clone (FLSpec): Clone to which private attributes are to be
Expand All @@ -80,10 +80,12 @@ def __set_attributes_to_clone(self, clone: Any) -> None:
for name, attr in self.__private_attrs.items():
setattr(clone, name, attr)

def __delete_agg_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None:
def __delete_private_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None:
"""
Remove aggregator private attributes from FLSpec clone before
transition from Aggregator step to collaborator steps
Remove collaborator private attributes from FLSpec clone
before transition from Collaborator step to aggregator steps.
Instead of removing private attributes this method can also replace
private attributes with a string (required in checkpointing)
Args:
clone (FLSpec): Clone from which private attributes are to be
Expand Down Expand Up @@ -190,17 +192,17 @@ def do_task(self, f_name: str, ctx: Any) -> Tuple:
Tuple(str, FLSpec): Next aggregator function, and updated context.
"""
# Set private attributes to context
self.__set_attributes_to_clone(ctx)
self.__set_private_attrs_to_clone(ctx)

# Loop control variable
not_at_transition_point = True
while not_at_transition_point:
f = getattr(ctx, f_name)
f()
# Checkpoint the function
self.__delete_agg_attrs_from_clone(ctx, "Private attributes: Not Available.")
self.__delete_private_attrs_from_clone(ctx, "Private attributes: Not Available.")
self.call_checkpoint(ctx, f, f._stream_buffer)
self.__set_attributes_to_clone(ctx)
self.__set_private_attrs_to_clone(ctx)

_, f, parent_func = ctx.execute_task_args[:3]
# Display transition logs if transition
Expand All @@ -214,6 +216,6 @@ def do_task(self, f_name: str, ctx: Any) -> Tuple:
f_name = f.__name__

# Reomve private attributes from context
self.__delete_agg_attrs_from_clone(ctx)
self.__delete_private_attrs_from_clone(ctx)

return f_name, ctx
41 changes: 8 additions & 33 deletions openfl/experimental/interface/fl_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,22 +228,6 @@ def restore_instance_snapshot(self, ctx: FLSpec, instance_snapshot: List[FLSpec]
if not hasattr(ctx, name):
setattr(ctx, name, attr)

def get_clones(self, kwargs):
"""Create, and prepare clones."""
FLSpec._reset_clones()
FLSpec._create_clones(self, self.runtime.collaborators)
selected_collaborators = self.__getattribute__(kwargs["foreach"])

for col in selected_collaborators:
clone = FLSpec._clones[col]
clone.input = col
artifacts_iter, _ = generate_artifacts(ctx=clone)
attributes = artifacts_iter()
for name, attr in attributes:
setattr(clone, name, deepcopy(attr))
clone._foreach_methods = self._foreach_methods
clone._metaflow_interface = self._metaflow_interface

def next(self, f, **kwargs):
"""Specifies the next task in the flow to execute.
Expand All @@ -264,30 +248,21 @@ def next(self, f, **kwargs):
if aggregator_to_collaborator(f, parent_func):
agg_to_collab_ss = self._capture_instance_snapshot(kwargs=kwargs)

if str(self._runtime) == "FederatedRuntime":
if len(FLSpec._clones) == 0:
self.get_clones(kwargs)

# Remove included / excluded attributes from next task
filter_attributes(self, f, **kwargs)

if str(self._runtime) == "FederatedRuntime":
if f.collaborator_step and not f.aggregator_step:
self._foreach_methods.append(f.__name__)

if "foreach" in kwargs:
self.filter_exclude_include(f, **kwargs)
# if "foreach" in kwargs:
self.execute_task_args = (
self,
f,
parent_func,
FLSpec._clones,
agg_to_collab_ss,
kwargs,
)
else:
self.execute_task_args = (self, f, parent_func, kwargs)
self.execute_task_args = (
self,
f,
parent_func,
FLSpec._clones,
agg_to_collab_ss,
kwargs,
)

elif str(self._runtime) == "LocalRuntime":
# update parameters required to execute execute_task function
Expand Down
133 changes: 45 additions & 88 deletions openfl/experimental/interface/participants.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,47 @@ def name(self, name: str):
"""
self._name = name

def get_name(self) -> str:
"""Gets the name of Participant (aggregator or collaborator).
Returns:
str: The name of the aggregator or collaborator.
"""
return self._name

def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> None:
"""Initialize private attributes of Participant (aggregator or collaborator)
by invoking the callable specified by user."""
if self.private_attributes_callable is not None:
self.private_attributes = self.private_attributes_callable(**self.kwargs)
elif private_attrs:
self.private_attributes = private_attrs

def _set_private_attrs_to_clone(self, clone: Any) -> None:
"""Set private attributes to FLSpec clone before
transitioning from aggregator to collaborator or vice versa
Args:
clone (Any): The clone to set attributes to.
"""
# set private attributes as clone attributes
for name, attr in self.private_attributes.items():
setattr(clone, name, attr)

def _delete_private_attrs_from_clone(self, clone: Any) -> None:
"""Remove private attributes from FLSpec clone before
transitioning from collaborator to aggregator or vice versa
Args:
clone (Any): The clone to remove attributes from.
"""
# Update private attributes by taking latest
# parameters from clone, then delete attributes from clone.
for attr_name in self.private_attributes:
if hasattr(clone, attr_name):
self.private_attributes.update({attr_name: getattr(clone, attr_name)})
delattr(clone, attr_name)

def private_attributes(self, attrs: Dict[str, Any]) -> None:
"""Set the private attributes of the participant. These attributes will
only be available within the tasks performed by the participants and
Expand Down Expand Up @@ -119,48 +160,6 @@ def __init__(
else:
self.private_attributes_callable = private_attributes_callable

def get_name(self) -> str:
"""Gets the name of the collaborator.
Returns:
str: The name of the collaborator.
"""
return self._name

def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> None:
"""Initialize private attributes of Collaborator object by invoking the
callable specified by user."""
if self.private_attributes_callable is not None:
self.private_attributes = self.private_attributes_callable(**self.kwargs)
elif private_attrs:
self.private_attributes = private_attrs

def __set_collaborator_attrs_to_clone(self, clone: Any) -> None:
"""Set collaborator private attributes to FLSpec clone before
transitioning from Aggregator step to collaborator steps.
Args:
clone (Any): The clone to set attributes to.
"""
# set collaborator private attributes as
# clone attributes
for name, attr in self.private_attributes.items():
setattr(clone, name, attr)

def __delete_collab_attrs_from_clone(self, clone: Any) -> None:
"""Remove collaborator private attributes from FLSpec clone before
transitioning from Collaborator step to Aggregator step.
Args:
clone (Any): The clone to remove attributes from.
"""
# Update collaborator private attributes by taking latest
# parameters from clone, then delete attributes from clone.
for attr_name in self.private_attributes:
if hasattr(clone, attr_name):
self.private_attributes.update({attr_name: getattr(clone, attr_name)})
delattr(clone, attr_name)

def execute_func(self, ctx: Any, f_name: str, callback: Callable) -> Any:
"""Execute remote function f.
Expand All @@ -172,11 +171,11 @@ def execute_func(self, ctx: Any, f_name: str, callback: Callable) -> Any:
Returns:
Any: The result of the function execution.
"""
self.__set_collaborator_attrs_to_clone(ctx)
self._set_private_attrs_to_clone(ctx)

callback(ctx, f_name)

self.__delete_collab_attrs_from_clone(ctx)
self._delete_private_attrs_from_clone(ctx)

return ctx

Expand Down Expand Up @@ -227,48 +226,6 @@ def __init__(
else:
self.private_attributes_callable = private_attributes_callable

def get_name(self) -> str:
"""Gets the name of the aggregator.
Returns:
str: The name of the aggregator.
"""
return self.name

def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> None:
"""Initialize private attributes of Aggregator object by invoking the
callable specified by user."""
if self.private_attributes_callable is not None:
self.private_attributes = self.private_attributes_callable(**self.kwargs)
elif private_attrs:
self.private_attributes = private_attrs

def __set_agg_attrs_to_clone(self, clone: Any) -> None:
"""Set aggregator private attributes to FLSpec clone before transition
from Aggregator step to collaborator steps.
Args:
clone (Any): The clone to set attributes to.
"""
# set aggregator private attributes as
# clone attributes
for name, attr in self.private_attributes.items():
setattr(clone, name, attr)

def __delete_agg_attrs_from_clone(self, clone: Any) -> None:
"""Remove aggregator private attributes from FLSpec clone before
transition from Aggregator step to collaborator steps.
Args:
clone (Any): The clone to remove attributes from.
"""
# Update aggregator private attributes by taking latest
# parameters from clone, then delete attributes from clone.
for attr_name in self.private_attributes:
if hasattr(clone, attr_name):
self.private_attributes.update({attr_name: getattr(clone, attr_name)})
delattr(clone, attr_name)

def execute_func(
self, ctx: Any, f_name: str, callback: Callable, clones: Optional[Any] = None
) -> Any:
Expand All @@ -284,13 +241,13 @@ def execute_func(
Returns:
Any: The result of the function execution.
"""
self.__set_agg_attrs_to_clone(ctx)
self._set_private_attrs_to_clone(ctx)

if clones is not None:
callback(ctx, f_name, clones)
else:
callback(ctx, f_name)

self.__delete_agg_attrs_from_clone(ctx)
self._delete_private_attrs_from_clone(ctx)

return ctx
Loading

0 comments on commit 7a61439

Please sign in to comment.