From 1a45017a4e9d4e63157759f913ef1c5348426eab Mon Sep 17 00:00:00 2001
From: Jeroen Galjaard
Date: Sat, 30 Apr 2022 16:06:09 +0200
Subject: [PATCH] Add confusion matrix to federated experiment logging

---
 Dockerfile                  |  3 ++-
 README.md                   | 24 ++++++++++++++++++-
 fltk/core/client.py         | 43 +++++++++++++++++----------------
 fltk/core/federator.py      | 47 +++++++++++++++++++------------------
 fltk/util/data_container.py | 17 +++++++++++++-
 5 files changed, 88 insertions(+), 46 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index ad7bc67e..e7828026 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -15,10 +15,11 @@ WORKDIR /opt/federation-lab
 RUN apt-get update \
   && apt-get install -y python3.9
 
-# Setup pip3.9
+# Setup pip3.9 for dependencies
 RUN apt install -y curl python3.9-distutils
 RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
 RUN python3 get-pip.py
+
 # Add Pre-downloaded models (otherwise needs be run every-time)
 ADD data/ data/
 
diff --git a/README.md b/README.md
index f2707aba..4f7bfaf4 100644
--- a/README.md
+++ b/README.md
@@ -419,6 +419,27 @@ many small files is slow (as they will be compressed individually). The command
 kubectl cp --namespace test fl-extractor:/opt/federation-lab/logging ./logging
 ```
 
+⚠️ Make sure to test your configurations before running your experiments, to ensure that your data is written in a
+persistent fashion. Writing to a directory that is not mounted to an NFS disk may result in data loss.
+
+⚠️ For federated learning experiments, data is written by the Federator to disk; as such, only a single NFS mount is
+needed (see, for example, how [`V1PytorchTrainJob`](fltk/util/cluster/client.py)s are constructed by the Orchestrator).
+It is advisable to load the data of federated learning experiments using the `pandas` library; this can be done as follows.
+For data exploration, using a Jupyter Notebook may be convenient.
+
+```python
+import pandas as pd
+experiment_file = 'path/to/your/experiment.csv'
+df = pd.read_csv(experiment_file)
+
+df
+```
+This should display the contents of the csv file parsed into a `pd.DataFrame`, which should have the following schema.
+```
+round_id,train_duration,test_duration,round_duration,num_epochs,trained_items,accuracy,train_loss,test_loss,timestamp,node_name,confusion_matrix
+```
+
+
 ### Launching an experiment
 We have now completed the setup of the project and can continue by running actual experiments. If no errors occur, this
 should. You may also skip this step and work on your code, but it might be good to test your deployment
@@ -469,4 +490,5 @@ Which will collect and run all the tests in the repository, and show in `verbose
 
 ## Known issues / Limitations
 
-* Currently, there is no GPU support in the Docker containers.
+* Currently, there is no GPU support in the Docker containers; the `Dockerfile` will need to be updated to
+accommodate this.
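
Note: extending the pandas example in the README section above, the `confusion_matrix` column is stored as a stringified NumPy array, so `pd.read_csv` will hand it back as a plain string. The sketch below shows one way to turn it back into an `np.ndarray`; `parse_confusion_matrix` is a hypothetical helper and the exact cell format depends on NumPy's print options, so treat this as an illustration rather than part of the framework.

```python
import numpy as np
import pandas as pd

def parse_confusion_matrix(cell: str) -> np.ndarray:
    """Hypothetical helper: convert a stringified matrix cell back into an ndarray."""
    # Assumes NumPy's default formatting, e.g. "[[953   2 ...]\n [  1 987 ...]]":
    # every matrix row sits on its own line inside the quoted CSV cell.
    rows = [np.fromstring(line.replace('[', ' ').replace(']', ' '), sep=' ')
            for line in cell.strip().splitlines()]
    return np.vstack(rows).astype(int)

df = pd.read_csv('path/to/your/experiment.csv')
confusion_matrices = df['confusion_matrix'].dropna().apply(parse_confusion_matrix)
```
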
diff --git a/fltk/core/client.py b/fltk/core/client.py
index 97725489..cace04c0 100644
--- a/fltk/core/client.py
+++ b/fltk/core/client.py
@@ -1,6 +1,8 @@
-import time
 from typing import Tuple, Any
 
+import numpy as np
+import sklearn
+import time
 import torch
 
 from fltk.core.node import Node
@@ -20,7 +22,7 @@ def __init__(self, identifier: str, rank: int, world_size: int, config: Config):
 
         self.loss_function = self.config.get_loss_function()()
         self.optimizer = get_optimizer(self.config.optimizer)(self.net.parameters(),
-                                                              **self.config.optimizer_args)
+                                                               **self.config.optimizer_args)
         self.scheduler = MinCapableStepLR(self.optimizer,
                                           self.config.scheduler_step_size,
                                           self.config.scheduler_gamma,
@@ -101,18 +103,18 @@ def train(self, num_epochs: int):
 
     def set_tau_eff(self, total):
         client_weight = self.get_client_datasize() / total
-        n = self.get_client_datasize() # pylint: disable=invalid-name
-        E = self.config.epochs # pylint: disable=invalid-name
+        n = self.get_client_datasize()  # pylint: disable=invalid-name
+        E = self.config.epochs  # pylint: disable=invalid-name
        B = 16  # nicely hardcoded :)  # pylint: disable=invalid-name
         tau_eff = int(E * n / B) * client_weight
         if hasattr(self.optimizer, 'set_tau_eff'):
             self.optimizer.set_tau_eff(tau_eff)
 
-    def test(self):
+    def test(self) -> Tuple[float, float, np.array]:
         """
         Function implementing federated learning test loop.
-        @return: Final running loss statistic and accuracy statistic.
-        @rtype: Tuple[float, float]
+        @return: Statistics on the test set given a (partially) trained model: accuracy, loss, and confusion matrix.
+        @rtype: Tuple[float, float, np.array]
         """
         start_time = time.time()
         correct = 0
@@ -126,7 +128,7 @@ def test(self):
 
                 outputs = self.net(images)
 
-                _, predicted = torch.max(outputs.data, 1) # pylint: disable=no-member
+                _, predicted = torch.max(outputs.data, 1)  # pylint: disable=no-member
                 total += labels.size(0)
                 correct += (predicted == labels).sum().item()
 
@@ -134,34 +136,35 @@ def test(self):
                 pred_.extend(predicted.cpu().numpy())
 
                 loss += self.loss_function(outputs, labels).item()
+
+        # Calculate learning statistics
         loss /= len(self.dataset.get_test_loader().dataset)
         accuracy = 100.0 * correct / total
-        # confusion_mat = confusion_matrix(targets_, pred_)
-        # accuracy_per_class = confusion_mat.diagonal() / confusion_mat.sum(1)
-        #
-        # class_precision = calculate_class_precision(confusion_mat)
-        # class_recall = calculate_class_recall(confusion_mat)
+
+        confusion_mat = sklearn.metrics.confusion_matrix(targets_, pred_)
+
         end_time = time.time()
         duration = end_time - start_time
         self.logger.info(f'Test duration is {duration} seconds')
-        return accuracy, loss
+        return accuracy, loss, confusion_mat
 
-    def get_client_datasize(self): # pylint: disable=missing-function-docstring
+    def get_client_datasize(self):  # pylint: disable=missing-function-docstring
         return len(self.dataset.get_train_sampler())
 
-    def exec_round(self, num_epochs: int) -> Tuple[Any, Any, Any, Any, float, float, float]:
+    def exec_round(self, num_epochs: int) -> Tuple[Any, Any, Any, Any, float, float, float, np.array]:
         """
         Function as access point for the Federator Node to kick-off a remote learning round on a client.
         @param num_epochs: Number of epochs to run
         @type num_epochs: int
-        @return: Tuple containing the statistics of the training round.
-        @rtype: Tuple
+        @return: Tuple containing the statistics of the training round: loss, weights, accuracy, test_loss, make-span,
+        training make-span, testing make-span, and confusion matrix.
+        @rtype: Tuple[Any, Any, Any, Any, float, float, float, np.array]
         """
         start = time.time()
         loss, weights = self.train(num_epochs)
         time_mark_between = time.time()
-        accuracy, test_loss = self.test()
+        accuracy, test_loss, test_conf_matrix = self.test()
         end = time.time()
         round_duration = end - start
@@ -173,7 +176,7 @@ def exec_round(self, num_epochs: int) -> Tuple[Any, Any, Any, Any, float, float,
             self.optimizer.pre_communicate()
         for k, value in weights.items():
             weights[k] = value.cpu()
-        return loss, weights, accuracy, test_loss, round_duration, train_duration, test_duration
+        return loss, weights, accuracy, test_loss, round_duration, train_duration, test_duration, test_conf_matrix
 
     def __del__(self):
         self.logger.info(f'Client {self.id} is stopping')
diff --git a/fltk/core/federator.py b/fltk/core/federator.py
index 75a45b22..906ffbed 100644
--- a/fltk/core/federator.py
+++ b/fltk/core/federator.py
@@ -1,8 +1,11 @@
 import copy
+
+import numpy as np
+import sklearn
 import time
 from dataclasses import dataclass
 from pathlib import Path
-from typing import List, Union
+from typing import List, Union, Tuple
 
 import torch
 
@@ -80,8 +83,8 @@ def create_clients(self):
             client_name = f'client{client_id}'
             client = Client(client_name, client_id, world_size, copy.deepcopy(self.config))
             self.clients.append(
-                LocalClient(client_name, client, 0, DataContainer(client_name, self.config.output_path,
-                                                              ClientRecord, self.config.save_data_append)))
+                LocalClient(client_name, client, 0, DataContainer(client_name, self.config.output_path,
+                                                                   ClientRecord, self.config.save_data_append)))
             self.logger.info(f'Client "{client_name}" created')
 
     def register_client(self, client_name: str, rank: int):
@@ -98,8 +101,8 @@ def register_client(self, client_name: str, rank: int):
         if self.config.single_machine:
             self.logger.warning('This function should not be called when in single machine mode!')
         self.clients.append(
-            LocalClient(client_name, client_name, rank, DataContainer(client_name, self.config.output_path,
-                                                              ClientRecord, self.config.save_data_append)))
+            LocalClient(client_name, client_name, rank, DataContainer(client_name, self.config.output_path,
+                                                                       ClientRecord, self.config.save_data_append)))
 
     def stop_all_clients(self):
         """
@@ -202,14 +205,15 @@ def set_tau_eff(self):
         #     responses.append((client, _remote_method_async(Client.set_tau_eff, client.ref, total)))
         # torch.futures.wait_all([x[1] for x in responses])
 
-    def test(self, net):
+    def test(self, net) -> Tuple[float, float, np.array]:
         """
         Function to test the learned global model by the Federator. This does not take the client
         distributions in account but is centralized.
         @param net: Global network to be tested on the Federator centralized testing dataset.
         @type net: torch.nn.Module
-        @return: Accuracy and loss of the global network on a (subset) of the testing data.
-        @rtype: Tuple[float, float]
+        @return: Accuracy and loss of the global network on a (subset) of the testing data, and the confusion matrix
+        corresponding to the model's predictions.
+        @rtype: Tuple[float, float, np.array]
         """
         start_time = time.time()
         correct = 0
@@ -233,15 +237,12 @@ def test(self, net):
                 loss += self.loss_function(outputs, labels).item()
         loss /= len(self.dataset.get_test_loader().dataset)
         accuracy = 100.0 * correct / total
-        # confusion_mat = confusion_matrix(targets_, pred_)
-        # accuracy_per_class = confusion_mat.diagonal() / confusion_mat.sum(1)
-        #
-        # class_precision = calculate_class_precision(confusion_mat)
-        # class_recall = calculate_class_recall(confusion_mat)
+        confusion_mat = sklearn.metrics.confusion_matrix(targets_, pred_)
+
         end_time = time.time()
         duration = end_time - start_time
         self.logger.info(f'Test duration is {duration} seconds')
-        return accuracy, loss
+        return accuracy, loss, confusion_mat
 
     def exec_round(self, com_round_id: int):
         """
@@ -271,18 +272,16 @@ def exec_round(self, com_round_id: int):
 
         # Client training
         training_futures: List[torch.Future] = []  # pylint: disable=no-member
-        # def cb_factory(future: torch.Future, method, client, client_weights, client_sizes, num_epochs, name):
-        #     future.then(lambda x: method(x, client, client_weights, client_sizes, num_epochs, client.name))
-
-        def training_cb(fut: torch.Future, client_ref: LocalClient, client_weights, client_sizes, # pylint: disable=no-member
+        def training_cb(fut: torch.Future, client_ref: LocalClient, client_weights, client_sizes,
                         num_epochs):  # pylint: disable=no-member
-            train_loss, weights, accuracy, test_loss, round_duration, train_duration, test_duration = fut.wait()
+            train_loss, weights, accuracy, test_loss, round_duration, train_duration, test_duration, c_mat = fut.wait()
             self.logger.info(f'Training callback for client {client_ref.name} with accuracy={accuracy}')
             client_weights[client_ref.name] = weights
             client_data_size = self.message(client_ref.ref, Client.get_client_datasize)
             client_sizes[client_ref.name] = client_data_size
-            client_ref.exp_data.append(ClientRecord(com_round_id, train_duration, test_duration, round_duration,
-                                                    num_epochs, 0, accuracy, train_loss, test_loss))
+            c_record = ClientRecord(com_round_id, train_duration, test_duration, round_duration, num_epochs, 0,
+                                    accuracy, train_loss, test_loss, confusion_matrix=c_mat)
+            client_ref.exp_data.append(c_record)
 
         for client in selected_clients:
             future = self.message_async(client.ref, Client.exec_round, num_epochs)
@@ -304,10 +303,12 @@ def all_futures_done(futures: List[torch.Future]) -> bool:  # pylint: disable=no
         updated_model = self.aggregation_method(client_weights, client_sizes)
         self.update_nn_parameters(updated_model)
 
-        test_accuracy, test_loss = self.test(self.net)
+        test_accuracy, test_loss, conf_mat = self.test(self.net)
         self.logger.info(f'[Round {com_round_id:>3}] Federator has a accuracy of {test_accuracy} and loss={test_loss}')
 
         end_time = time.time()
         duration = end_time - start_time
-        self.exp_data.append(FederatorRecord(len(selected_clients), com_round_id, duration, test_loss, test_accuracy))
+        record = FederatorRecord(len(selected_clients), com_round_id, duration, test_loss, test_accuracy,
+                                 confusion_matrix=conf_mat)
+        self.exp_data.append(record)
         self.logger.info(f'[Round {com_round_id:>3}] Round duration is {duration} seconds')
 
diff --git a/fltk/util/data_container.py b/fltk/util/data_container.py
index 8f53a551..1bd8a462 100644
--- a/fltk/util/data_container.py
+++ b/fltk/util/data_container.py
@@ -1,4 +1,6 @@
 import csv
+
+import numpy as np
 import time
 from dataclasses import dataclass
 from pathlib import Path
@@ -20,7 +22,7 @@ class FederatorRecord(DataRecord):
     # Accuracy per class?
     timestamp: float = time.time()
     node_name: str = ''
-
+    confusion_matrix: np.array = None
 
 @dataclass
 class ClientRecord(DataRecord):
@@ -36,9 +38,14 @@ class ClientRecord(DataRecord):
     # Accuracy per class?
     timestamp: float = time.time()
     node_name: str = ''
+    confusion_matrix: np.array = None
 
 
 class DataContainer:
+    """
+    DataContainer class for collecting experiment data. By default, an 'Excel'-compatible format is used by numpy and
+    the csv library. As such, it is advised to use a library such as `pandas` to load the data for analysis purposes.
+    """
     records: List[DataRecord]
     file_name: str
     file_handle: TextIO
@@ -76,8 +83,16 @@ def append(self, record: DataRecord):
         self.file_handle.flush()
 
     def save(self):
+        """
+        Function to save the encapsulated data to the experiment file. The format is 'Excel'-compatible by default,
+        which allows complex objects such as ndarrays to be loaded back from a single field.
+        @return: None
+        @rtype: None
+        """
         if self.append_mode:
             return
+        import numpy as np
+        np.set_printoptions(linewidth=10**6)
         dw = csv.DictWriter(self.file_handle, self.record_type.__annotations__)
         dw.writeheader()
         # print(f'Saving {len(self.records)} for node {self.name}')
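
The per-class statistics hinted at by the comments removed from both `test()` implementations (`accuracy_per_class`, `class_precision`, `class_recall`) can still be derived offline from the logged confusion matrix. A minimal sketch, assuming the `sklearn.metrics.confusion_matrix` convention of true labels on the rows and predictions on the columns; `per_class_statistics` is an illustrative helper, not part of the framework:

```python
import numpy as np

def per_class_statistics(confusion_mat: np.ndarray):
    """Derive per-class accuracy (recall) and precision from a confusion matrix."""
    true_positives = confusion_mat.diagonal()
    # Rows hold the true labels, so row sums give the per-class support.
    accuracy_per_class = true_positives / confusion_mat.sum(axis=1)  # equals per-class recall
    # Columns hold the predictions, so column sums give the per-class prediction counts.
    class_precision = true_positives / confusion_mat.sum(axis=0)
    return accuracy_per_class, class_precision
```

Combined with the `confusion_matrix` field that this patch adds to `FederatorRecord` and `ClientRecord`, this reproduces the previously commented-out statistics without recomputing anything during training.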