From 544cdd296770c6edd8decb3fe4c4baaab95ee48e Mon Sep 17 00:00:00 2001 From: refai06 Date: Fri, 27 Sep 2024 13:40:52 +0530 Subject: [PATCH 1/9] Experimental Aggregator Workflow: Fixed private attributes issue and enhanced tcs Signed-off-by: refai06 --- .../component/aggregator/aggregator.py | 15 ++-- openfl/experimental/interface/fl_spec.py | 26 ++---- .../src/aggregator_private_attrs.py | 2 +- .../src/testflow_privateattributes.py | 79 +++++++++++++++--- .../src/aggregator_private_attrs.py | 4 +- .../src/testflow_privateattributes.py | 79 +++++++++++++++--- .../src/aggregator_private_attrs.py | 2 +- .../src/testflow_privateattributes.py | 80 ++++++++++++++++--- 8 files changed, 220 insertions(+), 67 deletions(-) diff --git a/openfl/experimental/component/aggregator/aggregator.py b/openfl/experimental/component/aggregator/aggregator.py index 74ca91cba0..76a26e6fb7 100644 --- a/openfl/experimental/component/aggregator/aggregator.py +++ b/openfl/experimental/component/aggregator/aggregator.py @@ -153,6 +153,8 @@ def run_flow(self) -> None: """Start the execution and run flow until transition.""" # Start function will be the first step if any flow f_name = "start" + FLSpec._reset_clones() + FLSpec._create_clones(self.flow, self.flow.runtime.collaborators) self.logger.info(f"Starting round {self.current_round}...") while True: @@ -365,13 +367,16 @@ def do_task(self, f_name: str) -> Any: if aggregator_to_collaborator(f, parent_func): # Extract clones, instance snapshot and kwargs when reached # foreach loop first time - if len(self.flow.execute_task_args) > 4: - temp = self.flow.execute_task_args[3:] - self.clones_dict, self.instance_snapshot, self.kwargs = temp - + self.__delete_agg_attrs_from_clone(self.flow) + # Unpack execute_task_args + _, f, parent_func, self.instance_snapshot, self.kwargs = self.flow.execute_task_args + self.flow._foreach_methods.append(f.__name__) + if "foreach" in self.kwargs: + self.flow.filter_exclude_include(f, **self.kwargs) + self.clones_dict = FLSpec._clones self.selected_collaborators = getattr(self.flow, self.kwargs["foreach"]) else: - self.kwargs = self.flow.execute_task_args[3] + self.kwargs = self.flow.execute_task_args[4] # Transition encountered, break the loop not_at_transition_point = False diff --git a/openfl/experimental/interface/fl_spec.py b/openfl/experimental/interface/fl_spec.py index da1debaedc..05847c5038 100644 --- a/openfl/experimental/interface/fl_spec.py +++ b/openfl/experimental/interface/fl_spec.py @@ -264,30 +264,18 @@ def next(self, f, **kwargs): if aggregator_to_collaborator(f, parent_func): agg_to_collab_ss = self._capture_instance_snapshot(kwargs=kwargs) - if str(self._runtime) == "FederatedRuntime": - if len(FLSpec._clones) == 0: - self.get_clones(kwargs) - # Remove included / excluded attributes from next task filter_attributes(self, f, **kwargs) if str(self._runtime) == "FederatedRuntime": - if f.collaborator_step and not f.aggregator_step: - self._foreach_methods.append(f.__name__) - if "foreach" in kwargs: - self.filter_exclude_include(f, **kwargs) - # if "foreach" in kwargs: - self.execute_task_args = ( - self, - f, - parent_func, - FLSpec._clones, - agg_to_collab_ss, - kwargs, - ) - else: - self.execute_task_args = (self, f, parent_func, kwargs) + self.execute_task_args = ( + self, + f, + parent_func, + agg_to_collab_ss, + kwargs, + ) elif str(self._runtime) == "LocalRuntime": # update parameters required to execute execute_task function diff --git a/tests/github/experimental/workspace/testcase_private_attributes/src/aggregator_private_attrs.py b/tests/github/experimental/workspace/testcase_private_attributes/src/aggregator_private_attrs.py index 8e5756f71c..389609db20 100644 --- a/tests/github/experimental/workspace/testcase_private_attributes/src/aggregator_private_attrs.py +++ b/tests/github/experimental/workspace/testcase_private_attributes/src/aggregator_private_attrs.py @@ -4,4 +4,4 @@ def aggregator_private_attrs(): - return {"test_loader": np.random.rand(10, 28, 28)} # Random data + return {"test_loader_agg": np.random.rand(10, 28, 28)} # Random data diff --git a/tests/github/experimental/workspace/testcase_private_attributes/src/testflow_privateattributes.py b/tests/github/experimental/workspace/testcase_private_attributes/src/testflow_privateattributes.py index 3f19ed71c7..fce9256440 100644 --- a/tests/github/experimental/workspace/testcase_private_attributes/src/testflow_privateattributes.py +++ b/tests/github/experimental/workspace/testcase_private_attributes/src/testflow_privateattributes.py @@ -37,7 +37,7 @@ def start(self): ) self.collaborators = self.runtime.collaborators - validate_collab_private_attr(self, "test_loader", "start") + validate_agg_private_attr(self, "start", Aggr = ["test_loader_agg"], Collabs =["train_loader", "test_loader"]) self.exclude_agg_to_agg = 10 self.include_agg_to_agg = 100 @@ -49,7 +49,7 @@ def aggregator_step(self): Testing whether Agg private attributes are accessible in next agg step. Collab private attributes should not be accessible here """ - validate_collab_private_attr(self, "test_loader", "aggregator_step") + validate_agg_private_attr(self, "aggregator_step", Aggr = ["test_loader_agg"], Collabs =["train_loader", "test_loader"]) self.include_agg_to_collab = 42 self.exclude_agg_to_collab = 40 @@ -65,8 +65,8 @@ def collaborator_step_a(self): Testing whether Collab private attributes are accessible in collab step Aggregator private attributes should not be accessible here """ - validate_agg_private_attrs( - self, "train_loader", "test_loader", "collaborator_step_a" + validate_collab_private_attrs( + self, "collaborator_step_a", Aggr = ["test_loader_agg"], Collabs =["train_loader", "test_loader"] ) self.exclude_collab_to_collab = 2 @@ -80,8 +80,8 @@ def collaborator_step_b(self): Aggregator private attributes should not be accessible here """ - validate_agg_private_attrs( - self, "train_loader", "test_loader", "collaborator_step_b" + validate_collab_private_attrs( + self, "collaborator_step_b", Aggr = ["test_loader_agg"], Collabs =["train_loader", "test_loader"] ) self.exclude_collab_to_agg = 10 self.include_collab_to_agg = 12 @@ -93,7 +93,7 @@ def join(self, inputs): Testing whether attributes are excluded from collab to agg """ # Aggregator should only be able to access its own attributes - if hasattr(self, "test_loader") is False: + if hasattr(self, "test_loader_agg") is False: TestFlowPrivateAttributes.ERROR_LIST.append( "aggregator_join_aggregator_attributes_missing" ) @@ -143,19 +143,45 @@ def end(self): TestFlowPrivateAttributes.ERROR_LIST = [] -def validate_collab_private_attr(self, private_attr, step_name): +def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): + + """ + Validate that aggregator can only access their own attributes + + :param step_name: Name of the step being validated + :param private_attr: Keyword argument with 'Collaborator' and 'Aggregator' as keys + and their repespective private attributes as values + """ + agg_attrs = private_attrs_kwargs.get('Aggr',[]) + collab_attrs = private_attrs_kwargs.get('Collabs', []) # Aggregator should only be able to access its own attributes - if hasattr(self, private_attr) is False: + + # check for missing aggregator attributes + inaccessible_agg_attrs = [attr for attr in agg_attrs if not hasattr(self, attr)] + if inaccessible_agg_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( - step_name + "_aggregator_attributes_missing" + step_name + "aggregator_attributes_missing" ) print( f"{bcolors.FAIL} ...Failed in {step_name} - aggregator private attributes not " + f"accessible {bcolors.ENDC}" ) + # check for collaborator private attributes that should not be accessible + breached_collab_attrs = [attr for attr in collab_attrs if hasattr(self,attr) is True] + if breached_collab_attrs: + TestFlowPrivateAttributes.ERROR_LIST.append( + step_name + " collaborator_attributes_found" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - collaborator" + + f"private attributes accessible:{','.join(breached_collab_attrs)} {bcolors.ENDC}" + ) + + + for idx, collab in enumerate(self.collaborators): - # Collaborator private attributes should not be accessible + # Collaborator attributes should not be accessible in aggregator step if ( type(self.collaborators[idx]) is not str or hasattr(self.runtime, "_collaborators") is True @@ -171,9 +197,24 @@ def validate_collab_private_attr(self, private_attr, step_name): ) -def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): +def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): + + """ + Validate that collaborators can only access their own attributes + + :param step_name: Name of the step being validated + :param private_attr: Keyword argument with 'Collaborator' and 'Aggregator' as keys + and their repespective private attributes as values + """ + agg_attrs = private_attrs_kwargs.get('Aggr',[]) + collab_attrs = private_attrs_kwargs.get('Collabs', []) + # Collaborator should only be able to access its own attributes - if not hasattr(self, private_attr_1) or not hasattr(self, private_attr_2): + + # check for missing collaborators attributes + inaccessible_collab_attrs = [attr for attr in collab_attrs if not hasattr(self,attr)] + + if inaccessible_collab_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( step_name + "collab_attributes_not_found" ) @@ -181,7 +222,19 @@ def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Collab " + f"private attributes not accessible {bcolors.ENDC}" ) + # check for aggregator private attributes that should not be accessible + breached_agg_attr = [attr for attr in agg_attrs if hasattr(self, attr) is True] + if breached_agg_attr: + TestFlowPrivateAttributes.ERROR_LIST.append( + step_name + " aggregator_attributes_found" + ) + + print( + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Aggregator" + + f" private attributes accessible: {','.join(breached_agg_attr)} {bcolors.ENDC}" + ) + # Aggregator attributes should not be accessible in collaborator step if hasattr(self.runtime, "_aggregator") and isinstance(self.runtime._aggregator, Aggregator): # Error - we are able to access aggregator attributes TestFlowPrivateAttributes.ERROR_LIST.append( diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/aggregator_private_attrs.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/aggregator_private_attrs.py index c8ed45d384..1272423239 100644 --- a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/aggregator_private_attrs.py +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/aggregator_private_attrs.py @@ -4,7 +4,7 @@ def aggregator_private_attrs(): - return {"test_loader_via_callable": np.random.rand(10, 28, 28)} # Random data + return {"test_loader_agg_via_callable": np.random.rand(10, 28, 28)} # Random data -aggregator_private_attributes = {"test_loader": np.random.rand(10, 28, 28)} # Random data +aggregator_private_attributes = {"test_loader_agg": np.random.rand(10, 28, 28)} # Random data diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/testflow_privateattributes.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/testflow_privateattributes.py index 579ba2820e..056b6d0768 100644 --- a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/testflow_privateattributes.py +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/testflow_privateattributes.py @@ -37,7 +37,7 @@ def start(self): ) self.collaborators = self.runtime.collaborators - validate_collab_private_attr(self, "test_loader_via_callable", "start") + validate_agg_private_attr(self,"start", Aggr = ["test_loader_agg_via_callable"], Collabs = ["train_loader_via_callable", "test_loader_via_callable"]) self.exclude_agg_to_agg = 10 self.include_agg_to_agg = 100 @@ -49,7 +49,7 @@ def aggregator_step(self): Testing whether Agg private attributes are accessible in next agg step. Collab private attributes should not be accessible here """ - validate_collab_private_attr(self, "test_loader_via_callable", "aggregator_step") + validate_agg_private_attr(self, "aggregator_step", Aggr = ["test_loader_agg_via_callable"], Collabs = ["train_loader_via_callable", "test_loader_via_callable"]) self.include_agg_to_collab = 42 self.exclude_agg_to_collab = 40 @@ -65,8 +65,8 @@ def collaborator_step_a(self): Testing whether Collab private attributes are accessible in collab step Aggregator private attributes should not be accessible here """ - validate_agg_private_attrs( - self, "train_loader_via_callable", "test_loader_via_callable", "collaborator_step_a" + validate_collab_private_attrs( + self, "collaborator_step_a", Aggr = ["test_loader_agg_via_callable"], Collabs = ["train_loader_via_callable", "test_loader_via_callable"] ) self.exclude_collab_to_collab = 2 @@ -80,8 +80,8 @@ def collaborator_step_b(self): Aggregator private attributes should not be accessible here """ - validate_agg_private_attrs( - self, "train_loader_via_callable", "test_loader_via_callable", "collaborator_step_b" + validate_collab_private_attrs( + self, "collaborator_step_b", Aggr = ["test_loader_agg_via_callable"], Collabs = ["train_loader_via_callable", "test_loader_via_callable"] ) self.exclude_collab_to_agg = 10 self.include_collab_to_agg = 12 @@ -93,7 +93,7 @@ def join(self, inputs): Testing whether attributes are excluded from collab to agg """ # Aggregator should only be able to access its own attributes - if hasattr(self, "test_loader_via_callable") is False: + if hasattr(self, "test_loader_agg_via_callable") is False: TestFlowPrivateAttributes.ERROR_LIST.append( "aggregator_join_aggregator_attributes_missing" ) @@ -143,19 +143,45 @@ def end(self): TestFlowPrivateAttributes.ERROR_LIST = [] -def validate_collab_private_attr(self, private_attr, step_name): +def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): + + """ + Validate that aggregator can only access their own attributes + + :param step_name: Name of the step being validated + :param private_attr: Keyword argument with 'Collaborator' and 'Aggregator' as keys + and their repespective private attributes as values + """ + agg_attrs = private_attrs_kwargs.get('Aggr',[]) + collab_attrs = private_attrs_kwargs.get('Collabs', []) # Aggregator should only be able to access its own attributes - if hasattr(self, private_attr) is False: + + # check for missing aggregator attributes + inaccessible_agg_attrs = [attr for attr in agg_attrs if not hasattr(self, attr)] + if inaccessible_agg_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( - step_name + "_aggregator_attributes_missing" + step_name + "aggregator_attributes_missing" ) print( f"{bcolors.FAIL} ...Failed in {step_name} - aggregator private attributes not " + f"accessible {bcolors.ENDC}" ) + # check for collaborator private attributes that should not be accessible + breached_collab_attrs = [attr for attr in collab_attrs if hasattr(self,attr) is True] + if breached_collab_attrs: + TestFlowPrivateAttributes.ERROR_LIST.append( + step_name + " collaborator_attributes_found" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - collaborator" + + f"private attributes accessible:{','.join(breached_collab_attrs)} {bcolors.ENDC}" + ) + + + for idx, collab in enumerate(self.collaborators): - # Collaborator private attributes should not be accessible + # Collaborator attributes should not be accessible in aggregator step if ( type(self.collaborators[idx]) is not str or hasattr(self.runtime, "_collaborators") is True @@ -171,9 +197,24 @@ def validate_collab_private_attr(self, private_attr, step_name): ) -def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): +def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): + + """ + Validate that collaborators can only access their own attributes + + :param step_name: Name of the step being validated + :param private_attr: Keyword argument with 'Collaborator' and 'Aggregator' as keys + and their repespective private attributes as values + """ + agg_attrs = private_attrs_kwargs.get('Aggr',[]) + collab_attrs = private_attrs_kwargs.get('Collabs', []) + # Collaborator should only be able to access its own attributes - if not hasattr(self, private_attr_1) or not hasattr(self, private_attr_2): + + # check for missing collaborators attributes + inaccessible_collab_attrs = [attr for attr in collab_attrs if not hasattr(self,attr)] + + if inaccessible_collab_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( step_name + "collab_attributes_not_found" ) @@ -181,7 +222,19 @@ def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Collab " + f"private attributes not accessible {bcolors.ENDC}" ) + # check for aggregator private attributes that should not be accessible + breached_agg_attr = [attr for attr in agg_attrs if hasattr(self, attr) is True] + if breached_agg_attr: + TestFlowPrivateAttributes.ERROR_LIST.append( + step_name + " aggregator_attributes_found" + ) + + print( + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Aggregator" + + f" private attributes accessible: {','.join(breached_agg_attr)} {bcolors.ENDC}" + ) + # Aggregator attributes should not be accessible in collaborator step if hasattr(self.runtime, "_aggregator") and isinstance(self.runtime._aggregator, Aggregator): # Error - we are able to access aggregator attributes TestFlowPrivateAttributes.ERROR_LIST.append( diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/aggregator_private_attrs.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/aggregator_private_attrs.py index d04ce7f74a..6d7dc459ce 100644 --- a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/aggregator_private_attrs.py +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/aggregator_private_attrs.py @@ -2,4 +2,4 @@ # SPDX-License-Identifier: Apache-2.0 import numpy as np -aggregator_private_attributes = {"test_loader": np.random.rand(10, 28, 28)} # Random data +aggregator_private_attributes = {"test_loader_agg": np.random.rand(10, 28, 28)} # Random data diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/testflow_privateattributes.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/testflow_privateattributes.py index 3f19ed71c7..17b42a8594 100644 --- a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/testflow_privateattributes.py +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/testflow_privateattributes.py @@ -37,7 +37,7 @@ def start(self): ) self.collaborators = self.runtime.collaborators - validate_collab_private_attr(self, "test_loader", "start") + validate_agg_private_attr(self,"start", Aggr = ["test_loader_agg"], Collabs = ["train_loader", "test_loader"]) self.exclude_agg_to_agg = 10 self.include_agg_to_agg = 100 @@ -49,7 +49,7 @@ def aggregator_step(self): Testing whether Agg private attributes are accessible in next agg step. Collab private attributes should not be accessible here """ - validate_collab_private_attr(self, "test_loader", "aggregator_step") + validate_agg_private_attr(self,"aggregator_step", Aggr = ["test_loader_agg"], Collabs = ["train_loader", "test_loader"]) self.include_agg_to_collab = 42 self.exclude_agg_to_collab = 40 @@ -65,8 +65,8 @@ def collaborator_step_a(self): Testing whether Collab private attributes are accessible in collab step Aggregator private attributes should not be accessible here """ - validate_agg_private_attrs( - self, "train_loader", "test_loader", "collaborator_step_a" + validate_collab_private_attrs( + self, "collaborator_step_a", Aggr = ["test_loader_agg"], Collabs = ["train_loader", "test_loader"] ) self.exclude_collab_to_collab = 2 @@ -80,8 +80,8 @@ def collaborator_step_b(self): Aggregator private attributes should not be accessible here """ - validate_agg_private_attrs( - self, "train_loader", "test_loader", "collaborator_step_b" + validate_collab_private_attrs( + self, "collaborator_step_b", Aggr = ["test_loader_agg"], Collabs = ["train_loader", "test_loader"] ) self.exclude_collab_to_agg = 10 self.include_collab_to_agg = 12 @@ -93,7 +93,7 @@ def join(self, inputs): Testing whether attributes are excluded from collab to agg """ # Aggregator should only be able to access its own attributes - if hasattr(self, "test_loader") is False: + if hasattr(self, "test_loader_agg") is False: TestFlowPrivateAttributes.ERROR_LIST.append( "aggregator_join_aggregator_attributes_missing" ) @@ -143,19 +143,45 @@ def end(self): TestFlowPrivateAttributes.ERROR_LIST = [] -def validate_collab_private_attr(self, private_attr, step_name): +def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): + + """ + Validate that aggregator can only access their own attributes + + :param step_name: Name of the step being validated + :param private_attr: Keyword argument with 'Collaborator' and 'Aggregator' as keys + and their repespective private attributes as values + """ + agg_attrs = private_attrs_kwargs.get('Aggr',[]) + collab_attrs = private_attrs_kwargs.get('Collabs', []) # Aggregator should only be able to access its own attributes - if hasattr(self, private_attr) is False: + + # check for missing aggregator attributes + inaccessible_agg_attrs = [attr for attr in agg_attrs if not hasattr(self, attr)] + if inaccessible_agg_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( - step_name + "_aggregator_attributes_missing" + step_name + "aggregator_attributes_missing" ) print( f"{bcolors.FAIL} ...Failed in {step_name} - aggregator private attributes not " + f"accessible {bcolors.ENDC}" ) + # check for collaborator private attributes that should not be accessible + breached_collab_attrs = [attr for attr in collab_attrs if hasattr(self,attr) is True] + if breached_collab_attrs: + TestFlowPrivateAttributes.ERROR_LIST.append( + step_name + " collaborator_attributes_found" + ) + print( + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - collaborator" + + f"private attributes accessible:{','.join(breached_collab_attrs)} {bcolors.ENDC}" + ) + + + for idx, collab in enumerate(self.collaborators): - # Collaborator private attributes should not be accessible + # Collaborator attributes should not be accessible in aggregator step if ( type(self.collaborators[idx]) is not str or hasattr(self.runtime, "_collaborators") is True @@ -171,9 +197,24 @@ def validate_collab_private_attr(self, private_attr, step_name): ) -def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): +def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): + + """ + Validate that collaborators can only access their own attributes + + :param step_name: Name of the step being validated + :param private_attr: Keyword argument with 'Collaborator' and 'Aggregator' as keys + and their repespective private attributes as values + """ + agg_attrs = private_attrs_kwargs.get('Aggr',[]) + collab_attrs = private_attrs_kwargs.get('Collabs', []) + # Collaborator should only be able to access its own attributes - if not hasattr(self, private_attr_1) or not hasattr(self, private_attr_2): + + # check for missing collaborators attributes + inaccessible_collab_attrs = [attr for attr in collab_attrs if not hasattr(self,attr)] + + if inaccessible_collab_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( step_name + "collab_attributes_not_found" ) @@ -181,7 +222,19 @@ def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Collab " + f"private attributes not accessible {bcolors.ENDC}" ) + # check for aggregator private attributes that should not be accessible + breached_agg_attr = [attr for attr in agg_attrs if hasattr(self, attr) is True] + if breached_agg_attr: + TestFlowPrivateAttributes.ERROR_LIST.append( + step_name + " aggregator_attributes_found" + ) + + print( + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Aggregator" + + f" private attributes accessible: {','.join(breached_agg_attr)} {bcolors.ENDC}" + ) + # Aggregator attributes should not be accessible in collaborator step if hasattr(self.runtime, "_aggregator") and isinstance(self.runtime._aggregator, Aggregator): # Error - we are able to access aggregator attributes TestFlowPrivateAttributes.ERROR_LIST.append( @@ -191,3 +244,4 @@ def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Aggregator" + f" private attributes accessible {bcolors.ENDC}" ) + \ No newline at end of file From 49eac510a33b76e9fc484ea52a79266886a6f806 Mon Sep 17 00:00:00 2001 From: refai06 Date: Tue, 1 Oct 2024 10:20:18 +0530 Subject: [PATCH 2/9] Experimental Aggregator Workflow: Fixed private attributes issue and testcase enhanced Signed-off-by: refai06 --- .../component/aggregator/aggregator.py | 4 +- openfl/experimental/interface/fl_spec.py | 16 -------- .../src/testflow_privateattributes.py | 36 +++++++++--------- .../src/testflow_privateattributes.py | 36 +++++++++--------- .../src/testflow_privateattributes.py | 37 +++++++++---------- 5 files changed, 56 insertions(+), 73 deletions(-) diff --git a/openfl/experimental/component/aggregator/aggregator.py b/openfl/experimental/component/aggregator/aggregator.py index 76a26e6fb7..ca87e3c9e4 100644 --- a/openfl/experimental/component/aggregator/aggregator.py +++ b/openfl/experimental/component/aggregator/aggregator.py @@ -365,8 +365,8 @@ def do_task(self, f_name: str) -> Any: # Transition check if aggregator_to_collaborator(f, parent_func): - # Extract clones, instance snapshot and kwargs when reached - # foreach loop first time + # Delete aggregator private attribute from flow object, instance snapshot and kwargs + # get clones_dict from flow object self.__delete_agg_attrs_from_clone(self.flow) # Unpack execute_task_args _, f, parent_func, self.instance_snapshot, self.kwargs = self.flow.execute_task_args diff --git a/openfl/experimental/interface/fl_spec.py b/openfl/experimental/interface/fl_spec.py index 05847c5038..41389d6d91 100644 --- a/openfl/experimental/interface/fl_spec.py +++ b/openfl/experimental/interface/fl_spec.py @@ -228,22 +228,6 @@ def restore_instance_snapshot(self, ctx: FLSpec, instance_snapshot: List[FLSpec] if not hasattr(ctx, name): setattr(ctx, name, attr) - def get_clones(self, kwargs): - """Create, and prepare clones.""" - FLSpec._reset_clones() - FLSpec._create_clones(self, self.runtime.collaborators) - selected_collaborators = self.__getattribute__(kwargs["foreach"]) - - for col in selected_collaborators: - clone = FLSpec._clones[col] - clone.input = col - artifacts_iter, _ = generate_artifacts(ctx=clone) - attributes = artifacts_iter() - for name, attr in attributes: - setattr(clone, name, deepcopy(attr)) - clone._foreach_methods = self._foreach_methods - clone._metaflow_interface = self._metaflow_interface - def next(self, f, **kwargs): """Specifies the next task in the flow to execute. diff --git a/tests/github/experimental/workspace/testcase_private_attributes/src/testflow_privateattributes.py b/tests/github/experimental/workspace/testcase_private_attributes/src/testflow_privateattributes.py index fce9256440..782259b3d2 100644 --- a/tests/github/experimental/workspace/testcase_private_attributes/src/testflow_privateattributes.py +++ b/tests/github/experimental/workspace/testcase_private_attributes/src/testflow_privateattributes.py @@ -37,7 +37,7 @@ def start(self): ) self.collaborators = self.runtime.collaborators - validate_agg_private_attr(self, "start", Aggr = ["test_loader_agg"], Collabs =["train_loader", "test_loader"]) + validate_agg_private_attr(self, "start", aggr = ["test_loader_agg"], collabs =["train_loader", "test_loader"]) self.exclude_agg_to_agg = 10 self.include_agg_to_agg = 100 @@ -49,7 +49,7 @@ def aggregator_step(self): Testing whether Agg private attributes are accessible in next agg step. Collab private attributes should not be accessible here """ - validate_agg_private_attr(self, "aggregator_step", Aggr = ["test_loader_agg"], Collabs =["train_loader", "test_loader"]) + validate_agg_private_attr(self, "aggregator_step", aggr = ["test_loader_agg"], collabs =["train_loader", "test_loader"]) self.include_agg_to_collab = 42 self.exclude_agg_to_collab = 40 @@ -66,7 +66,7 @@ def collaborator_step_a(self): Aggregator private attributes should not be accessible here """ validate_collab_private_attrs( - self, "collaborator_step_a", Aggr = ["test_loader_agg"], Collabs =["train_loader", "test_loader"] + self, "collaborator_step_a", aggr = ["test_loader_agg"], collabs =["train_loader", "test_loader"] ) self.exclude_collab_to_collab = 2 @@ -81,7 +81,7 @@ def collaborator_step_b(self): """ validate_collab_private_attrs( - self, "collaborator_step_b", Aggr = ["test_loader_agg"], Collabs =["train_loader", "test_loader"] + self, "collaborator_step_b", aggr = ["test_loader_agg"], collabs =["train_loader", "test_loader"] ) self.exclude_collab_to_agg = 10 self.include_collab_to_agg = 12 @@ -148,19 +148,19 @@ def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): """ Validate that aggregator can only access their own attributes - :param step_name: Name of the step being validated - :param private_attr: Keyword argument with 'Collaborator' and 'Aggregator' as keys - and their repespective private attributes as values + Args: + step_name: Name of the step being validated + private_attr_kwargs: Keyword arguments specifying the names of private attributes for the aggregator and collaborators. """ - agg_attrs = private_attrs_kwargs.get('Aggr',[]) - collab_attrs = private_attrs_kwargs.get('Collabs', []) + agg_attrs = private_attrs_kwargs.get('aggr',[]) + collab_attrs = private_attrs_kwargs.get('collabs', []) # Aggregator should only be able to access its own attributes # check for missing aggregator attributes inaccessible_agg_attrs = [attr for attr in agg_attrs if not hasattr(self, attr)] if inaccessible_agg_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( - step_name + "aggregator_attributes_missing" + step_name + " aggregator_attributes_missing" ) print( f"{bcolors.FAIL} ...Failed in {step_name} - aggregator private attributes not " @@ -171,7 +171,7 @@ def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): breached_collab_attrs = [attr for attr in collab_attrs if hasattr(self,attr) is True] if breached_collab_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( - step_name + " collaborator_attributes_found" + step_name + "_collaborator_attributes_found" ) print( f"{bcolors.FAIL} ... Attribute test failed in {step_name} - collaborator" @@ -202,12 +202,12 @@ def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): """ Validate that collaborators can only access their own attributes - :param step_name: Name of the step being validated - :param private_attr: Keyword argument with 'Collaborator' and 'Aggregator' as keys - and their repespective private attributes as values + Args: + step_name: Name of the step being validated + private_attr_kwargs: Keyword arguments specifying the names of private attributes for the aggregator and collaborators. """ - agg_attrs = private_attrs_kwargs.get('Aggr',[]) - collab_attrs = private_attrs_kwargs.get('Collabs', []) + agg_attrs = private_attrs_kwargs.get('aggr',[]) + collab_attrs = private_attrs_kwargs.get('collabs', []) # Collaborator should only be able to access its own attributes @@ -216,7 +216,7 @@ def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): if inaccessible_collab_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( - step_name + "collab_attributes_not_found" + step_name + " collab_attributes_not_found" ) print( f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Collab " @@ -226,7 +226,7 @@ def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): breached_agg_attr = [attr for attr in agg_attrs if hasattr(self, attr) is True] if breached_agg_attr: TestFlowPrivateAttributes.ERROR_LIST.append( - step_name + " aggregator_attributes_found" + step_name + "_aggregator_attributes_found" ) print( diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/testflow_privateattributes.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/testflow_privateattributes.py index 056b6d0768..2ab7a8af87 100644 --- a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/testflow_privateattributes.py +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/testflow_privateattributes.py @@ -37,7 +37,7 @@ def start(self): ) self.collaborators = self.runtime.collaborators - validate_agg_private_attr(self,"start", Aggr = ["test_loader_agg_via_callable"], Collabs = ["train_loader_via_callable", "test_loader_via_callable"]) + validate_agg_private_attr(self,"start", aggr = ["test_loader_agg_via_callable"], collabs = ["train_loader_via_callable", "test_loader_via_callable"]) self.exclude_agg_to_agg = 10 self.include_agg_to_agg = 100 @@ -49,7 +49,7 @@ def aggregator_step(self): Testing whether Agg private attributes are accessible in next agg step. Collab private attributes should not be accessible here """ - validate_agg_private_attr(self, "aggregator_step", Aggr = ["test_loader_agg_via_callable"], Collabs = ["train_loader_via_callable", "test_loader_via_callable"]) + validate_agg_private_attr(self, "aggregator_step", aggr = ["test_loader_agg_via_callable"], collabs = ["train_loader_via_callable", "test_loader_via_callable"]) self.include_agg_to_collab = 42 self.exclude_agg_to_collab = 40 @@ -66,7 +66,7 @@ def collaborator_step_a(self): Aggregator private attributes should not be accessible here """ validate_collab_private_attrs( - self, "collaborator_step_a", Aggr = ["test_loader_agg_via_callable"], Collabs = ["train_loader_via_callable", "test_loader_via_callable"] + self, "collaborator_step_a", aggr = ["test_loader_agg_via_callable"], collabs = ["train_loader_via_callable", "test_loader_via_callable"] ) self.exclude_collab_to_collab = 2 @@ -81,7 +81,7 @@ def collaborator_step_b(self): """ validate_collab_private_attrs( - self, "collaborator_step_b", Aggr = ["test_loader_agg_via_callable"], Collabs = ["train_loader_via_callable", "test_loader_via_callable"] + self, "collaborator_step_b", aggr = ["test_loader_agg_via_callable"], collabs = ["train_loader_via_callable", "test_loader_via_callable"] ) self.exclude_collab_to_agg = 10 self.include_collab_to_agg = 12 @@ -148,19 +148,19 @@ def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): """ Validate that aggregator can only access their own attributes - :param step_name: Name of the step being validated - :param private_attr: Keyword argument with 'Collaborator' and 'Aggregator' as keys - and their repespective private attributes as values + Args: + step_name: Name of the step being validated + private_attr_kwargs: Keyword arguments specifying the names of private attributes for the aggregator and collaborators. """ - agg_attrs = private_attrs_kwargs.get('Aggr',[]) - collab_attrs = private_attrs_kwargs.get('Collabs', []) + agg_attrs = private_attrs_kwargs.get('aggr',[]) + collab_attrs = private_attrs_kwargs.get('collabs', []) # Aggregator should only be able to access its own attributes # check for missing aggregator attributes inaccessible_agg_attrs = [attr for attr in agg_attrs if not hasattr(self, attr)] if inaccessible_agg_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( - step_name + "aggregator_attributes_missing" + step_name + " aggregator_attributes_missing" ) print( f"{bcolors.FAIL} ...Failed in {step_name} - aggregator private attributes not " @@ -171,7 +171,7 @@ def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): breached_collab_attrs = [attr for attr in collab_attrs if hasattr(self,attr) is True] if breached_collab_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( - step_name + " collaborator_attributes_found" + step_name + "_collaborator_attributes_found" ) print( f"{bcolors.FAIL} ... Attribute test failed in {step_name} - collaborator" @@ -202,12 +202,12 @@ def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): """ Validate that collaborators can only access their own attributes - :param step_name: Name of the step being validated - :param private_attr: Keyword argument with 'Collaborator' and 'Aggregator' as keys - and their repespective private attributes as values + Args: + step_name: Name of the step being validated + private_attr_kwargs: Keyword arguments specifying the names of private attributes for the aggregator and collaborators. """ - agg_attrs = private_attrs_kwargs.get('Aggr',[]) - collab_attrs = private_attrs_kwargs.get('Collabs', []) + agg_attrs = private_attrs_kwargs.get('aggr',[]) + collab_attrs = private_attrs_kwargs.get('collabs', []) # Collaborator should only be able to access its own attributes @@ -216,7 +216,7 @@ def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): if inaccessible_collab_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( - step_name + "collab_attributes_not_found" + step_name + " collab_attributes_not_found" ) print( f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Collab " @@ -226,7 +226,7 @@ def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): breached_agg_attr = [attr for attr in agg_attrs if hasattr(self, attr) is True] if breached_agg_attr: TestFlowPrivateAttributes.ERROR_LIST.append( - step_name + " aggregator_attributes_found" + step_name + "_aggregator_attributes_found" ) print( diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/testflow_privateattributes.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/testflow_privateattributes.py index 17b42a8594..94dfdba64f 100644 --- a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/testflow_privateattributes.py +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/testflow_privateattributes.py @@ -37,7 +37,7 @@ def start(self): ) self.collaborators = self.runtime.collaborators - validate_agg_private_attr(self,"start", Aggr = ["test_loader_agg"], Collabs = ["train_loader", "test_loader"]) + validate_agg_private_attr(self,"start", aggr = ["test_loader_agg"], collabs = ["train_loader", "test_loader"]) self.exclude_agg_to_agg = 10 self.include_agg_to_agg = 100 @@ -49,7 +49,7 @@ def aggregator_step(self): Testing whether Agg private attributes are accessible in next agg step. Collab private attributes should not be accessible here """ - validate_agg_private_attr(self,"aggregator_step", Aggr = ["test_loader_agg"], Collabs = ["train_loader", "test_loader"]) + validate_agg_private_attr(self,"aggregator_step", aggr = ["test_loader_agg"], collabs = ["train_loader", "test_loader"]) self.include_agg_to_collab = 42 self.exclude_agg_to_collab = 40 @@ -66,7 +66,7 @@ def collaborator_step_a(self): Aggregator private attributes should not be accessible here """ validate_collab_private_attrs( - self, "collaborator_step_a", Aggr = ["test_loader_agg"], Collabs = ["train_loader", "test_loader"] + self, "collaborator_step_a", aggr = ["test_loader_agg"], collabs = ["train_loader", "test_loader"] ) self.exclude_collab_to_collab = 2 @@ -81,7 +81,7 @@ def collaborator_step_b(self): """ validate_collab_private_attrs( - self, "collaborator_step_b", Aggr = ["test_loader_agg"], Collabs = ["train_loader", "test_loader"] + self, "collaborator_step_b", aggr = ["test_loader_agg"], collabs = ["train_loader", "test_loader"] ) self.exclude_collab_to_agg = 10 self.include_collab_to_agg = 12 @@ -148,19 +148,19 @@ def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): """ Validate that aggregator can only access their own attributes - :param step_name: Name of the step being validated - :param private_attr: Keyword argument with 'Collaborator' and 'Aggregator' as keys - and their repespective private attributes as values + Args: + step_name: Name of the step being validated + private_attr_kwargs: Keyword arguments specifying the names of private attributes for the aggregator and collaborators. """ - agg_attrs = private_attrs_kwargs.get('Aggr',[]) - collab_attrs = private_attrs_kwargs.get('Collabs', []) + agg_attrs = private_attrs_kwargs.get('aggr',[]) + collab_attrs = private_attrs_kwargs.get('collabs', []) # Aggregator should only be able to access its own attributes # check for missing aggregator attributes inaccessible_agg_attrs = [attr for attr in agg_attrs if not hasattr(self, attr)] if inaccessible_agg_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( - step_name + "aggregator_attributes_missing" + step_name + " aggregator_attributes_missing" ) print( f"{bcolors.FAIL} ...Failed in {step_name} - aggregator private attributes not " @@ -171,7 +171,7 @@ def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): breached_collab_attrs = [attr for attr in collab_attrs if hasattr(self,attr) is True] if breached_collab_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( - step_name + " collaborator_attributes_found" + step_name + "_collaborator_attributes_found" ) print( f"{bcolors.FAIL} ... Attribute test failed in {step_name} - collaborator" @@ -201,13 +201,12 @@ def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): """ Validate that collaborators can only access their own attributes - - :param step_name: Name of the step being validated - :param private_attr: Keyword argument with 'Collaborator' and 'Aggregator' as keys - and their repespective private attributes as values + Args: + step_name: Name of the step being validated + private_attr_kwargs: Keyword arguments specifying the names of private attributes for the aggregator and collaborators. """ - agg_attrs = private_attrs_kwargs.get('Aggr',[]) - collab_attrs = private_attrs_kwargs.get('Collabs', []) + agg_attrs = private_attrs_kwargs.get('aggr',[]) + collab_attrs = private_attrs_kwargs.get('collabs', []) # Collaborator should only be able to access its own attributes @@ -216,7 +215,7 @@ def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): if inaccessible_collab_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( - step_name + "collab_attributes_not_found" + step_name + " collab_attributes_not_found" ) print( f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Collab " @@ -226,7 +225,7 @@ def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): breached_agg_attr = [attr for attr in agg_attrs if hasattr(self, attr) is True] if breached_agg_attr: TestFlowPrivateAttributes.ERROR_LIST.append( - step_name + " aggregator_attributes_found" + step_name + "_aggregator_attributes_found" ) print( From a827774ab928ccd1294edc399f6c3a144a6d634d Mon Sep 17 00:00:00 2001 From: refai06 Date: Fri, 4 Oct 2024 10:24:24 +0530 Subject: [PATCH 3/9] Experimental Aggregator Workflow: Updated the code and optimzed Signed-off-by: refai06 --- .../component/aggregator/aggregator.py | 15 ++++++--------- openfl/experimental/interface/fl_spec.py | 1 + 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/openfl/experimental/component/aggregator/aggregator.py b/openfl/experimental/component/aggregator/aggregator.py index ca87e3c9e4..43cb4f9fda 100644 --- a/openfl/experimental/component/aggregator/aggregator.py +++ b/openfl/experimental/component/aggregator/aggregator.py @@ -365,25 +365,22 @@ def do_task(self, f_name: str) -> Any: # Transition check if aggregator_to_collaborator(f, parent_func): - # Delete aggregator private attribute from flow object, instance snapshot and kwargs - # get clones_dict from flow object + # Delete aggregator private attribute from flow object self.__delete_agg_attrs_from_clone(self.flow) - # Unpack execute_task_args - _, f, parent_func, self.instance_snapshot, self.kwargs = self.flow.execute_task_args + # Unpack execute_task_args - clones_dict, instance snapshot and kwargs + self.clones_dict, self.instance_snapshot, self.kwargs = self.flow.execute_task_args[ + 3: + ] self.flow._foreach_methods.append(f.__name__) if "foreach" in self.kwargs: self.flow.filter_exclude_include(f, **self.kwargs) - self.clones_dict = FLSpec._clones self.selected_collaborators = getattr(self.flow, self.kwargs["foreach"]) else: - self.kwargs = self.flow.execute_task_args[4] + self.kwargs = self.flow.execute_task_args[-1] # Transition encountered, break the loop not_at_transition_point = False - # Delete private attributes from flow object - self.__delete_agg_attrs_from_clone(self.flow) - return f_name if f_name != "end" else None def send_task_results( diff --git a/openfl/experimental/interface/fl_spec.py b/openfl/experimental/interface/fl_spec.py index 41389d6d91..f2f50f4e6c 100644 --- a/openfl/experimental/interface/fl_spec.py +++ b/openfl/experimental/interface/fl_spec.py @@ -257,6 +257,7 @@ def next(self, f, **kwargs): self, f, parent_func, + FLSpec._clones, agg_to_collab_ss, kwargs, ) From 5d308cd69d960893e32db7eb88dc2c22fd8c7229 Mon Sep 17 00:00:00 2001 From: refai06 Date: Fri, 4 Oct 2024 11:33:02 +0530 Subject: [PATCH 4/9] Experimental Aggregator Workflow: Updated the code and optimzed Signed-off-by: refai06 --- openfl/experimental/component/aggregator/aggregator.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/openfl/experimental/component/aggregator/aggregator.py b/openfl/experimental/component/aggregator/aggregator.py index 43cb4f9fda..74cb3c51f1 100644 --- a/openfl/experimental/component/aggregator/aggregator.py +++ b/openfl/experimental/component/aggregator/aggregator.py @@ -368,9 +368,7 @@ def do_task(self, f_name: str) -> Any: # Delete aggregator private attribute from flow object self.__delete_agg_attrs_from_clone(self.flow) # Unpack execute_task_args - clones_dict, instance snapshot and kwargs - self.clones_dict, self.instance_snapshot, self.kwargs = self.flow.execute_task_args[ - 3: - ] + self.clones_dict, self.instance_snapshot, self.kwargs = self.flow.execute_task_args[3:] self.flow._foreach_methods.append(f.__name__) if "foreach" in self.kwargs: self.flow.filter_exclude_include(f, **self.kwargs) From 461247d7bfed69b9853a4c17cf5bcb7bc399b599 Mon Sep 17 00:00:00 2001 From: refai06 Date: Mon, 7 Oct 2024 18:32:05 +0530 Subject: [PATCH 5/9] Experimental Aggregator Workflow: Comments update Signed-off-by: refai06 --- openfl/experimental/component/aggregator/aggregator.py | 6 ++++-- openfl/experimental/interface/fl_spec.py | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/openfl/experimental/component/aggregator/aggregator.py b/openfl/experimental/component/aggregator/aggregator.py index 74cb3c51f1..e8113eca93 100644 --- a/openfl/experimental/component/aggregator/aggregator.py +++ b/openfl/experimental/component/aggregator/aggregator.py @@ -153,6 +153,7 @@ def run_flow(self) -> None: """Start the execution and run flow until transition.""" # Start function will be the first step if any flow f_name = "start" + # Creating a clones from the flow object FLSpec._reset_clones() FLSpec._create_clones(self.flow, self.flow.runtime.collaborators) @@ -368,8 +369,9 @@ def do_task(self, f_name: str) -> Any: # Delete aggregator private attribute from flow object self.__delete_agg_attrs_from_clone(self.flow) # Unpack execute_task_args - clones_dict, instance snapshot and kwargs - self.clones_dict, self.instance_snapshot, self.kwargs = self.flow.execute_task_args[3:] - self.flow._foreach_methods.append(f.__name__) + self.clones_dict, self.instance_snapshot, self.kwargs = self.flow.execute_task_args[ + 3: + ] if "foreach" in self.kwargs: self.flow.filter_exclude_include(f, **self.kwargs) self.selected_collaborators = getattr(self.flow, self.kwargs["foreach"]) diff --git a/openfl/experimental/interface/fl_spec.py b/openfl/experimental/interface/fl_spec.py index f2f50f4e6c..273b356402 100644 --- a/openfl/experimental/interface/fl_spec.py +++ b/openfl/experimental/interface/fl_spec.py @@ -252,6 +252,8 @@ def next(self, f, **kwargs): filter_attributes(self, f, **kwargs) if str(self._runtime) == "FederatedRuntime": + if f.collaborator_step and not f.aggregator_step: + self._foreach_methods.append(f.__name__) self.execute_task_args = ( self, From 8323ed9ab6689f5fb0ada562161b027fb0830748 Mon Sep 17 00:00:00 2001 From: refai06 Date: Thu, 17 Oct 2024 00:10:58 +0530 Subject: [PATCH 6/9] Experimental Aggregator Workflow: code update and Localruntime:Participant refactor Signed-off-by: refai06 --- .../component/aggregator/aggregator.py | 39 ++--- .../component/collaborator/collaborator.py | 12 +- openfl/experimental/interface/participants.py | 133 ++++++------------ 3 files changed, 71 insertions(+), 113 deletions(-) diff --git a/openfl/experimental/component/aggregator/aggregator.py b/openfl/experimental/component/aggregator/aggregator.py index 40a94b5148..bba3969008 100644 --- a/openfl/experimental/component/aggregator/aggregator.py +++ b/openfl/experimental/component/aggregator/aggregator.py @@ -108,13 +108,13 @@ def __initialize_private_attributes(self, kwargs: Dict) -> None: self.__private_attrs.""" self.__private_attrs = self.__private_attrs_callable(**kwargs) - def __set_attributes_to_clone(self, clone: Any) -> None: + def __set_private_attrs_to_clone(self, clone: Any) -> None: """Set private_attrs to clone as attributes.""" if len(self.__private_attrs) > 0: for name, attr in self.__private_attrs.items(): setattr(clone, name, attr) - def __delete_agg_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None: + def __delete_private_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None: """ Remove aggregator private attributes from FLSpec clone before transition from Aggregator step to collaborator steps. @@ -301,7 +301,7 @@ def do_task(self, f_name: str) -> Any: string / None: Next collaborator function or None end of the flow. """ # Set aggregator private attributes to flow object - self.__set_attributes_to_clone(self.flow) + self.__set_private_attrs_to_clone(self.flow) not_at_transition_point = True # Run a loop to execute flow steps until not_at_transition_point @@ -314,9 +314,11 @@ def do_task(self, f_name: str) -> Any: if f.__name__ == "end": f() # Take the checkpoint of "end" step - self.__delete_agg_attrs_from_clone(self.flow, "Private attributes: Not Available.") + self.__delete_private_attrs_from_clone( + self.flow, "Private attributes: Not Available." + ) self.call_checkpoint(self.flow, f) - self.__set_attributes_to_clone(self.flow) + self.__set_private_attrs_to_clone(self.flow) # Check if all rounds of external loop is executed if self.current_round is self.rounds_to_train: # All rounds execute, it is time to quit @@ -351,10 +353,10 @@ def do_task(self, f_name: str) -> Any: # clones are arguments f(*selected_clones) - self.__delete_agg_attrs_from_clone(self.flow, "Private attributes: Not Available.") + self.__delete_private_attrs_from_clone(self.flow, "Private attributes: Not Available.") # Take the checkpoint of executed step self.call_checkpoint(self.flow, f) - self.__set_attributes_to_clone(self.flow) + self.__set_private_attrs_to_clone(self.flow) # Next function in the flow _, f, parent_func = self.flow.execute_task_args[:3] @@ -364,21 +366,20 @@ def do_task(self, f_name: str) -> Any: # Transition check if aggregator_to_collaborator(f, parent_func): - # Delete aggregator private attribute from flow object - self.__delete_agg_attrs_from_clone(self.flow) - # Unpack execute_task_args - clones_dict, instance snapshot and kwargs - self.clones_dict, self.instance_snapshot, self.kwargs = self.flow.execute_task_args[ - 3: - ] - if "foreach" in self.kwargs: - self.flow.filter_exclude_include(f, **self.kwargs) - self.selected_collaborators = getattr(self.flow, self.kwargs["foreach"]) - else: - self.kwargs = self.flow.execute_task_args[-1] - # Transition encountered, break the loop not_at_transition_point = False + # Delete aggregator private attribute from flow object + self.__delete_private_attrs_from_clone(self.flow) + + # Unpack execute_task_args - clones_dict, instance snapshot and kwargs + self.clones_dict, self.instance_snapshot, self.kwargs = self.flow.execute_task_args[3:] + if "foreach" in self.kwargs: + self.flow.filter_exclude_include(f, **self.kwargs) + self.selected_collaborators = getattr(self.flow, self.kwargs["foreach"]) + else: + self.kwargs = self.flow.execute_task_args[-1] + return f_name if f_name != "end" else None def send_task_results( diff --git a/openfl/experimental/component/collaborator/collaborator.py b/openfl/experimental/component/collaborator/collaborator.py index 0a41ea5fe4..527d7ee940 100644 --- a/openfl/experimental/component/collaborator/collaborator.py +++ b/openfl/experimental/component/collaborator/collaborator.py @@ -66,7 +66,7 @@ def __initialize_private_attributes(self, kwargs: Dict) -> None: """ self.__private_attrs = self.__private_attrs_callable(**kwargs) - def __set_attributes_to_clone(self, clone: Any) -> None: + def __set_private_attrs_to_clone(self, clone: Any) -> None: """Set private_attrs to clone as attributes. Args: @@ -80,7 +80,7 @@ def __set_attributes_to_clone(self, clone: Any) -> None: for name, attr in self.__private_attrs.items(): setattr(clone, name, attr) - def __delete_agg_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None: + def __delete_private_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None: """ Remove aggregator private attributes from FLSpec clone before transition from Aggregator step to collaborator steps @@ -190,7 +190,7 @@ def do_task(self, f_name: str, ctx: Any) -> Tuple: Tuple(str, FLSpec): Next aggregator function, and updated context. """ # Set private attributes to context - self.__set_attributes_to_clone(ctx) + self.__set_private_attrs_to_clone(ctx) # Loop control variable not_at_transition_point = True @@ -198,9 +198,9 @@ def do_task(self, f_name: str, ctx: Any) -> Tuple: f = getattr(ctx, f_name) f() # Checkpoint the function - self.__delete_agg_attrs_from_clone(ctx, "Private attributes: Not Available.") + self.__delete_private_attrs_from_clone(ctx, "Private attributes: Not Available.") self.call_checkpoint(ctx, f, f._stream_buffer) - self.__set_attributes_to_clone(ctx) + self.__set_private_attrs_to_clone(ctx) _, f, parent_func = ctx.execute_task_args[:3] # Display transition logs if transition @@ -214,6 +214,6 @@ def do_task(self, f_name: str, ctx: Any) -> Tuple: f_name = f.__name__ # Reomve private attributes from context - self.__delete_agg_attrs_from_clone(ctx) + self.__delete_private_attrs_from_clone(ctx) return f_name, ctx diff --git a/openfl/experimental/interface/participants.py b/openfl/experimental/interface/participants.py index c2f2173bf0..9881cc017f 100644 --- a/openfl/experimental/interface/participants.py +++ b/openfl/experimental/interface/participants.py @@ -42,6 +42,47 @@ def name(self, name: str): """ self._name = name + def get_name(self) -> str: + """Gets the name of the aggregator/collaborator. + + Returns: + str: The name of the aggregator/collaborator. + """ + return self._name + + def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> None: + """Initialize private attributes of aggregator/collaborator by invoking the + callable specified by user.""" + if self.private_attributes_callable is not None: + self.private_attributes = self.private_attributes_callable(**self.kwargs) + elif private_attrs: + self.private_attributes = private_attrs + + def _set_private_attrs_to_clone(self, clone: Any) -> None: + """Set private attributes to FLSpec clone before + transitioning from Aggregator step to collaborator steps. + + Args: + clone (Any): The clone to set attributes to. + """ + # set private attributes as clone attributes + for name, attr in self.private_attributes.items(): + setattr(clone, name, attr) + + def _delete_private_attrs_from_clone(self, clone: Any) -> None: + """Remove private attributes from FLSpec clone before + transition + + Args: + clone (Any): The clone to remove attributes from. + """ + # Update private attributes by taking latest + # parameters from clone, then delete attributes from clone. + for attr_name in self.private_attributes: + if hasattr(clone, attr_name): + self.private_attributes.update({attr_name: getattr(clone, attr_name)}) + delattr(clone, attr_name) + def private_attributes(self, attrs: Dict[str, Any]) -> None: """Set the private attributes of the participant. These attributes will only be available within the tasks performed by the participants and @@ -119,48 +160,6 @@ def __init__( else: self.private_attributes_callable = private_attributes_callable - def get_name(self) -> str: - """Gets the name of the collaborator. - - Returns: - str: The name of the collaborator. - """ - return self._name - - def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> None: - """Initialize private attributes of Collaborator object by invoking the - callable specified by user.""" - if self.private_attributes_callable is not None: - self.private_attributes = self.private_attributes_callable(**self.kwargs) - elif private_attrs: - self.private_attributes = private_attrs - - def __set_collaborator_attrs_to_clone(self, clone: Any) -> None: - """Set collaborator private attributes to FLSpec clone before - transitioning from Aggregator step to collaborator steps. - - Args: - clone (Any): The clone to set attributes to. - """ - # set collaborator private attributes as - # clone attributes - for name, attr in self.private_attributes.items(): - setattr(clone, name, attr) - - def __delete_collab_attrs_from_clone(self, clone: Any) -> None: - """Remove collaborator private attributes from FLSpec clone before - transitioning from Collaborator step to Aggregator step. - - Args: - clone (Any): The clone to remove attributes from. - """ - # Update collaborator private attributes by taking latest - # parameters from clone, then delete attributes from clone. - for attr_name in self.private_attributes: - if hasattr(clone, attr_name): - self.private_attributes.update({attr_name: getattr(clone, attr_name)}) - delattr(clone, attr_name) - def execute_func(self, ctx: Any, f_name: str, callback: Callable) -> Any: """Execute remote function f. @@ -172,11 +171,11 @@ def execute_func(self, ctx: Any, f_name: str, callback: Callable) -> Any: Returns: Any: The result of the function execution. """ - self.__set_collaborator_attrs_to_clone(ctx) + self._set_private_attrs_to_clone(ctx) callback(ctx, f_name) - self.__delete_collab_attrs_from_clone(ctx) + self._delete_private_attrs_from_clone(ctx) return ctx @@ -227,48 +226,6 @@ def __init__( else: self.private_attributes_callable = private_attributes_callable - def get_name(self) -> str: - """Gets the name of the aggregator. - - Returns: - str: The name of the aggregator. - """ - return self.name - - def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> None: - """Initialize private attributes of Aggregator object by invoking the - callable specified by user.""" - if self.private_attributes_callable is not None: - self.private_attributes = self.private_attributes_callable(**self.kwargs) - elif private_attrs: - self.private_attributes = private_attrs - - def __set_agg_attrs_to_clone(self, clone: Any) -> None: - """Set aggregator private attributes to FLSpec clone before transition - from Aggregator step to collaborator steps. - - Args: - clone (Any): The clone to set attributes to. - """ - # set aggregator private attributes as - # clone attributes - for name, attr in self.private_attributes.items(): - setattr(clone, name, attr) - - def __delete_agg_attrs_from_clone(self, clone: Any) -> None: - """Remove aggregator private attributes from FLSpec clone before - transition from Aggregator step to collaborator steps. - - Args: - clone (Any): The clone to remove attributes from. - """ - # Update aggregator private attributes by taking latest - # parameters from clone, then delete attributes from clone. - for attr_name in self.private_attributes: - if hasattr(clone, attr_name): - self.private_attributes.update({attr_name: getattr(clone, attr_name)}) - delattr(clone, attr_name) - def execute_func( self, ctx: Any, f_name: str, callback: Callable, clones: Optional[Any] = None ) -> Any: @@ -284,13 +241,13 @@ def execute_func( Returns: Any: The result of the function execution. """ - self.__set_agg_attrs_to_clone(ctx) + self._set_private_attrs_to_clone(ctx) if clones is not None: callback(ctx, f_name, clones) else: callback(ctx, f_name) - self.__delete_agg_attrs_from_clone(ctx) + self._delete_private_attrs_from_clone(ctx) return ctx From 188adffb8acc2c444c5a97a0d48b1e56634005f1 Mon Sep 17 00:00:00 2001 From: refai06 Date: Thu, 17 Oct 2024 14:07:38 +0530 Subject: [PATCH 7/9] comments update Signed-off-by: refai06 --- .../experimental/component/aggregator/aggregator.py | 9 +++++---- .../component/collaborator/collaborator.py | 8 +++++--- openfl/experimental/interface/participants.py | 12 ++++++------ 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/openfl/experimental/component/aggregator/aggregator.py b/openfl/experimental/component/aggregator/aggregator.py index bba3969008..080190ac64 100644 --- a/openfl/experimental/component/aggregator/aggregator.py +++ b/openfl/experimental/component/aggregator/aggregator.py @@ -109,15 +109,17 @@ def __initialize_private_attributes(self, kwargs: Dict) -> None: self.__private_attrs = self.__private_attrs_callable(**kwargs) def __set_private_attrs_to_clone(self, clone: Any) -> None: - """Set private_attrs to clone as attributes.""" + """Set private_attrs of Aggregator as attributes of FLSpec clone""" if len(self.__private_attrs) > 0: for name, attr in self.__private_attrs.items(): setattr(clone, name, attr) def __delete_private_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None: """ - Remove aggregator private attributes from FLSpec clone before - transition from Aggregator step to collaborator steps. + Remove aggregator private attributes from FLSpec clone + before transition from aggregator step to collaborator steps. + Instead of removing private attributes this method can also replace + private attributes with a string (required in checkpointing) """ # Update aggregator private attributes by taking latest # parameters from clone, then delete attributes from clone. @@ -363,7 +365,6 @@ def do_task(self, f_name: str) -> Any: f_name = f.__name__ self.flow._display_transition_logs(f, parent_func) - # Transition check if aggregator_to_collaborator(f, parent_func): # Transition encountered, break the loop diff --git a/openfl/experimental/component/collaborator/collaborator.py b/openfl/experimental/component/collaborator/collaborator.py index 527d7ee940..d8e8a7fcd7 100644 --- a/openfl/experimental/component/collaborator/collaborator.py +++ b/openfl/experimental/component/collaborator/collaborator.py @@ -67,7 +67,7 @@ def __initialize_private_attributes(self, kwargs: Dict) -> None: self.__private_attrs = self.__private_attrs_callable(**kwargs) def __set_private_attrs_to_clone(self, clone: Any) -> None: - """Set private_attrs to clone as attributes. + """Set private_attrs of Collaborator as attributes of FLSpec clone. Args: clone (FLSpec): Clone to which private attributes are to be @@ -82,8 +82,10 @@ def __set_private_attrs_to_clone(self, clone: Any) -> None: def __delete_private_attrs_from_clone(self, clone: Any, replace_str: str = None) -> None: """ - Remove aggregator private attributes from FLSpec clone before - transition from Aggregator step to collaborator steps + Remove collaborator private attributes from FLSpec clone + before transition from Collaborator step to aggregator steps. + Instead of removing private attributes this method can also replace + private attributes with a string (required in checkpointing) Args: clone (FLSpec): Clone from which private attributes are to be diff --git a/openfl/experimental/interface/participants.py b/openfl/experimental/interface/participants.py index 9881cc017f..f9ac7f5e93 100644 --- a/openfl/experimental/interface/participants.py +++ b/openfl/experimental/interface/participants.py @@ -43,16 +43,16 @@ def name(self, name: str): self._name = name def get_name(self) -> str: - """Gets the name of the aggregator/collaborator. + """Gets the name of Participant (aggregator or collaborator). Returns: - str: The name of the aggregator/collaborator. + str: The name of the aggregator or collaborator. """ return self._name def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> None: - """Initialize private attributes of aggregator/collaborator by invoking the - callable specified by user.""" + """Initialize private attributes of Participant (aggregator or collaborator) + by invoking the callable specified by user.""" if self.private_attributes_callable is not None: self.private_attributes = self.private_attributes_callable(**self.kwargs) elif private_attrs: @@ -60,7 +60,7 @@ def initialize_private_attributes(self, private_attrs: Dict[Any, Any] = None) -> def _set_private_attrs_to_clone(self, clone: Any) -> None: """Set private attributes to FLSpec clone before - transitioning from Aggregator step to collaborator steps. + transitioning from aggregator to collaborator or vice versa Args: clone (Any): The clone to set attributes to. @@ -71,7 +71,7 @@ def _set_private_attrs_to_clone(self, clone: Any) -> None: def _delete_private_attrs_from_clone(self, clone: Any) -> None: """Remove private attributes from FLSpec clone before - transition + transitioning from collaborator to aggregator or vice versa Args: clone (Any): The clone to remove attributes from. From 0e962283ba0ce8fc54c4a25d0dc13b88ebb2ba61 Mon Sep 17 00:00:00 2001 From: refai06 Date: Wed, 23 Oct 2024 18:49:38 +0530 Subject: [PATCH 8/9] Optimize code: attribute check Signed-off-by: refai06 --- .../testflow_privateattributes.py | 10 +++--- ...ibutes_initialization_with_both_options.py | 10 +++--- ...ributes_initialization_without_callable.py | 10 +++--- .../src/testflow_privateattributes.py | 30 +++++++++--------- .../src/testflow_privateattributes.py | 30 +++++++++--------- .../src/testflow_privateattributes.py | 31 +++++++++---------- 6 files changed, 60 insertions(+), 61 deletions(-) diff --git a/tests/github/experimental/testflow_privateattributes.py b/tests/github/experimental/testflow_privateattributes.py index 11173a8432..56e3abccff 100644 --- a/tests/github/experimental/testflow_privateattributes.py +++ b/tests/github/experimental/testflow_privateattributes.py @@ -106,8 +106,8 @@ def join(self, inputs): for idx, collab in enumerate(inputs): if ( - hasattr(collab, "train_loader") is True - or hasattr(collab, "test_loader") is True + hasattr(collab, "train_loader") + or hasattr(collab, "test_loader") ): # Error - we are able to access collaborator attributes TestFlowPrivateAttributes.error_list.append( @@ -159,8 +159,8 @@ def validate_collab_private_attr(self, private_attr, step_name): # Collaborator private attributes should not be accessible if ( type(self.collaborators[idx]) is not str - or hasattr(self.runtime, "_collaborators") is True - or hasattr(self.runtime, "__collaborators") is True + or hasattr(self.runtime, "_collaborators") + or hasattr(self.runtime, "__collaborators") ): # Error - we are able to access collaborator attributes TestFlowPrivateAttributes.error_list.append( @@ -183,7 +183,7 @@ def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): + f"private attributes not accessible {bcolors.ENDC}" ) - if hasattr(self.runtime, "_aggregator") is True: + if hasattr(self.runtime, "_aggregator"): # Error - we are able to access aggregator attributes TestFlowPrivateAttributes.error_list.append( step_name + "_aggregator_attributes_found" diff --git a/tests/github/experimental/testflow_privateattributes_initialization_with_both_options.py b/tests/github/experimental/testflow_privateattributes_initialization_with_both_options.py index d14a8863b8..54c7dcdfe4 100644 --- a/tests/github/experimental/testflow_privateattributes_initialization_with_both_options.py +++ b/tests/github/experimental/testflow_privateattributes_initialization_with_both_options.py @@ -106,8 +106,8 @@ def join(self, inputs): for idx, collab in enumerate(inputs): if ( - hasattr(collab, "train_loader_via_callable") is True - or hasattr(collab, "test_loader_via_callable") is True + hasattr(collab, "train_loader_via_callable") + or hasattr(collab, "test_loader_via_callable") ): # Error - we are able to access collaborator attributes TestFlowPrivateAttributes.error_list.append( @@ -159,8 +159,8 @@ def validate_collab_private_attr(self, private_attr, step_name): # Collaborator private attributes should not be accessible if ( type(self.collaborators[idx]) is not str - or hasattr(self.runtime, "_collaborators") is True - or hasattr(self.runtime, "__collaborators") is True + or hasattr(self.runtime, "_collaborators") + or hasattr(self.runtime, "__collaborators") ): # Error - we are able to access collaborator attributes TestFlowPrivateAttributes.error_list.append( @@ -183,7 +183,7 @@ def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): + f"private attributes not accessible {bcolors.ENDC}" ) - if hasattr(self.runtime, "_aggregator") is True: + if hasattr(self.runtime, "_aggregator"): # Error - we are able to access aggregator attributes TestFlowPrivateAttributes.error_list.append( step_name + "_aggregator_attributes_found" diff --git a/tests/github/experimental/testflow_privateattributes_initialization_without_callable.py b/tests/github/experimental/testflow_privateattributes_initialization_without_callable.py index e91f94cd20..dba021eb68 100644 --- a/tests/github/experimental/testflow_privateattributes_initialization_without_callable.py +++ b/tests/github/experimental/testflow_privateattributes_initialization_without_callable.py @@ -106,8 +106,8 @@ def join(self, inputs): for idx, collab in enumerate(inputs): if ( - hasattr(collab, "train_loader") is True - or hasattr(collab, "test_loader") is True + hasattr(collab, "train_loader") + or hasattr(collab, "test_loader") ): # Error - we are able to access collaborator attributes TestFlowPrivateAttributes.error_list.append( @@ -159,8 +159,8 @@ def validate_collab_private_attr(self, private_attr, step_name): # Collaborator private attributes should not be accessible if ( type(self.collaborators[idx]) is not str - or hasattr(self.runtime, "_collaborators") is True - or hasattr(self.runtime, "__collaborators") is True + or hasattr(self.runtime, "_collaborators") + or hasattr(self.runtime, "__collaborators") ): # Error - we are able to access collaborator attributes TestFlowPrivateAttributes.error_list.append( @@ -183,7 +183,7 @@ def validate_agg_private_attrs(self, private_attr_1, private_attr_2, step_name): + f"private attributes not accessible {bcolors.ENDC}" ) - if hasattr(self.runtime, "_aggregator") is True: + if hasattr(self.runtime, "_aggregator"): # Error - we are able to access aggregator attributes TestFlowPrivateAttributes.error_list.append( step_name + "_aggregator_attributes_found" diff --git a/tests/github/experimental/workspace/testcase_private_attributes/src/testflow_privateattributes.py b/tests/github/experimental/workspace/testcase_private_attributes/src/testflow_privateattributes.py index 782259b3d2..1f76015b45 100644 --- a/tests/github/experimental/workspace/testcase_private_attributes/src/testflow_privateattributes.py +++ b/tests/github/experimental/workspace/testcase_private_attributes/src/testflow_privateattributes.py @@ -105,8 +105,8 @@ def join(self, inputs): for input in enumerate(inputs): collab = input[1].input if ( - hasattr(input, "train_loader") is True - or hasattr(input, "test_loader") is True + hasattr(input, "train_loader") + or hasattr(input, "test_loader") ): # Error - we are able to access collaborator attributes TestFlowPrivateAttributes.ERROR_LIST.append( @@ -146,17 +146,17 @@ def end(self): def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): """ - Validate that aggregator can only access their own attributes + Validate that aggregator can only access their own attributes Args: - step_name: Name of the step being validated + step_name: Name of the step being validated private_attr_kwargs: Keyword arguments specifying the names of private attributes for the aggregator and collaborators. """ agg_attrs = private_attrs_kwargs.get('aggr',[]) collab_attrs = private_attrs_kwargs.get('collabs', []) # Aggregator should only be able to access its own attributes - # check for missing aggregator attributes + # check for missing aggregator attributes inaccessible_agg_attrs = [attr for attr in agg_attrs if not hasattr(self, attr)] if inaccessible_agg_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( @@ -167,8 +167,8 @@ def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): + f"accessible {bcolors.ENDC}" ) - # check for collaborator private attributes that should not be accessible - breached_collab_attrs = [attr for attr in collab_attrs if hasattr(self,attr) is True] + # check for collaborator private attributes that should not be accessible + breached_collab_attrs = [attr for attr in collab_attrs if hasattr(self,attr)] if breached_collab_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( step_name + "_collaborator_attributes_found" @@ -184,8 +184,8 @@ def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): # Collaborator attributes should not be accessible in aggregator step if ( type(self.collaborators[idx]) is not str - or hasattr(self.runtime, "_collaborators") is True - or hasattr(self.runtime, "__collaborators") is True + or hasattr(self.runtime, "_collaborators") + or hasattr(self.runtime, "__collaborators") ): # Error - we are able to access collaborator attributes TestFlowPrivateAttributes.ERROR_LIST.append( @@ -203,15 +203,15 @@ def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): Validate that collaborators can only access their own attributes Args: - step_name: Name of the step being validated + step_name: Name of the step being validated private_attr_kwargs: Keyword arguments specifying the names of private attributes for the aggregator and collaborators. """ agg_attrs = private_attrs_kwargs.get('aggr',[]) collab_attrs = private_attrs_kwargs.get('collabs', []) # Collaborator should only be able to access its own attributes - - # check for missing collaborators attributes + + # check for missing collaborators attributes inaccessible_collab_attrs = [attr for attr in collab_attrs if not hasattr(self,attr)] if inaccessible_collab_attrs: @@ -222,15 +222,15 @@ def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Collab " + f"private attributes not accessible {bcolors.ENDC}" ) - # check for aggregator private attributes that should not be accessible - breached_agg_attr = [attr for attr in agg_attrs if hasattr(self, attr) is True] + # check for aggregator private attributes that should not be accessible + breached_agg_attr = [attr for attr in agg_attrs if hasattr(self, attr)] if breached_agg_attr: TestFlowPrivateAttributes.ERROR_LIST.append( step_name + "_aggregator_attributes_found" ) print( - f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Aggregator" + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Aggregator" + f" private attributes accessible: {','.join(breached_agg_attr)} {bcolors.ENDC}" ) diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/testflow_privateattributes.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/testflow_privateattributes.py index 2ab7a8af87..49127fef0d 100644 --- a/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/testflow_privateattributes.py +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_with_both_options/src/testflow_privateattributes.py @@ -105,8 +105,8 @@ def join(self, inputs): for input in enumerate(inputs): collab = input[1].input if ( - hasattr(input, "train_loader_via_callable") is True - or hasattr(input, "test_loader_via_callable") is True + hasattr(input, "train_loader_via_callable") + or hasattr(input, "test_loader_via_callable") ): # Error - we are able to access collaborator attributes TestFlowPrivateAttributes.ERROR_LIST.append( @@ -146,17 +146,17 @@ def end(self): def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): """ - Validate that aggregator can only access their own attributes + Validate that aggregator can only access their own attributes Args: - step_name: Name of the step being validated + step_name: Name of the step being validated private_attr_kwargs: Keyword arguments specifying the names of private attributes for the aggregator and collaborators. """ agg_attrs = private_attrs_kwargs.get('aggr',[]) collab_attrs = private_attrs_kwargs.get('collabs', []) # Aggregator should only be able to access its own attributes - # check for missing aggregator attributes + # check for missing aggregator attributes inaccessible_agg_attrs = [attr for attr in agg_attrs if not hasattr(self, attr)] if inaccessible_agg_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( @@ -167,8 +167,8 @@ def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): + f"accessible {bcolors.ENDC}" ) - # check for collaborator private attributes that should not be accessible - breached_collab_attrs = [attr for attr in collab_attrs if hasattr(self,attr) is True] + # check for collaborator private attributes that should not be accessible + breached_collab_attrs = [attr for attr in collab_attrs if hasattr(self,attr)] if breached_collab_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( step_name + "_collaborator_attributes_found" @@ -184,8 +184,8 @@ def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): # Collaborator attributes should not be accessible in aggregator step if ( type(self.collaborators[idx]) is not str - or hasattr(self.runtime, "_collaborators") is True - or hasattr(self.runtime, "__collaborators") is True + or hasattr(self.runtime, "_collaborators") + or hasattr(self.runtime, "__collaborators") ): # Error - we are able to access collaborator attributes TestFlowPrivateAttributes.ERROR_LIST.append( @@ -203,15 +203,15 @@ def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): Validate that collaborators can only access their own attributes Args: - step_name: Name of the step being validated + step_name: Name of the step being validated private_attr_kwargs: Keyword arguments specifying the names of private attributes for the aggregator and collaborators. """ agg_attrs = private_attrs_kwargs.get('aggr',[]) collab_attrs = private_attrs_kwargs.get('collabs', []) # Collaborator should only be able to access its own attributes - - # check for missing collaborators attributes + + # check for missing collaborators attributes inaccessible_collab_attrs = [attr for attr in collab_attrs if not hasattr(self,attr)] if inaccessible_collab_attrs: @@ -222,15 +222,15 @@ def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Collab " + f"private attributes not accessible {bcolors.ENDC}" ) - # check for aggregator private attributes that should not be accessible - breached_agg_attr = [attr for attr in agg_attrs if hasattr(self, attr) is True] + # check for aggregator private attributes that should not be accessible + breached_agg_attr = [attr for attr in agg_attrs if hasattr(self, attr)] if breached_agg_attr: TestFlowPrivateAttributes.ERROR_LIST.append( step_name + "_aggregator_attributes_found" ) print( - f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Aggregator" + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Aggregator" + f" private attributes accessible: {','.join(breached_agg_attr)} {bcolors.ENDC}" ) diff --git a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/testflow_privateattributes.py b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/testflow_privateattributes.py index 94dfdba64f..5ce49505c6 100644 --- a/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/testflow_privateattributes.py +++ b/tests/github/experimental/workspace/testcase_private_attributes_initialization_without_callable/src/testflow_privateattributes.py @@ -105,8 +105,8 @@ def join(self, inputs): for input in enumerate(inputs): collab = input[1].input if ( - hasattr(input, "train_loader") is True - or hasattr(input, "test_loader") is True + hasattr(input, "train_loader") + or hasattr(input, "test_loader") ): # Error - we are able to access collaborator attributes TestFlowPrivateAttributes.ERROR_LIST.append( @@ -146,17 +146,17 @@ def end(self): def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): """ - Validate that aggregator can only access their own attributes + Validate that aggregator can only access their own attributes Args: - step_name: Name of the step being validated + step_name: Name of the step being validated private_attr_kwargs: Keyword arguments specifying the names of private attributes for the aggregator and collaborators. """ agg_attrs = private_attrs_kwargs.get('aggr',[]) collab_attrs = private_attrs_kwargs.get('collabs', []) # Aggregator should only be able to access its own attributes - # check for missing aggregator attributes + # check for missing aggregator attributes inaccessible_agg_attrs = [attr for attr in agg_attrs if not hasattr(self, attr)] if inaccessible_agg_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( @@ -167,8 +167,8 @@ def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): + f"accessible {bcolors.ENDC}" ) - # check for collaborator private attributes that should not be accessible - breached_collab_attrs = [attr for attr in collab_attrs if hasattr(self,attr) is True] + # check for collaborator private attributes that should not be accessible + breached_collab_attrs = [attr for attr in collab_attrs if hasattr(self,attr)] if breached_collab_attrs: TestFlowPrivateAttributes.ERROR_LIST.append( step_name + "_collaborator_attributes_found" @@ -184,8 +184,8 @@ def validate_agg_private_attr(self, step_name, **private_attrs_kwargs): # Collaborator attributes should not be accessible in aggregator step if ( type(self.collaborators[idx]) is not str - or hasattr(self.runtime, "_collaborators") is True - or hasattr(self.runtime, "__collaborators") is True + or hasattr(self.runtime, "_collaborators") + or hasattr(self.runtime, "__collaborators") ): # Error - we are able to access collaborator attributes TestFlowPrivateAttributes.ERROR_LIST.append( @@ -202,15 +202,15 @@ def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): """ Validate that collaborators can only access their own attributes Args: - step_name: Name of the step being validated + step_name: Name of the step being validated private_attr_kwargs: Keyword arguments specifying the names of private attributes for the aggregator and collaborators. """ agg_attrs = private_attrs_kwargs.get('aggr',[]) collab_attrs = private_attrs_kwargs.get('collabs', []) # Collaborator should only be able to access its own attributes - - # check for missing collaborators attributes + + # check for missing collaborators attributes inaccessible_collab_attrs = [attr for attr in collab_attrs if not hasattr(self,attr)] if inaccessible_collab_attrs: @@ -221,15 +221,15 @@ def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Collab " + f"private attributes not accessible {bcolors.ENDC}" ) - # check for aggregator private attributes that should not be accessible - breached_agg_attr = [attr for attr in agg_attrs if hasattr(self, attr) is True] + # check for aggregator private attributes that should not be accessible + breached_agg_attr = [attr for attr in agg_attrs if hasattr(self, attr)] if breached_agg_attr: TestFlowPrivateAttributes.ERROR_LIST.append( step_name + "_aggregator_attributes_found" ) print( - f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Aggregator" + f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Aggregator" + f" private attributes accessible: {','.join(breached_agg_attr)} {bcolors.ENDC}" ) @@ -243,4 +243,3 @@ def validate_collab_private_attrs(self, step_name, **private_attrs_kwargs): f"{bcolors.FAIL} ... Attribute test failed in {step_name} - Aggregator" + f" private attributes accessible {bcolors.ENDC}" ) - \ No newline at end of file From 71c6d2d70c5b6a500ed58fa20203af6024c36ab5 Mon Sep 17 00:00:00 2001 From: rajith Date: Mon, 4 Nov 2024 13:58:37 +0530 Subject: [PATCH 9/9] Validating Log File Path for cli function --- openfl/interface/cli.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/openfl/interface/cli.py b/openfl/interface/cli.py index ba7b9c2133..07aae45257 100755 --- a/openfl/interface/cli.py +++ b/openfl/interface/cli.py @@ -182,9 +182,17 @@ def cli(context, log_level, no_warnings): # This will be overridden later with user selected debugging level disable_warnings() log_file = os.getenv("LOG_FILE") - # Validate log_file using allow list approach - if log_file and not re.match(r"^[\w\-.]+$", log_file): - raise ValueError("Invalid log file path") + # Validate log_file with tighter restrictions + if log_file: + log_file = os.path.normpath(log_file) + if not re.match(r"^logs/[\w\-.]+$", log_file) or ".." in log_file or log_file.startswith("/"): + raise ValueError("Invalid log file path") + + # Ensure the log file is in the 'logs' directory + allowed_directory = Path("logs").resolve() + full_path = (allowed_directory / log_file).resolve() + if not str(full_path).startswith(str(allowed_directory)): + raise ValueError("Log file path is not allowed") setup_logging(log_level, log_file) sys.stdout.reconfigure(encoding="utf-8")