Skip to content

Commit

Permalink
Add collaborator side metric logging
Browse files Browse the repository at this point in the history
Signed-off-by: Shah, Karan <[email protected]>
  • Loading branch information
MasterSkepticista committed Dec 21, 2024
1 parent aab8baf commit 3c5e525
Showing 1 changed file with 25 additions and 9 deletions.
34 changes: 25 additions & 9 deletions openfl/component/collaborator/collaborator.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def __init__(
self.callbacks = callbacks_module.CallbackList(
callbacks,
add_memory_profiler=log_memory_usage,
add_metric_writer=write_logs,
origin=self.collaborator_name,
)

Expand Down Expand Up @@ -184,12 +185,14 @@ def run(self):
self.callbacks.on_round_begin(round_num)

# Run tasks
logs = {}
for task in tasks:
self.do_task(task, round_num)
metrics = self.do_task(task, round_num)
logs.update(metrics)

# Round end
self.tensor_db.clean_up(self.db_store_rounds)
self.callbacks.on_round_end(round_num)
self.callbacks.on_round_end(round_num, logs)

# Experiment end
self.callbacks.on_experiment_end()
Expand Down Expand Up @@ -235,12 +238,15 @@ def get_tasks(self):

return tasks, round_number, sleep_time, time_to_quit

def do_task(self, task, round_number):
def do_task(self, task, round_number) -> dict:
"""Perform the specified task.
Args:
task: The single task to perform.
round_number (int): Actual round number.
Returns:
A dictionary of reportable metrics of the current collaborator for the task.
"""
# map this task to an actual function name and kwargs
if hasattr(self.task_runner, "TASK_REGISTRY"):
Expand Down Expand Up @@ -329,7 +335,8 @@ def do_task(self, task, round_number):

# send the results for this task; delta and compression will occur in
# this function
self.send_task_results(global_output_tensor_dict, round_number, task_name)
metrics = self.send_task_results(global_output_tensor_dict, round_number, task_name)
return metrics

def get_numpy_dict_for_tensorkeys(self, tensor_keys):
"""Get tensor dictionary for specified tensorkey set.
Expand Down Expand Up @@ -462,13 +469,16 @@ def get_aggregated_tensor_from_aggregator(self, tensor_key, require_lossless=Fal

return nparray

def send_task_results(self, tensor_dict, round_number, task_name):
def send_task_results(self, tensor_dict, round_number, task_name) -> dict:
"""Send task results to the aggregator.
Args:
tensor_dict (dict): Tensor dictionary.
round_number (int): Actual round number.
task_name (string): Task name.
Returns:
A dictionary of reportable metrics of the current collaborator for the task.
"""
named_tensors = [self.nparray_to_named_tensor(k, v) for k, v in tensor_dict.items()]

Expand All @@ -485,15 +495,19 @@ def send_task_results(self, tensor_dict, round_number, task_name):

logger.debug("%s data size = %s", task_name, data_size)

metrics = {}
for tensor in tensor_dict:
tensor_name, origin, fl_round, report, tags = tensor

if report:
logger.info(
f"Round {round_number}, collaborator {self.collaborator_name} "
f"is sending metric for task {task_name}:"
f" {tensor_name}\t{tensor_dict[tensor]:f}"
# This tensor must be a scalar metric
assert (
tensor_dict[tensor].ndim == 0
), "Expected `report` tensor to be a scalar, instead got one with shape {}".format(
tensor_dict[tensor].shape
)
value = float(tensor_dict[tensor])
metrics.update({f"{self.collaborator_name}/{task_name}/{tensor_name}": value})

self.client.send_local_task_results(
self.collaborator_name,
Expand All @@ -503,6 +517,8 @@ def send_task_results(self, tensor_dict, round_number, task_name):
named_tensors,
)

return metrics

def nparray_to_named_tensor(self, tensor_key, nparray):
"""Construct the NamedTensor Protobuf.
Expand Down

0 comments on commit 3c5e525

Please sign in to comment.