diff --git a/openfl/experimental/component/aggregator/aggregator.py b/openfl/experimental/component/aggregator/aggregator.py index 40a94b5148..bba3969008 100644 --- a/openfl/experimental/component/aggregator/aggregator.py +++ b/openfl/experimental/component/aggregator/aggregator.py @@ -108,13 +108,13 @@ def __initialize_private_attributes(self, kwargs: Dict) -> None: self.__private_attrs.""" self.__private_attrs = self.__private_attrs_callable(**kwargs) - def __set_attributes_to_clone(self, clone: Any) -> None: + def __set_private_attrs_to_clone(self, clone: Any) -> None: """Set private_attrs to clone as attributes.""" if len(self.__private_attrs) > 0: for name, attr in self.__private_attrs.items(): setattr(clone, name, attr) - def __delete_agg_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None: + def __delete_private_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None: """ Remove aggregator private attributes from FLSpec clone before transition from Aggregator step to collaborator steps. @@ -301,7 +301,7 @@ def do_task(self, f_name: str) -> Any: string / None: Next collaborator function or None end of the flow. """ # Set aggregator private attributes to flow object - self.__set_attributes_to_clone(self.flow) + self.__set_private_attrs_to_clone(self.flow) not_at_transition_point = True # Run a loop to execute flow steps until not_at_transition_point @@ -314,9 +314,11 @@ def do_task(self, f_name: str) -> Any: if f.__name__ == "end": f() # Take the checkpoint of "end" step - self.__delete_agg_attrs_from_clone(self.flow, "Private attributes: Not Available.") + self.__delete_private_attrs_from_clone( + self.flow, "Private attributes: Not Available." + ) self.call_checkpoint(self.flow, f) - self.__set_attributes_to_clone(self.flow) + self.__set_private_attrs_to_clone(self.flow) # Check if all rounds of external loop is executed if self.current_round is self.rounds_to_train: # All rounds execute, it is time to quit @@ -351,10 +353,10 @@ def do_task(self, f_name: str) -> Any: # clones are arguments f(*selected_clones) - self.__delete_agg_attrs_from_clone(self.flow, "Private attributes: Not Available.") + self.__delete_private_attrs_from_clone(self.flow, "Private attributes: Not Available.") # Take the checkpoint of executed step self.call_checkpoint(self.flow, f) - self.__set_attributes_to_clone(self.flow) + self.__set_private_attrs_to_clone(self.flow) # Next function in the flow _, f, parent_func = self.flow.execute_task_args[:3] @@ -364,21 +366,20 @@ def do_task(self, f_name: str) -> Any: # Transition check if aggregator_to_collaborator(f, parent_func): - # Delete aggregator private attribute from flow object - self.__delete_agg_attrs_from_clone(self.flow) - # Unpack execute_task_args - clones_dict, instance snapshot and kwargs - self.clones_dict, self.instance_snapshot, self.kwargs = self.flow.execute_task_args[ - 3: - ] - if "foreach" in self.kwargs: - self.flow.filter_exclude_include(f, **self.kwargs) - self.selected_collaborators = getattr(self.flow, self.kwargs["foreach"]) - else: - self.kwargs = self.flow.execute_task_args[-1] - # Transition encountered, break the loop not_at_transition_point = False + # Delete aggregator private attribute from flow object + self.__delete_private_attrs_from_clone(self.flow) + + # Unpack execute_task_args - clones_dict, instance snapshot and kwargs + self.clones_dict, self.instance_snapshot, self.kwargs = self.flow.execute_task_args[3:] + if "foreach" in self.kwargs: + self.flow.filter_exclude_include(f, **self.kwargs) + self.selected_collaborators = getattr(self.flow, self.kwargs["foreach"]) + else: + self.kwargs = self.flow.execute_task_args[-1] + return f_name if f_name != "end" else None def send_task_results( diff --git a/openfl/experimental/component/collaborator/collaborator.py b/openfl/experimental/component/collaborator/collaborator.py index 0a41ea5fe4..527d7ee940 100644 --- a/openfl/experimental/component/collaborator/collaborator.py +++ b/openfl/experimental/component/collaborator/collaborator.py @@ -66,7 +66,7 @@ def __initialize_private_attributes(self, kwargs: Dict) -> None: """ self.__private_attrs = self.__private_attrs_callable(**kwargs) - def __set_attributes_to_clone(self, clone: Any) -> None: + def __set_private_attrs_to_clone(self, clone: Any) -> None: """Set private_attrs to clone as attributes. Args: @@ -80,7 +80,7 @@ def __set_attributes_to_clone(self, clone: Any) -> None: for name, attr in self.__private_attrs.items(): setattr(clone, name, attr) - def __delete_agg_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None: + def __delete_private_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None: """ Remove aggregator private attributes from FLSpec clone before transition from Aggregator step to collaborator steps @@ -190,7 +190,7 @@ def do_task(self, f_name: str, ctx: Any) -> Tuple: Tuple(str, FLSpec): Next aggregator function, and updated context. """ # Set private attributes to context - self.__set_attributes_to_clone(ctx) + self.__set_private_attrs_to_clone(ctx) # Loop control variable not_at_transition_point = True @@ -198,9 +198,9 @@ def do_task(self, f_name: str, ctx: Any) -> Tuple: f = getattr(ctx, f_name) f() # Checkpoint the function - self.__delete_agg_attrs_from_clone(ctx, "Private attributes: Not Available.") + self.__delete_private_attrs_from_clone(ctx, "Private attributes: Not Available.") self.call_checkpoint(ctx, f, f._stream_buffer) - self.__set_attributes_to_clone(ctx) + self.__set_private_attrs_to_clone(ctx) _, f, parent_func = ctx.execute_task_args[:3] # Display transition logs if transition @@ -214,6 +214,6 @@ def do_task(self, f_name: str, ctx: Any) -> Tuple: f_name = f.__name__ # Reomve private attributes from context - self.__delete_agg_attrs_from_clone(ctx) + self.__delete_private_attrs_from_clone(ctx) return f_name, ctx diff --git a/openfl/experimental/interface/participants.py b/openfl/experimental/interface/participants.py index c2f2173bf0..9881cc017f 100644 --- a/openfl/experimental/interface/participants.py +++ b/openfl/experimental/interface/participants.py @@ -42,6 +42,47 @@ def name(self, name: str): """ self._name = name + def get_name(self) -> str: + """Gets the name of the aggregator/collaborator. + + Returns: + str: The name of the aggregator/collaborator. + """ + return self._name + + def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> None: + """Initialize private attributes of aggregator/collaborator by invoking the + callable specified by user.""" + if self.private_attributes_callable is not None: + self.private_attributes = self.private_attributes_callable(**self.kwargs) + elif private_attrs: + self.private_attributes = private_attrs + + def _set_private_attrs_to_clone(self, clone: Any) -> None: + """Set private attributes to FLSpec clone before + transitioning from Aggregator step to collaborator steps. + + Args: + clone (Any): The clone to set attributes to. + """ + # set private attributes as clone attributes + for name, attr in self.private_attributes.items(): + setattr(clone, name, attr) + + def _delete_private_attrs_from_clone(self, clone: Any) -> None: + """Remove private attributes from FLSpec clone before + transition + + Args: + clone (Any): The clone to remove attributes from. + """ + # Update private attributes by taking latest + # parameters from clone, then delete attributes from clone. + for attr_name in self.private_attributes: + if hasattr(clone, attr_name): + self.private_attributes.update({attr_name: getattr(clone, attr_name)}) + delattr(clone, attr_name) + def private_attributes(self, attrs: Dict[str, Any]) -> None: """Set the private attributes of the participant. These attributes will only be available within the tasks performed by the participants and @@ -119,48 +160,6 @@ def __init__( else: self.private_attributes_callable = private_attributes_callable - def get_name(self) -> str: - """Gets the name of the collaborator. - - Returns: - str: The name of the collaborator. - """ - return self._name - - def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> None: - """Initialize private attributes of Collaborator object by invoking the - callable specified by user.""" - if self.private_attributes_callable is not None: - self.private_attributes = self.private_attributes_callable(**self.kwargs) - elif private_attrs: - self.private_attributes = private_attrs - - def __set_collaborator_attrs_to_clone(self, clone: Any) -> None: - """Set collaborator private attributes to FLSpec clone before - transitioning from Aggregator step to collaborator steps. - - Args: - clone (Any): The clone to set attributes to. - """ - # set collaborator private attributes as - # clone attributes - for name, attr in self.private_attributes.items(): - setattr(clone, name, attr) - - def __delete_collab_attrs_from_clone(self, clone: Any) -> None: - """Remove collaborator private attributes from FLSpec clone before - transitioning from Collaborator step to Aggregator step. - - Args: - clone (Any): The clone to remove attributes from. - """ - # Update collaborator private attributes by taking latest - # parameters from clone, then delete attributes from clone. - for attr_name in self.private_attributes: - if hasattr(clone, attr_name): - self.private_attributes.update({attr_name: getattr(clone, attr_name)}) - delattr(clone, attr_name) - def execute_func(self, ctx: Any, f_name: str, callback: Callable) -> Any: """Execute remote function f. @@ -172,11 +171,11 @@ def execute_func(self, ctx: Any, f_name: str, callback: Callable) -> Any: Returns: Any: The result of the function execution. """ - self.__set_collaborator_attrs_to_clone(ctx) + self._set_private_attrs_to_clone(ctx) callback(ctx, f_name) - self.__delete_collab_attrs_from_clone(ctx) + self._delete_private_attrs_from_clone(ctx) return ctx @@ -227,48 +226,6 @@ def __init__( else: self.private_attributes_callable = private_attributes_callable - def get_name(self) -> str: - """Gets the name of the aggregator. - - Returns: - str: The name of the aggregator. - """ - return self.name - - def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> None: - """Initialize private attributes of Aggregator object by invoking the - callable specified by user.""" - if self.private_attributes_callable is not None: - self.private_attributes = self.private_attributes_callable(**self.kwargs) - elif private_attrs: - self.private_attributes = private_attrs - - def __set_agg_attrs_to_clone(self, clone: Any) -> None: - """Set aggregator private attributes to FLSpec clone before transition - from Aggregator step to collaborator steps. - - Args: - clone (Any): The clone to set attributes to. - """ - # set aggregator private attributes as - # clone attributes - for name, attr in self.private_attributes.items(): - setattr(clone, name, attr) - - def __delete_agg_attrs_from_clone(self, clone: Any) -> None: - """Remove aggregator private attributes from FLSpec clone before - transition from Aggregator step to collaborator steps. - - Args: - clone (Any): The clone to remove attributes from. - """ - # Update aggregator private attributes by taking latest - # parameters from clone, then delete attributes from clone. - for attr_name in self.private_attributes: - if hasattr(clone, attr_name): - self.private_attributes.update({attr_name: getattr(clone, attr_name)}) - delattr(clone, attr_name) - def execute_func( self, ctx: Any, f_name: str, callback: Callable, clones: Optional[Any] = None ) -> Any: @@ -284,13 +241,13 @@ def execute_func( Returns: Any: The result of the function execution. """ - self.__set_agg_attrs_to_clone(ctx) + self._set_private_attrs_to_clone(ctx) if clones is not None: callback(ctx, f_name, clones) else: callback(ctx, f_name) - self.__delete_agg_attrs_from_clone(ctx) + self._delete_private_attrs_from_clone(ctx) return ctx