Skip to content

Commit

Permalink
Create interfaces for OpenFL components with methods that can be impl…
Browse files Browse the repository at this point in the history
…ementedCreate interfaces for OpenFL components with methods that can be implemented

Create AggregatorInterface and CollaboratorInterface in openflinterface/interactive_api/experiment.py
Support passing the aggregator object and collaborator object in the method FLExperiment.start()
If aggregator object or collaborator object is None, the default aggregator and collaborator objects will
be created at director and envoy services.

Known issue:
Since the AggregatorInterface and CollaboratorInterface are for ad-hoc algorithm development. We should modify the
definitions of the model, data loader and task interfaces, since we are going to support some algorithms other than Keras, TensorFlow or PyTorch.

Fixes securefederatedai#802

Signed-off-by: He, Dan H <[email protected]>
Signed-off-by: Jiang, Jiaqiu <[email protected]>
Signed-off-by: Li, Qingqing <[email protected]>
Signed-off-by: Wang, Le <[email protected]>
Signed-off-by: Wu, Caili <[email protected]>
Signed-off-by: He, Dan H <[email protected]>
  • Loading branch information
danhe1 committed Apr 28, 2023
1 parent 60911a6 commit cf710a7
Show file tree
Hide file tree
Showing 2 changed files with 256 additions and 5 deletions.
27 changes: 25 additions & 2 deletions openfl/federated/plan/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,17 @@ def get_aggregator(self, tensor_dict=None):

defaults[SETTINGS]['log_metric_callback'] = log_metric_callback
if self.aggregator_ is None:
self.aggregator_ = Plan.build(**defaults, initial_tensor_dict=tensor_dict)
# Try loading the aggregator from the serialized object file
api_layer = self.config['api_layer']
obj_file = api_layer['settings']['aggregator_interface_file']
self.aggregator_ = self.restore_object(obj_file)

if self.aggregator_ is None: # Failed to load the object file
self.aggregator_ = Plan.build(**defaults, initial_tensor_dict=tensor_dict)
else:
# Pass the settings to the deserialized object
self.aggregator_.initialize(**defaults[SETTINGS],
initial_tensor_dict=tensor_dict)

return self.aggregator_

Expand Down Expand Up @@ -491,7 +501,16 @@ def get_collaborator(self, collaborator_name, root_certificate=None, private_key
)

if self.collaborator_ is None:
self.collaborator_ = Plan.build(**defaults)
# Try loading the collaborator from the serialized object file
api_layer = self.config['api_layer']
obj_file = api_layer['settings']['collaborator_interface_file']
self.collaborator_ = self.restore_object(obj_file)

if self.collaborator_ is None: # Failed to load from the object file
self.collaborator_ = Plan.build(**defaults)
else:
# Pass the settings to the deserialized object
self.collaborator_.initialize(**defaults[SETTINGS])

return self.collaborator_

Expand Down Expand Up @@ -588,8 +607,12 @@ def get_serializer_plugin(self, **kwargs):

def restore_object(self, filename):
"""Deserialize an object."""
import os

serializer_plugin = self.get_serializer_plugin()
if serializer_plugin is None:
return None
if not os.path.exists(filename):
return None
obj = serializer_plugin.restore_object(filename)
return obj
234 changes: 231 additions & 3 deletions openfl/interface/interactive_api/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,14 @@ def remove_experiment_data(self):
self.logger.info(log_message)

def prepare_workspace_distribution(self, model_provider, task_keeper, data_loader,
aggregator, collaborator,
task_assigner,
pip_install_options: Tuple[str] = ()):
"""Prepare an archive from a user workspace."""
# Save serialized python objects to disc
self._serialize_interface_objects(model_provider, task_keeper, data_loader, task_assigner)
self._serialize_interface_objects(model_provider, task_keeper, data_loader,
aggregator, collaborator,
task_assigner)
# Save the prepared plan
Plan.dump(Path(f'./plan/{self.plan.name}'), self.plan.config, freeze=False)

