diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py
index 7a2f0aee9d..a8d6024719 100644
--- a/openfl/component/collaborator/collaborator.py
+++ b/openfl/component/collaborator/collaborator.py
@@ -152,6 +152,7 @@ def __init__(
         self.callbacks = callbacks_module.CallbackList(
             callbacks,
             add_memory_profiler=log_memory_usage,
+            add_metric_writer=write_logs,
             origin=self.collaborator_name,
         )
 
@@ -184,12 +185,14 @@ def run(self):
             self.callbacks.on_round_begin(round_num)
 
             # Run tasks
+            logs = {}
             for task in tasks:
-                self.do_task(task, round_num)
+                metrics = self.do_task(task, round_num)
+                logs.update(metrics)
 
             # Round end
             self.tensor_db.clean_up(self.db_store_rounds)
-            self.callbacks.on_round_end(round_num)
+            self.callbacks.on_round_end(round_num, logs)
 
         # Experiment end
         self.callbacks.on_experiment_end()
@@ -235,12 +238,15 @@ def get_tasks(self):
 
         return tasks, round_number, sleep_time, time_to_quit
 
-    def do_task(self, task, round_number):
+    def do_task(self, task, round_number) -> dict:
         """Perform the specified task.
 
         Args:
             task (list_of_str): List of tasks.
             round_number (int): Actual round number.
+
+        Returns:
+            A dictionary of reportable metrics of the current collaborator for the task.
         """
         # map this task to an actual function name and kwargs
         if hasattr(self.task_runner, "TASK_REGISTRY"):
@@ -329,7 +335,8 @@ def do_task(self, task, round_number):
 
         # send the results for this tasks; delta and compression will occur in
         # this function
-        self.send_task_results(global_output_tensor_dict, round_number, task_name)
+        metrics = self.send_task_results(global_output_tensor_dict, round_number, task_name)
+        return metrics
 
     def get_numpy_dict_for_tensorkeys(self, tensor_keys):
         """Get tensor dictionary for specified tensorkey set.
@@ -462,13 +469,16 @@ def get_aggregated_tensor_from_aggregator(self, tensor_key, require_lossless=Fal
 
         return nparray
 
-    def send_task_results(self, tensor_dict, round_number, task_name):
+    def send_task_results(self, tensor_dict, round_number, task_name) -> dict:
         """Send task results to the aggregator.
 
         Args:
             tensor_dict (dict): Tensor dictionary.
             round_number (int): Actual round number.
             task_name (string): Task name.
+
+        Returns:
+            A dictionary of reportable metrics of the current collaborator for the task.
         """
         named_tensors = [self.nparray_to_named_tensor(k, v) for k, v in tensor_dict.items()]
 
@@ -485,15 +495,19 @@ def send_task_results(self, tensor_dict, round_number, task_name):
 
         logger.debug("%s data size = %s", task_name, data_size)
 
+        metrics = {}
         for tensor in tensor_dict:
             tensor_name, origin, fl_round, report, tags = tensor
 
             if report:
-                logger.info(
-                    f"Round {round_number}, collaborator {self.collaborator_name} "
-                    f"is sending metric for task {task_name}:"
-                    f" {tensor_name}\t{tensor_dict[tensor]:f}"
+                # This tensor must be a scalar metric
+                assert (
+                    tensor_dict[tensor].ndim == 0
+                ), "Expected `report` tensor to be a scalar, instead got one with shape {}".format(
+                    tensor_dict[tensor].shape
                 )
+                value = float(tensor_dict[tensor])
+                metrics.update({f"{self.collaborator_name}/{task_name}/{tensor_name}": value})
 
         self.client.send_local_task_results(
             self.collaborator_name,
@@ -503,6 +517,8 @@ def send_task_results(self, tensor_dict, round_number, task_name):
             named_tensors,
         )
 
+        return metrics
+
     def nparray_to_named_tensor(self, tensor_key, nparray):
         """Construct the NamedTensor Protobuf.
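
For context, a minimal illustrative sketch (not part of the patch) of the per-round `logs` dict that `run()` assembles from the metrics returned by `do_task`: keys follow the `"<collaborator>/<task>/<tensor>"` pattern built in `send_task_results`, and values are plain floats. The collaborator name, task names, metric values, and the `print_round_metrics` helper below are all hypothetical.

```python
# Illustrative only: the shape of the `logs` dict passed to
# self.callbacks.on_round_end(round_num, logs) at the end of a round.
# Keys use the "<collaborator>/<task>/<tensor>" pattern from send_task_results();
# values are scalar metrics converted to Python floats.

def print_round_metrics(round_num: int, logs: dict) -> None:
    """Example consumer with the same (round_num, logs) arguments as on_round_end."""
    for name, value in sorted(logs.items()):
        print(f"round {round_num}: {name} = {value:.4f}")


# Hypothetical metrics for a collaborator named "collaborator1".
example_logs = {
    "collaborator1/aggregated_model_validation/accuracy": 0.8714,
    "collaborator1/train/loss": 0.3921,
}
print_round_metrics(3, example_logs)
```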