Skip to content

Commit

Permalink
Add confusion matrix to federated experiment logging
Browse files Browse the repository at this point in the history
  • Loading branch information
JMGaljaard committed May 8, 2022
1 parent 0e29ba5 commit 1a45017
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 46 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ WORKDIR /opt/federation-lab
RUN apt-get update \
&& apt-get install -y python3.9

# Setup pip3.9
# Setup pip3.9 for dependencies
RUN apt install -y curl python3.9-distutils
RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
RUN python3 get-pip.py

# Add pre-downloaded models (otherwise the download needs to be run every time)
ADD data/ data/

Expand Down
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,27 @@ many small files is slow (as they will be compressed individually). The command
kubectl cp --namespace test fl-extractor:/opt/federation-lab/logging ./logging
```

⚠️ Make sure to test your configurations before running your experiments, to ensure that your data is written in a
persistent fashion. Writing to a directory that is not mounted to an NFS disk may result in data loss.

⚠️ For federated learning experiments, data is written to disk by the Federator; as such, only a single NFS mount is
needed (see, for example, how [`V1PytorchTrainJob`](fltk/util/cluster/client.py)s are constructed by the Orchestrator).
It is advisable to load the data of federated learning experiments using the `pandas` library, which can be done as follows.
For data exploration, using a Jupyter Notebook may be advisable.

```python
import pandas as pd
experiment_file = 'path/to/your/experiment.csv'
df = pd.read_csv(experiment_file)

df
```
This should display the contents of the csv file parsed into a `pd.DataFrame`, which should have the following schema.
```
round_id,train_duration,test_duration,round_duration,num_epochs,trained_items,accuracy,train_loss,test_loss,timestamp,node_name,confusion_matrix
```


### Launching an experiment
We have now completed the setup of the project and can continue by running actual experiments. If no errors occurred
so far, the deployment should be ready. You may also skip this step and work on your code, but it might be good to test your deployment
Expand Down Expand Up @@ -469,4 +490,5 @@ Which will collect and run all the tests in the repository, and show in `verbose

## Known issues / Limitations

* Currently, there is no GPU support in the Docker containers.
* Currently, there is no GPU support in the Docker containers; the `Dockerfile` will need to be updated to
accommodate this.
43 changes: 23 additions & 20 deletions fltk/core/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import time
from typing import Tuple, Any

import numpy as np
import sklearn
import time
import torch

from fltk.core.node import Node
Expand All @@ -20,7 +22,7 @@ def __init__(self, identifier: str, rank: int, world_size: int, config: Config):