Expand All @@ -193,6 +196,8 @@ def prepare_workspace_distribution(self, model_provider, task_keeper, data_loade

def start(self, *, model_provider, task_keeper, data_loader,
rounds_to_train: int,
aggregator=None,
collaborator=None,
task_assigner=None,
override_config: dict = None,
delta_updates: bool = False,
Expand All @@ -210,6 +215,8 @@ def start(self, *, model_provider, task_keeper, data_loader,
task_keeper - Task Interface instance.
data_loader - Data Interface instance.
rounds_to_train - required number of training rounds for the experiment.
aggregator - Aggregator Interface instance.
collaborator - Collaborator Interface instance.
delta_updates - [bool] Tells if collaborators should send delta updates
for the locally tuned models. If set to False, whole checkpoints will be sent.
opt_treatment - Optimizer state treatment policy.
Expand All @@ -234,10 +241,13 @@ def start(self, *, model_provider, task_keeper, data_loader,
override_config=override_config,
model_interface_file='model_obj.pkl',
tasks_interface_file='tasks_obj.pkl',
dataloader_interface_file='loader_obj.pkl')
dataloader_interface_file='loader_obj.pkl',
aggregator_interface_file='aggregator_obj.pkl',
collaborator_interface_file='collaborator_obj.pkl')

self.prepare_workspace_distribution(
model_provider, task_keeper, data_loader,
aggregator, collaborator,
task_assigner,
pip_install_options
)
Expand Down Expand Up @@ -360,6 +370,8 @@ def _prepare_plan(self, model_provider, data_loader,
override_config=None,
model_interface_file='model_obj.pkl', tasks_interface_file='tasks_obj.pkl',
dataloader_interface_file='loader_obj.pkl',
aggregator_interface_file='aggregator_obj.pkl',
collaborator_interface_file = 'collaborator_obj.pkl',
aggregation_function_interface_file='aggregation_function_obj.pkl',
task_assigner_file='task_assigner_obj.pkl'):
"""Fill plan.yaml file using user provided setting."""
Expand Down Expand Up @@ -410,6 +422,8 @@ def _prepare_plan(self, model_provider, data_loader,
'model_interface_file': model_interface_file,
'tasks_interface_file': tasks_interface_file,
'dataloader_interface_file': dataloader_interface_file,
'aggregator_interface_file': aggregator_interface_file,
'collaborator_interface_file': collaborator_interface_file,
'aggregation_function_interface_file': aggregation_function_interface_file,
'task_assigner_file': task_assigner_file
}
Expand All @@ -423,6 +437,8 @@ def _serialize_interface_objects(
model_provider,
task_keeper,
data_loader,
aggregator,
collaborator,
task_assigner
):
"""Save python objects to be restored on collaborators."""
Expand All @@ -436,12 +452,15 @@ def _serialize_interface_objects(
'model_interface_file': model_provider,
'tasks_interface_file': task_keeper,
'dataloader_interface_file': data_loader,
'aggregator_interface_file': aggregator,
'collaborator_interface_file': collaborator,
'aggregation_function_interface_file': task_keeper.aggregation_functions,
'task_assigner_file': task_assigner
}

for filename, object_ in obj_dict.items():
serializer.serialize(object_, self.plan.config['api_layer']['settings'][filename])
if object_ is not None:
serializer.serialize(object_, self.plan.config['api_layer']['settings'][filename])


class TaskKeeper:
Expand Down Expand Up @@ -668,3 +687,212 @@ def get_train_data_size(self):
def get_valid_data_size(self):
"""Information for aggregation."""
raise NotImplementedError


class AggregatorInterface:
"""
The class to define an aggregator.
The experiment manager can define a customized aggregator object for some ad-hoc
federated learning task.
"""

def __init__(self, **kwargs):
self.kwargs = kwargs


def initialize(self,

aggregator_uuid,
federation_uuid,
authorized_cols,

rounds_to_train=256,
single_col_cert_common_name=None,
**kwargs):
"""
This method is called by the Plan component.
Args:
aggregator_uuid (str): Aggregation ID.
federation_uuid (str): Federation ID.
authorized_cols (list of str): The list of IDs of enrolled collaborators.
init_state_path* (str): The location of the initial weight file.
last_state_path* (str): The file location to store the latest weight.
best_state_path* (str): The file location to store the weight of the best model.
"""

self.round_number = 0
self.single_col_cert_common_name = single_col_cert_common_name
if self.single_col_cert_common_name is None:
self.single_col_cert_common_name = ''

self.rounds_to_train = rounds_to_train

self.authorized_cols = authorized_cols
self.uuid = aggregator_uuid
self.federation_uuid = federation_uuid

self.logger = getLogger(__name__)

self.quit_job_sent_to = []

self.kwargs.update(kwargs)


def valid_collaborator_cn_and_id(self, cert_common_name,
collaborator_common_name):
"""
Determine if the collaborator certificate and ID are valid for this federation.
Args:
cert_common_name: Common name for security certificate
collaborator_common_name: Common name for collaborator
Returns:
bool: True means the collaborator common name matches the name in
the security certificate.
"""
# if self.test_mode_whitelist is None, then the common_name must
# match collaborator_common_name and be in authorized_cols
# FIXME: '' instead of None is just for protobuf compatibility.
# Cleaner solution?
if self.single_col_cert_common_name == '':
return (cert_common_name == collaborator_common_name
and collaborator_common_name in self.authorized_cols)
# otherwise, common_name must be in whitelist and
# collaborator_common_name must be in authorized_cols
else:
return (cert_common_name == self.single_col_cert_common_name
and collaborator_common_name in self.authorized_cols)


def get_tasks(self, collaborator_name):
"""
RPC called by a collaborator to determine which tasks to perform.
Args:
collaborator_name: str
Requested collaborator name
Returns:
tasks: list[str]
List of tasks to be performed by the requesting collaborator
for the current round.
sleep_time: int
time_to_quit: bool
"""
raise NotImplementedError


def get_aggregated_tensor(self, collaborator_name, tensor_name,
round_number, report, tags, require_lossless):
"""
RPC called by collaborator.
Performs local lookup to determine if there is an aggregated tensor available \
that matches the request.
Args:
collaborator_name : str
Requested tensor key collaborator name
tensor_name: str
require_lossless: bool
round_number: int
report: bool
tags: tuple[str, ...]
Returns:
named_tensor : protobuf NamedTensor
the tensor requested by the collaborator
"""
raise NotImplementedError


def send_local_task_results(self, collaborator_name, round_number, task_name,
data_size, named_tensors):
"""
RPC called by collaborator.
Transmits collaborator's task results to the aggregator.
Args:
collaborator_name: str
task_name: str
round_number: int
data_size: int
named_tensors: protobuf NamedTensor
Returns:
None
"""
raise NotImplementedError


def all_quit_jobs_sent(self):
"""Assert all quit jobs are sent to collaborators."""
return set(self.quit_job_sent_to) == set(self.authorized_cols)



class CollaboratorInterface:
"""
The class to define a collaborator.
The experiment manager can define a customized aggregator object for some ad-hoc
federated learning task.
"""

def __init__(self, **kwargs):
self.kwargs = kwargs


def initialize(self, collaborator_name,
aggregator_uuid,
federation_uuid,
client,
**kwargs):
"""
This method is called by the Plan component, to pass the settings to the
collaborator.
Args:
collaborator_name (string): The common name for the collaborator
aggregator_uuid: The unique id for the client
federation_uuid: The unique id for the federation
"""
self.single_col_cert_common_name = None

if self.single_col_cert_common_name is None:
self.single_col_cert_common_name = '' # for protobuf compatibility
# we would really want this as an object

self.collaborator_name = collaborator_name
self.aggregator_uuid = aggregator_uuid
self.federation_uuid = federation_uuid

self.client = client

self.logger = getLogger(__name__)

self.kwargs.update(kwargs)


def run(self):
"""
Run the collaborator, called by the Envoy service.
"""
raise NotImplementedError


def set_available_devices(self, cuda: Tuple[str] = ()):
"""
This method is called by the Envoy service.
Set available CUDA devices.
Cuda tuple contains string indeces, ('1', '3').
"""
self.cuda_devices = cuda

0 comments on commit cf710a7

Please sign in to comment.