self.loss_function = self.config.get_loss_function()()
self.optimizer = get_optimizer(self.config.optimizer)(self.net.parameters(),
**self.config.optimizer_args)
**self.config.optimizer_args)
self.scheduler = MinCapableStepLR(self.optimizer,
self.config.scheduler_step_size,
self.config.scheduler_gamma,
Expand Down Expand Up @@ -101,18 +103,18 @@ def train(self, num_epochs: int):

def set_tau_eff(self, total):
client_weight = self.get_client_datasize() / total
n = self.get_client_datasize() # pylint: disable=invalid-name
E = self.config.epochs # pylint: disable=invalid-name
n = self.get_client_datasize() # pylint: disable=invalid-name
E = self.config.epochs # pylint: disable=invalid-name
B = 16 # nicely hardcoded :) # pylint: disable=invalid-name
tau_eff = int(E * n / B) * client_weight
if hasattr(self.optimizer, 'set_tau_eff'):
self.optimizer.set_tau_eff(tau_eff)

def test(self):
def test(self) -> Tuple[float, float, np.array]:
"""
Function implementing federated learning test loop.
@return: Final running loss statistic and accuracy statistic.
@rtype: Tuple[float, float]
@return: Statistics on test-set given a (partially) trained model; accuracy, loss, and confusion matrix.
@rtype: Tuple[float, float, np.array]
"""
start_time = time.time()
correct = 0
Expand All @@ -126,42 +128,43 @@ def test(self):

outputs = self.net(images)

_, predicted = torch.max(outputs.data, 1) # pylint: disable=no-member
_, predicted = torch.max(outputs.data, 1) # pylint: disable=no-member
total += labels.size(0)
correct += (predicted == labels).sum().item()

targets_.extend(labels.cpu().view_as(predicted).numpy())
pred_.extend(predicted.cpu().numpy())

loss += self.loss_function(outputs, labels).item()

# Calculate learning statistics
loss /= len(self.dataset.get_test_loader().dataset)
accuracy = 100.0 * correct / total
# confusion_mat = confusion_matrix(targets_, pred_)
# accuracy_per_class = confusion_mat.diagonal() / confusion_mat.sum(1)
#
# class_precision = calculate_class_precision(confusion_mat)
# class_recall = calculate_class_recall(confusion_mat)

confusion_mat = sklearn.metrics.confusion_matrix(targets_, pred_)

end_time = time.time()
duration = end_time - start_time
self.logger.info(f'Test duration is {duration} seconds')
return accuracy, loss
return accuracy, loss, confusion_mat

def get_client_datasize(self): # pylint: disable=missing-function-docstring
def get_client_datasize(self): # pylint: disable=missing-function-docstring
return len(self.dataset.get_train_sampler())

def exec_round(self, num_epochs: int) -> Tuple[Any, Any, Any, Any, float, float, float]:
def exec_round(self, num_epochs: int) -> Tuple[Any, Any, Any, Any, float, float, float, np.array]:
"""
Function as access point for the Federator Node to kick-off a remote learning round on a client.
@param num_epochs: Number of epochs to run
@type num_epochs: int
@return: Tuple containing the statistics of the training round.
@rtype: Tuple
@return: Tuple containing the statistics of the training round; loss, weights, accuracy, test_loss, make-span,
training make-span, testing make-span, and confusion matrix.
@rtype: Tuple[Any, Any, Any, Any, float, float, float, np.array]
"""
start = time.time()

loss, weights = self.train(num_epochs)
time_mark_between = time.time()
accuracy, test_loss = self.test()
accuracy, test_loss, test_conf_matrix = self.test()

end = time.time()
round_duration = end - start
Expand All @@ -173,7 +176,7 @@ def exec_round(self, num_epochs: int) -> Tuple[Any, Any, Any, Any, float, float,
self.optimizer.pre_communicate()
for k, value in weights.items():
weights[k] = value.cpu()
return loss, weights, accuracy, test_loss, round_duration, train_duration, test_duration
return loss, weights, accuracy, test_loss, round_duration, train_duration, test_duration, test_conf_matrix

def __del__(self):
self.logger.info(f'Client {self.id} is stopping')
47 changes: 24 additions & 23 deletions fltk/core/federator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import copy

import numpy as np
import sklearn
import time
from dataclasses import dataclass
from pathlib import Path
from typing import List, Union
from typing import List, Union, Tuple

import torch

Expand Down Expand Up @@ -80,8 +83,8 @@ def create_clients(self):
client_name = f'client{client_id}'
client = Client(client_name, client_id, world_size, copy.deepcopy(self.config))
self.clients.append(
LocalClient(client_name, client, 0, DataContainer(client_name, self.config.output_path,
ClientRecord, self.config.save_data_append)))
LocalClient(client_name, client, 0, DataContainer(client_name, self.config.output_path,
ClientRecord, self.config.save_data_append)))
self.logger.info(f'Client "{client_name}" created')

def register_client(self, client_name: str, rank: int):
Expand All @@ -98,8 +101,8 @@ def register_client(self, client_name: str, rank: int):
if self.config.single_machine:
self.logger.warning('This function should not be called when in single machine mode!')
self.clients.append(
LocalClient(client_name, client_name, rank, DataContainer(client_name, self.config.output_path,
ClientRecord, self.config.save_data_append)))
LocalClient(client_name, client_name, rank, DataContainer(client_name, self.config.output_path,
ClientRecord, self.config.save_data_append)))

def stop_all_clients(self):
"""
Expand Down Expand Up @@ -202,14 +205,15 @@ def set_tau_eff(self):
# responses.append((client, _remote_method_async(Client.set_tau_eff, client.ref, total)))
# torch.futures.wait_all([x[1] for x in responses])

def test(self, net):
def test(self, net) -> Tuple[float, float, np.array]:
"""
Function to test the learned global model by the Federator. This does not take the client distributions in
account but is centralized.
@param net: Global network to be tested on the Federator centralized testing dataset.
@type net: torch.nn.Module
@return: Accuracy and loss of the global network on a (subset) of the testing data.
@rtype: Tuple[float, float]
@return: Accuracy and loss of the global network on a (subset) of the testing data, and the confusion matrix
corresponding to the models' predictions.
@rtype: Tuple[float, float, np.array]
"""
start_time = time.time()
correct = 0
Expand All @@ -233,15 +237,12 @@ def test(self, net):
loss += self.loss_function(outputs, labels).item()
loss /= len(self.dataset.get_test_loader().dataset)
accuracy = 100.0 * correct / total
# confusion_mat = confusion_matrix(targets_, pred_)
# accuracy_per_class = confusion_mat.diagonal() / confusion_mat.sum(1)
#
# class_precision = calculate_class_precision(confusion_mat)
# class_recall = calculate_class_recall(confusion_mat)
confusion_mat = sklearn.metrics.confusion_matrix(targets_, pred_)

end_time = time.time()
duration = end_time - start_time
self.logger.info(f'Test duration is {duration} seconds')
return accuracy, loss
return accuracy, loss, confusion_mat

def exec_round(self, com_round_id: int):
"""
Expand Down Expand Up @@ -271,18 +272,16 @@ def exec_round(self, com_round_id: int):
# Client training
training_futures: List[torch.Future] = [] # pylint: disable=no-member

# def cb_factory(future: torch.Future, method, client, client_weights, client_sizes, num_epochs, name):
# future.then(lambda x: method(x, client, client_weights, client_sizes, num_epochs, client.name))

def training_cb(fut: torch.Future, client_ref: LocalClient, client_weights, client_sizes, # pylint: disable=no-member
def training_cb(fut: torch.Future, client_ref: LocalClient, client_weights, client_sizes,
num_epochs): # pylint: disable=no-member
train_loss, weights, accuracy, test_loss, round_duration, train_duration, test_duration = fut.wait()
train_loss, weights, accuracy, test_loss, round_duration, train_duration, test_duration, c_mat = fut.wait()
self.logger.info(f'Training callback for client {client_ref.name} with accuracy={accuracy}')
client_weights[client_ref.name] = weights
client_data_size = self.message(client_ref.ref, Client.get_client_datasize)
client_sizes[client_ref.name] = client_data_size
client_ref.exp_data.append(ClientRecord(com_round_id, train_duration, test_duration, round_duration,
num_epochs, 0, accuracy, train_loss, test_loss))
c_record = ClientRecord(com_round_id, train_duration, test_duration, round_duration, num_epochs, 0,
accuracy, train_loss, test_loss, confusion_matrix=c_mat)
client_ref.exp_data.append(c_record)

for client in selected_clients:
future = self.message_async(client.ref, Client.exec_round, num_epochs)
Expand All @@ -304,10 +303,12 @@ def all_futures_done(futures: List[torch.Future]) -> bool: # pylint: disable=no
updated_model = self.aggregation_method(client_weights, client_sizes)
self.update_nn_parameters(updated_model)

test_accuracy, test_loss = self.test(self.net)
test_accuracy, test_loss, conf_mat = self.test(self.net)
self.logger.info(f'[Round {com_round_id:>3}] Federator has a accuracy of {test_accuracy} and loss={test_loss}')

end_time = time.time()
duration = end_time - start_time
self.exp_data.append(FederatorRecord(len(selected_clients), com_round_id, duration, test_loss, test_accuracy))
record = FederatorRecord(len(selected_clients), com_round_id, duration, test_loss, test_accuracy,
confusion_matrix=conf_mat)
self.exp_data.append(record)
self.logger.info(f'[Round {com_round_id:>3}] Round duration is {duration} seconds')
17 changes: 16 additions & 1 deletion fltk/util/data_container.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import csv

import numpy as np
import time
from dataclasses import dataclass
from pathlib import Path
Expand All @@ -20,7 +22,7 @@ class FederatorRecord(DataRecord):
# Accuracy per class?
timestamp: float = time.time()
node_name: str = ''

confusion_matrix: np.array = None

@dataclass
class ClientRecord(DataRecord):
Expand All @@ -36,9 +38,14 @@ class ClientRecord(DataRecord):
# Accuracy per class?
timestamp: float = time.time()
node_name: str = ''
confusion_matrix: np.array = None


class DataContainer:
"""
DataContainer class for collecting experiment data. By default, an 'Excel'-compatible format is used by numpy and
the csv library. As such, it is advised to use a library such as `pandas` to load data for analysis purposes.
"""
records: List[DataRecord]
file_name: str
file_handle: TextIO
Expand Down Expand Up @@ -76,8 +83,16 @@ def append(self, record: DataRecord):
self.file_handle.flush()

def save(self):
"""
Function to save the encapsulated data to the experiment file. The format is by default 'excel' compatible,
resulting in the capability of loading complex objects such as ndarrays as a field.
@return: None
@rtype: None
"""
if self.append_mode:
return
import numpy as np
np.set_printoptions(linewidth=10**6)
dw = csv.DictWriter(self.file_handle, self.record_type.__annotations__)
dw.writeheader()
# print(f'Saving {len(self.records)} for node {self.name}')
Expand Down

0 comments on commit 1a45017

Please sign in to comment.