From c23981a5fecdeaa23fa3c54db0e21156bca9f00b Mon Sep 17 00:00:00 2001 From: Bart Cox Date: Thu, 8 Apr 2021 14:43:57 +0200 Subject: [PATCH] fltk --- .dockerignore | 9 + .gitignore | 12 + Dockerfile | 41 +++ README.md | 96 +++++- configs/experiment.yaml | 16 + deploy/templates/client_stub_default.yml | 21 ++ deploy/templates/client_stub_medium.yml | 21 ++ deploy/templates/client_stub_slow.yml | 21 ++ deploy/templates/system_stub.yml | 23 ++ fltk/__init__.py | 2 + fltk/__main__.py | 39 +++ fltk/client.py | 331 +++++++++++++++++++ fltk/datasets/__init__.py | 4 + fltk/datasets/cifar10.py | 51 +++ fltk/datasets/cifar100.py | 45 +++ fltk/datasets/data_distribution/__init__.py | 1 + fltk/datasets/data_distribution/iid_equal.py | 19 ++ fltk/datasets/dataset.py | 113 +++++++ fltk/datasets/distributed/__init__.py | 4 + fltk/datasets/distributed/cifar10.py | 87 +++++ fltk/datasets/distributed/cifar100.py | 45 +++ fltk/datasets/distributed/dataset.py | 139 ++++++++ fltk/datasets/distributed/fashion_mnist.py | 33 ++ fltk/datasets/fashion_mnist.py | 33 ++ fltk/federator.py | 270 +++++++++++++++ fltk/launch.py | 68 ++++ fltk/nets/__init__.py | 6 + fltk/nets/cifar_100_resnet.py | 146 ++++++++ fltk/nets/cifar_100_vgg.py | 65 ++++ fltk/nets/cifar_10_cnn.py | 49 +++ fltk/nets/cifar_10_resnet.py | 115 +++++++ fltk/nets/fashion_mnist_cnn.py | 31 ++ fltk/nets/fashion_mnist_resnet.py | 62 ++++ fltk/nets/reddit_lstm.py | 61 ++++ fltk/nets/simple.py | 89 +++++ fltk/schedulers/__init__.py | 1 + fltk/schedulers/min_lr_step.py | 48 +++ fltk/strategy/__init__.py | 0 fltk/strategy/aggregation.py | 36 ++ fltk/strategy/client_selection.py | 4 + fltk/util/__init__.py | 0 fltk/util/arguments.py | 293 ++++++++++++++++ fltk/util/base_config.py | 313 ++++++++++++++++++ fltk/util/data_loader_utils.py | 90 +++++ fltk/util/default_models.py | 47 +++ fltk/util/fed_avg.py | 12 + fltk/util/generate_data_distribution.py | 70 ++++ fltk/util/iid_equal.py | 19 ++ fltk/util/label_replacement.py | 12 + fltk/util/log.py | 9 + fltk/util/results.py | 20 ++ fltk/util/tensor_converter.py | 20 ++ setup.py | 35 ++ 53 files changed, 3195 insertions(+), 2 deletions(-) create mode 100644 .dockerignore create mode 100644 Dockerfile create mode 100644 configs/experiment.yaml create mode 100644 deploy/templates/client_stub_default.yml create mode 100644 deploy/templates/client_stub_medium.yml create mode 100644 deploy/templates/client_stub_slow.yml create mode 100644 deploy/templates/system_stub.yml create mode 100644 fltk/__init__.py create mode 100644 fltk/__main__.py create mode 100644 fltk/client.py create mode 100644 fltk/datasets/__init__.py create mode 100644 fltk/datasets/cifar10.py create mode 100644 fltk/datasets/cifar100.py create mode 100644 fltk/datasets/data_distribution/__init__.py create mode 100644 fltk/datasets/data_distribution/iid_equal.py create mode 100644 fltk/datasets/dataset.py create mode 100644 fltk/datasets/distributed/__init__.py create mode 100644 fltk/datasets/distributed/cifar10.py create mode 100644 fltk/datasets/distributed/cifar100.py create mode 100644 fltk/datasets/distributed/dataset.py create mode 100644 fltk/datasets/distributed/fashion_mnist.py create mode 100644 fltk/datasets/fashion_mnist.py create mode 100644 fltk/federator.py create mode 100644 fltk/launch.py create mode 100644 fltk/nets/__init__.py create mode 100644 fltk/nets/cifar_100_resnet.py create mode 100644 fltk/nets/cifar_100_vgg.py create mode 100644 fltk/nets/cifar_10_cnn.py create mode 100644 fltk/nets/cifar_10_resnet.py create mode 100644 
fltk/nets/fashion_mnist_cnn.py
 create mode 100644 fltk/nets/fashion_mnist_resnet.py
 create mode 100644 fltk/nets/reddit_lstm.py
 create mode 100644 fltk/nets/simple.py
 create mode 100644 fltk/schedulers/__init__.py
 create mode 100644 fltk/schedulers/min_lr_step.py
 create mode 100644 fltk/strategy/__init__.py
 create mode 100644 fltk/strategy/aggregation.py
 create mode 100644 fltk/strategy/client_selection.py
 create mode 100644 fltk/util/__init__.py
 create mode 100644 fltk/util/arguments.py
 create mode 100644 fltk/util/base_config.py
 create mode 100644 fltk/util/data_loader_utils.py
 create mode 100644 fltk/util/default_models.py
 create mode 100644 fltk/util/fed_avg.py
 create mode 100644 fltk/util/generate_data_distribution.py
 create mode 100644 fltk/util/iid_equal.py
 create mode 100644 fltk/util/label_replacement.py
 create mode 100644 fltk/util/log.py
 create mode 100644 fltk/util/results.py
 create mode 100644 fltk/util/tensor_converter.py
 create mode 100644 setup.py
diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 00000000..fa6a6105
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,9 @@
+venv
+default_models
+data_loaders
+data/cifar-10-batches-py
+data/cifar-100-python.tar.gz
+data/FashionMNIST
+data/cifar-100-python
+data/cifar-10-python.tar.gz
+simple_example
diff --git a/.gitignore b/.gitignore
index b6e47617..eebb00f2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -127,3 +127,15 @@ dmypy.json
 
 # Pyre type checker
 .pyre/
+
+
+venv
+venv-*
+default_models
+data
+data_loaders
+simple_example
+output
+docker_data
+.idea
+*.tmp.txt
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 00000000..b42cd68c
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,41 @@
+# Base image to start with
+FROM ubuntu:20.04
+
+# Who maintains this DockerFile
+MAINTAINER Bart Cox
+
+# Run build without interactive dialogue
+ARG DEBIAN_FRONTEND=noninteractive
+
+ENV GLOO_SOCKET_IFNAME=eth0
+ENV TP_SOCKET_IFNAME=eth0
+
+# Define the working directory of the current Docker container
+WORKDIR /opt/federation-lab
+
+# Update the Ubuntu software repository
+RUN apt-get update \
+ && apt-get install -y vim curl python3 python3-pip net-tools iproute2
+
+# Copy the current folder to the working directory
+COPY setup.py ./
+
+# Install all required packages for the generator
+RUN python3 setup.py install
+
+#RUN mkdir -p ./data/MNIST
+#COPY ./data/MNIST ../data/MNIST
+ADD fltk ./fedsim
+#RUN ls -la
+COPY federated_learning.py ./
+COPY custom_mnist.py ./
+#RUN ls -la ./fedsim
+
+# Expose the container's port to the host OS
+EXPOSE 5000
+
+# Run command by default for the executing container
+ # CMD ["python3", "/opt/Generatrix/rpc_parameter_server.py", "--world_size=2", "--rank=0", "--master_addr=192.168.144.2"]
+
+#CMD python3 /opt/federation-lab/rpc_parameter_server.py --world_size=$WORLD_SIZE --rank=$RANK --master_addr=10.5.0.11
+CMD python3 /opt/federation-lab/federated_learning.py $RANK $WORLD_SIZE 10.5.0.11
\ No newline at end of file
diff --git a/README.md b/README.md
index 2965f7d8..a562bb43 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,94 @@
-# fltk
-Federation Learning Toolkit
+# FLTK - Federation Learning Toolkit
+[![License](https://img.shields.io/badge/license-BSD-blue.svg)](LICENSE)
+
+This toolkit can be used to run Federated Learning experiments.
+PyTorch Distributed ([docs](https://pytorch.org/tutorials/beginner/dist_overview.html)) is used in this project.
+The goal of this project is to launch Federated Learning nodes in a truly distributed fashion.
+ +## Project structure + +TBD + +## Models + +* Cifar10-CNN +* Cifar10-ResNet +* Cifar100-ResNet +* Cifar100-VGG +* Fashion-MNIST-CNN +* Fashion-MNIST-ResNet +* Reddit-LSTM + +## Datasets + +* Cifar10 +* Cifar100 +* Fashion-MNIST + +## Prerequisites + +When running in docker containers the following dependencies need to be installed: + +* Docker +* Docker-compose + +## Install +```bash +python3 setup.py install +``` + +[comment]: <> (```bash) + +[comment]: <> (pip3 install -r ./requirements.txt) + +[comment]: <> (```) + +## Examples +

+
+### Single machine (Native)
+
+#### Launch single client
+Launch Federator
+```bash
+python3 -m fltk single configs/experiment.yaml --rank=0
+```
+Launch Client
+```bash
+python3 -m fltk single configs/experiment.yaml --rank=1
+```
+
+#### Spawn FL system
+```bash
+python3 -m fltk spawn configs/experiment.yaml
+```
+
+### Two machines (Native)
+To start a cross-machine FL system, you have to configure the network interface that connects your machine to the network.
+For example, if your machine is connected to the network via a Wi-Fi interface named `wlo1`, this has to be configured as shown below (these lines are set in `fltk/launch.py`):
+```python
+os.environ['GLOO_SOCKET_IFNAME'] = 'wlo1'
+os.environ['TP_SOCKET_IFNAME'] = 'wlo1'
+```
+Use `ifconfig` to find the name of the network interface on your machine.
+
+### Docker Compose
+
+```bash
+docker-compose up
+```
+
+TBD
+
+### Google Cloud Platform
+TBD
+
+
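### Programmatic launch (Python)

The `single` mode shown above is a thin wrapper around `fltk.launch.run_single`. The sketch below mirrors what `fltk/__main__.py` in this patch does and can serve as a starting point for embedding the launcher in your own script; the config path and the rank value are illustrative, not fixed:

```python
import yaml

from fltk.launch import run_single
from fltk.util.base_config import BareConfig

# Load the experiment configuration (the same file the CLI takes as its positional argument).
with open('configs/experiment.yaml') as file:
    yaml_data = yaml.load(file, Loader=yaml.FullLoader)
cfg = BareConfig()
cfg.merge_yaml(yaml_data)

# One federator (rank 0) plus `amount` clients make up the RPC world.
world_size = yaml_data['system']['clients']['amount'] + 1
master_address = yaml_data['system']['federator']['hostname']

# rank=0 starts the federator; run the same snippet with ranks 1..world_size-1 to start the clients.
run_single(rank=0, world_size=world_size, host=master_address, args=cfg)
```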

+
+ +## Known issues + +* Currently, there is no GPU support. Not for native nor for docker compose +* First epoch only can be slow (6x - 8x slower) \ No newline at end of file diff --git a/configs/experiment.yaml b/configs/experiment.yaml new file mode 100644 index 00000000..16640559 --- /dev/null +++ b/configs/experiment.yaml @@ -0,0 +1,16 @@ +--- +# Experiment configuration +total_epochs: 10 +epochs_per_cycle: 1 +wait_for_clients: true +net: Cifar10CNN +dataset: cifar10 +experiment_prefix: 'experiment_multi_machine' +output_location: 'output' +tensor_board_active: true +clients_per_round: 2 +system: + federator: + hostname: '192.168.0.129' + clients: + amount: 2 diff --git a/deploy/templates/client_stub_default.yml b/deploy/templates/client_stub_default.yml new file mode 100644 index 00000000..d65e2624 --- /dev/null +++ b/deploy/templates/client_stub_default.yml @@ -0,0 +1,21 @@ +client_name: # name can be anything +# container_name: federation-lab-client2 # what the name for this container would be + restart: "no" # if it crashes for example + build: . # look for the docker file where this file is currently located + volumes: + - ./docker_data:/opt/federation-lab/data + - ./default_models:/opt/federation-lab/default_models + - ./data_loaders:/opt/federation-lab/data_loaders + environment: + - PYTHONUNBUFFERED=1 + - RANK={rank} + - WORLD_SIZE={world_size} + ports: + - "5002:5000" # {machine-port}:{docker-port} + depends_on: + - "fl_server" + deploy: + resources: + limits: + cpus: '1.25' + memory: 1024M diff --git a/deploy/templates/client_stub_medium.yml b/deploy/templates/client_stub_medium.yml new file mode 100644 index 00000000..7083a3b6 --- /dev/null +++ b/deploy/templates/client_stub_medium.yml @@ -0,0 +1,21 @@ +client_name: # name can be anything +# container_name: federation-lab-client2 # what the name for this container would be + restart: "no" # if it crashes for example + build: . # look for the docker file where this file is currently located + volumes: + - ./docker_data:/opt/federation-lab/data + - ./default_models:/opt/federation-lab/default_models + - ./data_loaders:/opt/federation-lab/data_loaders + environment: + - PYTHONUNBUFFERED=1 + - RANK={rank} + - WORLD_SIZE={world_size} + ports: + - "5002:5000" # {machine-port}:{docker-port} + depends_on: + - "fl_server" + deploy: + resources: + limits: + cpus: '0.75' + memory: 1024M diff --git a/deploy/templates/client_stub_slow.yml b/deploy/templates/client_stub_slow.yml new file mode 100644 index 00000000..03a3fe48 --- /dev/null +++ b/deploy/templates/client_stub_slow.yml @@ -0,0 +1,21 @@ +client_name: # name can be anything +# container_name: federation-lab-client2 # what the name for this container would be + restart: "no" # if it crashes for example + build: . 
# look for the docker file where this file is currently located + volumes: + - ./docker_data:/opt/federation-lab/data + - ./default_models:/opt/federation-lab/default_models + - ./data_loaders:/opt/federation-lab/data_loaders + environment: + - PYTHONUNBUFFERED=1 + - RANK={rank} + - WORLD_SIZE={world_size} + ports: + - "5002:5000" # {machine-port}:{docker-port} + depends_on: + - "fl_server" + deploy: + resources: + limits: + cpus: '0.5' + memory: 1024M diff --git a/deploy/templates/system_stub.yml b/deploy/templates/system_stub.yml new file mode 100644 index 00000000..eda5fc6d --- /dev/null +++ b/deploy/templates/system_stub.yml @@ -0,0 +1,23 @@ +# creating a multi-container docker +version: "3.3" +services: + fl_server: # name can be anything + container_name: federation-lab-server # what the name for this container would be + restart: "no" # if it crashes for example + build: . # look for the docker file where this file is currently located + volumes: +# - ./data/MNIST:/opt/federation-lab/data/MNIST + - ./output:/opt/federation-lab/output + environment: + - PYTHONUNBUFFERED=1 + - RANK=0 + - WORLD_SIZE={world_size} + ports: + - "5000:5000" # {machine-port}:{docker-port} + networks: + default: + ipv4_address: 10.5.0.11 +networks: + default: + external: + name: local_network_dev \ No newline at end of file diff --git a/fltk/__init__.py b/fltk/__init__.py new file mode 100644 index 00000000..1032f085 --- /dev/null +++ b/fltk/__init__.py @@ -0,0 +1,2 @@ + +__version__ = '0.1.1' \ No newline at end of file diff --git a/fltk/__main__.py b/fltk/__main__.py new file mode 100644 index 00000000..04feb462 --- /dev/null +++ b/fltk/__main__.py @@ -0,0 +1,39 @@ +import os +import sys +import torch.distributed.rpc as rpc +import logging + +import yaml +import argparse + +import torch.multiprocessing as mp +from fltk.federator import Federator +from fltk.launch import run_single, run_spawn +from fltk.util.base_config import BareConfig + +logging.basicConfig(level=logging.DEBUG) + +def main(): + parser = argparse.ArgumentParser(description='Experiment launcher for the Federated Learning Testbed') + parser.add_argument('mode', choices=['single', 'spawn']) + parser.add_argument('config', type=str) + parser.add_argument('--rank', type=int) + + args = parser.parse_args() + with open(args.config) as file: + cfg = BareConfig() + yaml_data = yaml.load(file, Loader=yaml.FullLoader) + cfg.merge_yaml(yaml_data) + if args.mode == 'single': + if args.rank is None: + print('Missing rank argument when in \'single\' mode!') + parser.print_help() + exit(1) + world_size = yaml_data['system']['clients']['amount'] + 1 + master_address = yaml_data['system']['federator']['hostname'] + run_single(rank=args.rank, world_size=world_size, host=master_address, args=cfg) + else: + run_spawn(cfg) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/fltk/client.py b/fltk/client.py new file mode 100644 index 00000000..50ac9c1b --- /dev/null +++ b/fltk/client.py @@ -0,0 +1,331 @@ +import copy +import datetime +import os +import random +import time +from dataclasses import dataclass +from typing import List + +import torch +from torch.distributed import rpc +import logging +import numpy as np +from sklearn.metrics import confusion_matrix +from sklearn.metrics import classification_report + +from fltk.datasets.distributed import DistCIFAR10Dataset +from fltk.schedulers import MinCapableStepLR +from fltk.util.arguments import Arguments +from fltk.util.log import FLLogger + +import yaml + +from fltk.util.results 
import EpochData + +logging.basicConfig(level=logging.DEBUG) + + + +def _call_method(method, rref, *args, **kwargs): + """helper for _remote_method()""" + return method(rref.local_value(), *args, **kwargs) + +def _remote_method(method, rref, *args, **kwargs): + """ + executes method(*args, **kwargs) on the from the machine that owns rref + + very similar to rref.remote().method(*args, **kwargs), but method() doesn't have to be in the remote scope + """ + args = [method, rref] + list(args) + return rpc.rpc_sync(rref.owner(), _call_method, args=args, kwargs=kwargs) + +def _remote_method_async(method, rref, *args, **kwargs): + args = [method, rref] + list(args) + return rpc.rpc_async(rref.owner(), _call_method, args=args, kwargs=kwargs) + +class Client: + counter = 0 + finished_init = False + dataset = None + epoch_results: List[EpochData] = [] + epoch_counter = 0 + + + def __init__(self, id, log_rref, rank, world_size, config = None): + logging.info(f'Welcome to client {id}') + self.id = id + self.log_rref = log_rref + self.rank = rank + self.world_size = world_size + # self.args = Arguments(logging) + self.args = config + self.args.init_logger(logging) + self.device = self.init_device() + self.set_net(self.load_default_model()) + self.loss_function = self.args.get_loss_function()() + self.optimizer = torch.optim.SGD(self.net.parameters(), + lr=self.args.get_learning_rate(), + momentum=self.args.get_momentum()) + self.scheduler = MinCapableStepLR(self.args.get_logger(), self.optimizer, + self.args.get_scheduler_step_size(), + self.args.get_scheduler_gamma(), + self.args.get_min_lr()) + + def init_device(self): + if torch.cuda.is_available(): + return torch.device("cuda:0") + else: + return torch.device("cpu") + + def ping(self): + return 'pong' + + def rpc_test(self): + sleep_time = random.randint(1, 5) + time.sleep(sleep_time) + self.local_log(f'sleep for {sleep_time} seconds') + self.counter += 1 + log_line = f'Number of times called: {self.counter}' + self.local_log(log_line) + self.remote_log(log_line) + + def remote_log(self, message): + _remote_method_async(FLLogger.log, self.log_rref, self.id, message, time.time()) + + def local_log(self, message): + logging.info(f'[{self.id}: {time.time()}]: {message}') + + # def init_device(self, device: str): + # return torch.device(device) + + def set_configuration(self, config: str): + yaml_config = yaml.safe_load(config) + + def init(self): + pass + + def init_dataloader(self, ): + self.args.distributed = True + self.args.rank = self.rank + self.args.world_size = self.world_size + self.dataset = DistCIFAR10Dataset(self.args) + self.finished_init = True + logging.info('Done with init') + + def is_ready(self): + return self.finished_init + + def set_net(self, net): + self.net = net + self.net.to(self.device) + + def load_model_from_file(self, model_file_path): + model_class = self.args.get_net() + default_model_path = os.path.join(self.args.get_default_model_folder_path(), model_class.__name__ + ".model") + + return self.load_model_from_file(default_model_path) + + def get_nn_parameters(self): + """ + Return the NN's parameters. + """ + return self.net.state_dict() + + def load_default_model(self): + """ + Load a model from default model file. + + This is used to ensure consistent default model behavior. 
+ """ + model_class = self.args.get_net() + default_model_path = os.path.join(self.args.get_default_model_folder_path(), model_class.__name__ + ".model") + + return self.load_model_from_file(default_model_path) + + def load_model_from_file(self, model_file_path): + """ + Load a model from a file. + + :param model_file_path: string + """ + model_class = self.args.get_net() + model = model_class() + + if os.path.exists(model_file_path): + try: + model.load_state_dict(torch.load(model_file_path)) + except: + self.args.get_logger().warning("Couldn't load model. Attempting to map CUDA tensors to CPU to solve error.") + + model.load_state_dict(torch.load(model_file_path, map_location=torch.device('cpu'))) + else: + self.args.get_logger().warning("Could not find model: {}".format(model_file_path)) + + return model + + def get_client_index(self): + """ + Returns the client index. + """ + return self.client_idx + + def get_nn_parameters(self): + """ + Return the NN's parameters. + """ + return self.net.state_dict() + + def update_nn_parameters(self, new_params): + """ + Update the NN's parameters. + + :param new_params: New weights for the neural network + :type new_params: dict + """ + self.net.load_state_dict(copy.deepcopy(new_params), strict=True) + self.remote_log(f'Weigths of the model are updated') + + def load_default_model(self): + """ + Load a model from default model file. + + This is used to ensure consistent default model behavior. + """ + model_class = self.args.get_net() + default_model_path = os.path.join(self.args.get_default_model_folder_path(), model_class.__name__ + ".model") + + return self.load_model_from_file(default_model_path) + + def train(self, epoch): + """ + :param epoch: Current epoch # + :type epoch: int + """ + # self.net.train() + + # save model + if self.args.should_save_model(epoch): + self.save_model(epoch, self.args.get_epoch_save_start_suffix()) + + running_loss = 0.0 + final_running_loss = 0.0 + if self.args.distributed: + self.dataset.train_sampler.set_epoch(epoch) + + max_cycles = 50 + for i, (inputs, labels) in enumerate(self.dataset.get_train_loader(), 0): + inputs, labels = inputs.to(self.device), labels.to(self.device) + + # zero the parameter gradients + self.optimizer.zero_grad() + + # forward + backward + optimize + outputs = self.net(inputs) + loss = self.loss_function(outputs, labels) + loss.backward() + self.optimizer.step() + + # print statistics + running_loss += loss.item() + if i % self.args.get_log_interval() == 0: + self.args.get_logger().info('[%d, %5d] loss: %.3f' % (epoch, i, running_loss / self.args.get_log_interval())) + final_running_loss = running_loss / self.args.get_log_interval() + running_loss = 0.0 + # self.args.get_logger().info('[%d, %5d] loss: %.3f' % (epoch, i, running_loss / self.args.get_log_interval())) + if i >= max_cycles: + break + + self.scheduler.step() + + # save model + if self.args.should_save_model(epoch): + self.save_model(epoch, self.args.get_epoch_save_end_suffix()) + + return final_running_loss, self.get_nn_parameters() + + def test(self): + self.net.eval() + + correct = 0 + total = 0 + targets_ = [] + pred_ = [] + loss = 0.0 + with torch.no_grad(): + for (images, labels) in self.dataset.get_test_loader(): + images, labels = images.to(self.device), labels.to(self.device) + + outputs = self.net(images) + _, predicted = torch.max(outputs.data, 1) + total += labels.size(0) + correct += (predicted == labels).sum().item() + + targets_.extend(labels.cpu().view_as(predicted).numpy()) + 
pred_.extend(predicted.cpu().numpy()) + + loss += self.loss_function(outputs, labels).item() + + accuracy = 100 * correct / total + # print(targets_) + # print(pred_) + confusion_mat = confusion_matrix(targets_, pred_) + + class_precision = self.calculate_class_precision(confusion_mat) + class_recall = self.calculate_class_recall(confusion_mat) + + self.args.get_logger().debug('Test set: Accuracy: {}/{} ({:.0f}%)'.format(correct, total, accuracy)) + self.args.get_logger().debug('Test set: Loss: {}'.format(loss)) + self.args.get_logger().debug("Classification Report:\n" + classification_report(targets_, pred_)) + self.args.get_logger().debug("Confusion Matrix:\n" + str(confusion_mat)) + self.args.get_logger().debug("Class precision: {}".format(str(class_precision))) + self.args.get_logger().debug("Class recall: {}".format(str(class_recall))) + + return accuracy, loss, class_precision, class_recall + + def run_epochs(self, num_epoch): + start_time_train = datetime.datetime.now() + loss = weights = None + for e in range(num_epoch): + loss, weights = self.train(self.epoch_counter) + self.epoch_counter += 1 + elapsed_time_train = datetime.datetime.now() - start_time_train + train_time_ms = int(elapsed_time_train.total_seconds()*1000) + + start_time_test = datetime.datetime.now() + accuracy, test_loss, class_precision, class_recall = self.test() + elapsed_time_test = datetime.datetime.now() - start_time_test + test_time_ms = int(elapsed_time_test.total_seconds()*1000) + + data = EpochData(self.epoch_counter, train_time_ms, test_time_ms, loss, accuracy, test_loss, class_precision, class_recall, client_id=self.id) + self.epoch_results.append(data) + return data, weights + + def save_model(self, epoch, suffix): + """ + Saves the model if necessary. + """ + self.args.get_logger().debug("Saving model to flat file storage. Save #{}", epoch) + + if not os.path.exists(self.args.get_save_model_folder_path()): + os.mkdir(self.args.get_save_model_folder_path()) + + full_save_path = os.path.join(self.args.get_save_model_folder_path(), "model_" + str(self.client_idx) + "_" + str(epoch) + "_" + suffix + ".model") + torch.save(self.get_nn_parameters(), full_save_path) + + def calculate_class_precision(self, confusion_mat): + """ + Calculates the precision for each class from a confusion matrix. + """ + return np.diagonal(confusion_mat) / np.sum(confusion_mat, axis=0) + + def calculate_class_recall(self, confusion_mat): + """ + Calculates the recall for each class from a confusion matrix. 
+ """ + return np.diagonal(confusion_mat) / np.sum(confusion_mat, axis=1) + + def get_client_datasize(self): + return len(self.dataset.get_train_sampler()) + + def __del__(self): + print(f'Client {self.id} is stopping') diff --git a/fltk/datasets/__init__.py b/fltk/datasets/__init__.py new file mode 100644 index 00000000..56ca5b05 --- /dev/null +++ b/fltk/datasets/__init__.py @@ -0,0 +1,4 @@ +from .dataset import Dataset +from .cifar10 import CIFAR10Dataset +from .cifar100 import CIFAR100Dataset +from .fashion_mnist import FashionMNISTDataset diff --git a/fltk/datasets/cifar10.py b/fltk/datasets/cifar10.py new file mode 100644 index 00000000..82e375e4 --- /dev/null +++ b/fltk/datasets/cifar10.py @@ -0,0 +1,51 @@ +from .dataset import Dataset +from torchvision import datasets +from torchvision import transforms +from torch.utils.data import DataLoader, DistributedSampler + + +class CIFAR10Dataset(Dataset): + + def __init__(self, args): + super(CIFAR10Dataset, self).__init__(args) + + def load_train_dataset(self): + self.get_args().get_logger().debug("Loading CIFAR10 train data") + + normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) + transform = transforms.Compose([ + transforms.RandomHorizontalFlip(), + transforms.RandomCrop(32, 4), + transforms.ToTensor(), + normalize + ]) + + train_dataset = datasets.CIFAR10(root=self.get_args().get_data_path(), train=True, download=True, transform=transform) + sampler = DistributedSampler(train_dataset, rank=self.args.get_rank(), num_replicas=self.args.get_world_size()) if self.args.get_distributed() else None + train_loader = DataLoader(train_dataset, batch_size=len(train_dataset), sampler=sampler) + self.args.set_sampler(sampler) + + train_data = self.get_tuple_from_data_loader(train_loader) + + self.get_args().get_logger().debug("Finished loading CIFAR10 train data") + + return train_data + + def load_test_dataset(self): + self.get_args().get_logger().debug("Loading CIFAR10 test data") + + normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) + transform = transforms.Compose([ + transforms.ToTensor(), + normalize + ]) + test_dataset = datasets.CIFAR10(root=self.get_args().get_data_path(), train=False, download=True, transform=transform) + sampler = DistributedSampler(test_dataset, rank=self.args.get_rank(), num_replicas=self.args.get_world_size()) if self.args.get_distributed() else None + test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), sampler=sampler) + self.args.set_sampler(sampler) + + test_data = self.get_tuple_from_data_loader(test_loader) + + self.get_args().get_logger().debug("Finished loading CIFAR10 test data") + + return test_data diff --git a/fltk/datasets/cifar100.py b/fltk/datasets/cifar100.py new file mode 100644 index 00000000..186a98dc --- /dev/null +++ b/fltk/datasets/cifar100.py @@ -0,0 +1,45 @@ +from .dataset import Dataset +from torchvision import datasets +from torchvision import transforms +from torch.utils.data import DataLoader + +class CIFAR100Dataset(Dataset): + + def __init__(self, args): + super(CIFAR100Dataset, self).__init__(args) + + def load_train_dataset(self): + self.get_args().get_logger().debug("Loading CIFAR100 train data") + + normalize = transforms.Normalize(mean=[0.507, 0.487, 0.441], std=[0.267, 0.256, 0.276]) + transform = transforms.Compose([ + transforms.RandomHorizontalFlip(), + transforms.RandomCrop(32, 4), + transforms.ToTensor(), + normalize + ]) + train_dataset = 
datasets.CIFAR100(root=self.get_args().get_data_path(), train=True, download=True, transform=transform) + train_loader = DataLoader(train_dataset, batch_size=len(train_dataset)) + + train_data = self.get_tuple_from_data_loader(train_loader) + + self.get_args().get_logger().debug("Finished loading CIFAR100 train data") + + return train_data + + def load_test_dataset(self): + self.get_args().get_logger().debug("Loading CIFAR100 test data") + + normalize = transforms.Normalize(mean=[0.507, 0.487, 0.441], std=[0.267, 0.256, 0.276]) + transform = transforms.Compose([ + transforms.ToTensor(), + normalize + ]) + test_dataset = datasets.CIFAR100(root=self.get_args().get_data_path(), train=False, download=True, transform=transform) + test_loader = DataLoader(test_dataset, batch_size=len(test_dataset)) + + test_data = self.get_tuple_from_data_loader(test_loader) + + self.get_args().get_logger().debug("Finished loading CIFAR100 test data") + + return test_data diff --git a/fltk/datasets/data_distribution/__init__.py b/fltk/datasets/data_distribution/__init__.py new file mode 100644 index 00000000..ab7c6369 --- /dev/null +++ b/fltk/datasets/data_distribution/__init__.py @@ -0,0 +1 @@ +from .iid_equal import distribute_batches_equally diff --git a/fltk/datasets/data_distribution/iid_equal.py b/fltk/datasets/data_distribution/iid_equal.py new file mode 100644 index 00000000..c47bcc16 --- /dev/null +++ b/fltk/datasets/data_distribution/iid_equal.py @@ -0,0 +1,19 @@ +import torch + +def distribute_batches_equally(train_data_loader, num_workers): + """ + Gives each worker the same number of batches of training data. + + :param train_data_loader: Training data loader + :type train_data_loader: torch.utils.data.DataLoader + :param num_workers: number of workers + :type num_workers: int + """ + distributed_dataset = [[] for i in range(num_workers)] + + for batch_idx, (data, target) in enumerate(train_data_loader): + worker_idx = batch_idx % num_workers + + distributed_dataset[worker_idx].append((data, target)) + + return distributed_dataset diff --git a/fltk/datasets/dataset.py b/fltk/datasets/dataset.py new file mode 100644 index 00000000..17e6a4c6 --- /dev/null +++ b/fltk/datasets/dataset.py @@ -0,0 +1,113 @@ +from abc import abstractmethod +from torch.utils.data import DataLoader +from torch.utils.data import TensorDataset +import torch +import numpy + +from fltk.util.arguments import Arguments + + +class Dataset: + + def __init__(self, args: Arguments): + self.args = args + self.train_dataset = self.load_train_dataset() + self.test_dataset = self.load_test_dataset() + + def get_args(self): + """ + Returns the arguments. + + :return: Arguments + """ + return self.args + + def get_train_dataset(self): + """ + Returns the train dataset. + + :return: tuple + """ + return self.train_dataset + + def get_test_dataset(self): + """ + Returns the test dataset. + + :return: tuple + """ + return self.test_dataset + + @abstractmethod + def load_train_dataset(self): + """ + Loads & returns the training dataset. + + :return: tuple + """ + raise NotImplementedError("load_train_dataset() isn't implemented") + + @abstractmethod + def load_test_dataset(self): + """ + Loads & returns the test dataset. + + :return: tuple + """ + raise NotImplementedError("load_test_dataset() isn't implemented") + + def get_train_loader(self, batch_size, **kwargs): + """ + Return the data loader for the train dataset. 
+ + :param batch_size: batch size of data loader + :type batch_size: int + :return: torch.utils.data.DataLoader + """ + return Dataset.get_data_loader_from_data(batch_size, self.train_dataset[0], self.train_dataset[1], **kwargs) + + def get_test_loader(self, batch_size, **kwargs): + """ + Return the data loader for the test dataset. + + :param batch_size: batch size of data loader + :type batch_size: int + :return: torch.utils.data.DataLoader + """ + return Dataset.get_data_loader_from_data(batch_size, self.test_dataset[0], self.test_dataset[1], **kwargs) + + @staticmethod + def get_data_loader_from_data(batch_size, X, Y, **kwargs): + """ + Get a data loader created from a given set of data. + + :param batch_size: batch size of data loader + :type batch_size: int + :param X: data features + :type X: numpy.Array() + :param Y: data labels + :type Y: numpy.Array() + :return: torch.utils.data.DataLoader + """ + X_torch = torch.from_numpy(X).float() + + if "classification_problem" in kwargs and kwargs["classification_problem"] == False: + Y_torch = torch.from_numpy(Y).float() + else: + Y_torch = torch.from_numpy(Y).long() + dataset = TensorDataset(X_torch, Y_torch) + + kwargs.pop("classification_problem", None) + + return DataLoader(dataset, batch_size=batch_size, **kwargs) + + @staticmethod + def get_tuple_from_data_loader(data_loader): + """ + Get a tuple representation of the data stored in a data loader. + + :param data_loader: data loader to get data from + :type data_loader: torch.utils.data.DataLoader + :return: tuple + """ + return (next(iter(data_loader))[0].numpy(), next(iter(data_loader))[1].numpy()) diff --git a/fltk/datasets/distributed/__init__.py b/fltk/datasets/distributed/__init__.py new file mode 100644 index 00000000..b6c5e3bf --- /dev/null +++ b/fltk/datasets/distributed/__init__.py @@ -0,0 +1,4 @@ +from .dataset import DistDataset +from .cifar10 import DistCIFAR10Dataset +# from .cifar100 import CIFAR100Dataset +# from .fashion_mnist import FashionMNISTDataset diff --git a/fltk/datasets/distributed/cifar10.py b/fltk/datasets/distributed/cifar10.py new file mode 100644 index 00000000..cda04807 --- /dev/null +++ b/fltk/datasets/distributed/cifar10.py @@ -0,0 +1,87 @@ +from torchvision import datasets +from torchvision import transforms +from torch.utils.data import DataLoader, DistributedSampler + +from fltk.datasets.distributed.dataset import DistDataset + + +class DistCIFAR10Dataset(DistDataset): + + def __init__(self, args): + super(DistCIFAR10Dataset, self).__init__(args) + self.init_train_dataset() + self.init_test_dataset() + + + def init_train_dataset(self): + dist_loader_text = "distributed" if self.args.get_distributed() else "" + self.get_args().get_logger().debug(f"Loading '{dist_loader_text}' CIFAR10 train data") + normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) + transform = transforms.Compose([ + transforms.RandomHorizontalFlip(), + transforms.RandomCrop(32, 4), + transforms.ToTensor(), + normalize + ]) + self.train_dataset = datasets.CIFAR10(root=self.get_args().get_data_path(), train=True, download=True, + transform=transform) + self.train_sampler = DistributedSampler(self.train_dataset, rank=self.args.get_rank(), + num_replicas=self.args.get_world_size()) if self.args.get_distributed() else None + self.train_loader = DataLoader(self.train_dataset, batch_size=16, sampler=self.train_sampler) + # self.train_loader = DataLoader(self.train_dataset, batch_size=len(self.train_dataset), sampler=self.train_sampler) + + def 
init_test_dataset(self): + self.get_args().get_logger().debug("Loading CIFAR10 test data") + + normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) + transform = transforms.Compose([ + transforms.ToTensor(), + normalize + ]) + self.test_dataset = datasets.CIFAR10(root=self.get_args().get_data_path(), train=False, download=True, + transform=transform) + self.test_sampler = DistributedSampler(self.test_dataset, rank=self.args.get_rank(), + num_replicas=self.args.get_world_size()) if self.args.get_distributed() else None + # self.test_sampler = None + self.test_loader = DataLoader(self.test_dataset, batch_size=16, sampler=self.test_sampler) + + def load_train_dataset(self): + self.get_args().get_logger().debug("Loading CIFAR10 train data") + + normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) + transform = transforms.Compose([ + transforms.RandomHorizontalFlip(), + transforms.RandomCrop(32, 4), + transforms.ToTensor(), + normalize + ]) + + train_dataset = datasets.CIFAR10(root=self.get_args().get_data_path(), train=True, download=True, transform=transform) + sampler = DistributedSampler(train_dataset, rank=self.args.get_rank(), num_replicas=self.args.get_world_size()) if self.args.get_distributed() else None + train_loader = DataLoader(train_dataset, batch_size=len(train_dataset), sampler=sampler) + self.args.set_sampler(sampler) + + train_data = self.get_tuple_from_data_loader(train_loader) + dist_loader_text = "distributed" if self.args.get_distributed() else "" + self.get_args().get_logger().debug(f"Finished loading '{dist_loader_text}' CIFAR10 train data") + + return train_data + + def load_test_dataset(self): + self.get_args().get_logger().debug("Loading CIFAR10 test data") + + normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) + transform = transforms.Compose([ + transforms.ToTensor(), + normalize + ]) + test_dataset = datasets.CIFAR10(root=self.get_args().get_data_path(), train=False, download=True, transform=transform) + sampler = DistributedSampler(test_dataset, rank=self.args.get_rank(), num_replicas=self.args.get_world_size()) if self.args.get_distributed() else None + test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), sampler=sampler) + self.args.set_sampler(sampler) + + test_data = self.get_tuple_from_data_loader(test_loader) + + self.get_args().get_logger().debug("Finished loading CIFAR10 test data") + + return test_data diff --git a/fltk/datasets/distributed/cifar100.py b/fltk/datasets/distributed/cifar100.py new file mode 100644 index 00000000..186a98dc --- /dev/null +++ b/fltk/datasets/distributed/cifar100.py @@ -0,0 +1,45 @@ +from .dataset import Dataset +from torchvision import datasets +from torchvision import transforms +from torch.utils.data import DataLoader + +class CIFAR100Dataset(Dataset): + + def __init__(self, args): + super(CIFAR100Dataset, self).__init__(args) + + def load_train_dataset(self): + self.get_args().get_logger().debug("Loading CIFAR100 train data") + + normalize = transforms.Normalize(mean=[0.507, 0.487, 0.441], std=[0.267, 0.256, 0.276]) + transform = transforms.Compose([ + transforms.RandomHorizontalFlip(), + transforms.RandomCrop(32, 4), + transforms.ToTensor(), + normalize + ]) + train_dataset = datasets.CIFAR100(root=self.get_args().get_data_path(), train=True, download=True, transform=transform) + train_loader = DataLoader(train_dataset, batch_size=len(train_dataset)) + + train_data = 
self.get_tuple_from_data_loader(train_loader) + + self.get_args().get_logger().debug("Finished loading CIFAR100 train data") + + return train_data + + def load_test_dataset(self): + self.get_args().get_logger().debug("Loading CIFAR100 test data") + + normalize = transforms.Normalize(mean=[0.507, 0.487, 0.441], std=[0.267, 0.256, 0.276]) + transform = transforms.Compose([ + transforms.ToTensor(), + normalize + ]) + test_dataset = datasets.CIFAR100(root=self.get_args().get_data_path(), train=False, download=True, transform=transform) + test_loader = DataLoader(test_dataset, batch_size=len(test_dataset)) + + test_data = self.get_tuple_from_data_loader(test_loader) + + self.get_args().get_logger().debug("Finished loading CIFAR100 test data") + + return test_data diff --git a/fltk/datasets/distributed/dataset.py b/fltk/datasets/distributed/dataset.py new file mode 100644 index 00000000..46458de1 --- /dev/null +++ b/fltk/datasets/distributed/dataset.py @@ -0,0 +1,139 @@ +from abc import abstractmethod +from torch.utils.data import DataLoader +from torch.utils.data import TensorDataset +import torch +import numpy + +from fltk.util.arguments import Arguments + + +class DistDataset: + + train_sampler = None + test_sampler = None + train_dataset = None + test_dataset = None + train_loader = None + test_loader = None + def __init__(self, args: Arguments): + self.args = args + # self.train_dataset = self.load_train_dataset() + # self.test_dataset = self.load_test_dataset() + + def get_args(self): + """ + Returns the arguments. + + :return: Arguments + """ + return self.args + + # def get_train_dataset(self): + # """ + # Returns the train dataset. + # + # :return: tuple + # """ + # return self.train_dataset + # + # def get_test_dataset(self): + # """ + # Returns the test dataset. + # + # :return: tuple + # """ + # return self.test_dataset + + def get_train_loader(self): + return self.train_loader + + def get_test_loader(self): + return self.test_loader + + def get_train_sampler(self): + return self.train_sampler + + def get_test_sampler(self): + return self.test_sampler + + @abstractmethod + def init_train_dataset(self): + raise NotImplementedError("load_train_dataset() isn't implemented") + + @abstractmethod + def init_test_dataset(self): + raise NotImplementedError("load_train_dataset() isn't implemented") + + # @abstractmethod + # def load_train_dataset(self): + # """ + # Loads & returns the training dataset. + # + # :return: tuple + # """ + # raise NotImplementedError("load_train_dataset() isn't implemented") + # + # @abstractmethod + # def load_test_dataset(self): + # """ + # Loads & returns the test dataset. + # + # :return: tuple + # """ + # raise NotImplementedError("load_test_dataset() isn't implemented") + + # def get_train_loader(self, batch_size, **kwargs): + # """ + # Return the data loader for the train dataset. + # + # :param batch_size: batch size of data loader + # :type batch_size: int + # :return: torch.utils.data.DataLoader + # """ + # return Dataset.get_data_loader_from_data(batch_size, self.train_dataset[0], self.train_dataset[1], **kwargs) + # + # def get_test_loader(self, batch_size, **kwargs): + # """ + # Return the data loader for the test dataset. 
+ # + # :param batch_size: batch size of data loader + # :type batch_size: int + # :return: torch.utils.data.DataLoader + # """ + # return Dataset.get_data_loader_from_data(batch_size, self.test_dataset[0], self.test_dataset[1], **kwargs) + # + # @staticmethod + # def get_data_loader_from_data(batch_size, X, Y, **kwargs): + # """ + # Get a data loader created from a given set of data. + # + # :param batch_size: batch size of data loader + # :type batch_size: int + # :param X: data features + # :type X: numpy.Array() + # :param Y: data labels + # :type Y: numpy.Array() + # :return: torch.utils.data.DataLoader + # """ + # X_torch = torch.from_numpy(X).float() + # + # if "classification_problem" in kwargs and kwargs["classification_problem"] == False: + # Y_torch = torch.from_numpy(Y).float() + # else: + # Y_torch = torch.from_numpy(Y).long() + # dataset = TensorDataset(X_torch, Y_torch) + # + # kwargs.pop("classification_problem", None) + # + # return DataLoader(dataset, batch_size=batch_size, **kwargs) + # + # @staticmethod + # def get_tuple_from_data_loader(data_loader): + # """ + # Get a tuple representation of the data stored in a data loader. + # + # :param data_loader: data loader to get data from + # :type data_loader: torch.utils.data.DataLoader + # :return: tuple + # """ + # return (next(iter(data_loader))[0].numpy(), next(iter(data_loader))[1].numpy()) diff --git a/fltk/datasets/distributed/fashion_mnist.py b/fltk/datasets/distributed/fashion_mnist.py new file mode 100644 index 00000000..0f851cfa --- /dev/null +++ b/fltk/datasets/distributed/fashion_mnist.py @@ -0,0 +1,33 @@ +from .dataset import Dataset +from torchvision import datasets +from torchvision import transforms +from torch.utils.data import DataLoader + +class FashionMNISTDataset(Dataset): + + def __init__(self, args): + super(FashionMNISTDataset, self).__init__(args) + + def load_train_dataset(self): + self.get_args().get_logger().debug("Loading Fashion MNIST train data") + + train_dataset = datasets.FashionMNIST(self.get_args().get_data_path(), train=True, download=True, transform=transforms.Compose([transforms.ToTensor()])) + train_loader = DataLoader(train_dataset, batch_size=len(train_dataset)) + + train_data = self.get_tuple_from_data_loader(train_loader) + + self.get_args().get_logger().debug("Finished loading Fashion MNIST train data") + + return train_data + + def load_test_dataset(self): + self.get_args().get_logger().debug("Loading Fashion MNIST test data") + + test_dataset = datasets.FashionMNIST(self.get_args().get_data_path(), train=False, download=True, transform=transforms.Compose([transforms.ToTensor()])) + test_loader = DataLoader(test_dataset, batch_size=len(test_dataset)) + + test_data = self.get_tuple_from_data_loader(test_loader) + + self.get_args().get_logger().debug("Finished loading Fashion MNIST test data") + + return test_data diff --git a/fltk/datasets/fashion_mnist.py b/fltk/datasets/fashion_mnist.py new file mode 100644 index 00000000..0f851cfa --- /dev/null +++ b/fltk/datasets/fashion_mnist.py @@ -0,0 +1,33 @@ +from .dataset import Dataset +from torchvision import datasets +from torchvision import transforms +from torch.utils.data import DataLoader + +class FashionMNISTDataset(Dataset): + + def __init__(self, args): + super(FashionMNISTDataset, self).__init__(args) + + def load_train_dataset(self): + self.get_args().get_logger().debug("Loading Fashion MNIST train data") + + train_dataset = datasets.FashionMNIST(self.get_args().get_data_path(), train=True, download=True, 
transform=transforms.Compose([transforms.ToTensor()])) + train_loader = DataLoader(train_dataset, batch_size=len(train_dataset)) + + train_data = self.get_tuple_from_data_loader(train_loader) + + self.get_args().get_logger().debug("Finished loading Fashion MNIST train data") + + return train_data + + def load_test_dataset(self): + self.get_args().get_logger().debug("Loading Fashion MNIST test data") + + test_dataset = datasets.FashionMNIST(self.get_args().get_data_path(), train=False, download=True, transform=transforms.Compose([transforms.ToTensor()])) + test_loader = DataLoader(test_dataset, batch_size=len(test_dataset)) + + test_data = self.get_tuple_from_data_loader(test_loader) + + self.get_args().get_logger().debug("Finished loading Fashion MNIST test data") + + return test_data diff --git a/fltk/federator.py b/fltk/federator.py new file mode 100644 index 00000000..bdfe0055 --- /dev/null +++ b/fltk/federator.py @@ -0,0 +1,270 @@ +import datetime +import time +from typing import List + +from dataclass_csv import DataclassWriter +from torch.distributed import rpc + +from fltk.client import Client +from fltk.datasets.data_distribution import distribute_batches_equally +from fltk.strategy.client_selection import random_selection +from fltk.util.arguments import Arguments +from fltk.util.base_config import BareConfig +from fltk.util.data_loader_utils import load_train_data_loader, load_test_data_loader, \ + generate_data_loaders_from_distributed_dataset +from fltk.util.fed_avg import average_nn_parameters +from fltk.util.log import FLLogger +from torchsummary import summary +from torch.utils.tensorboard import SummaryWriter +from pathlib import Path +import logging + +from fltk.util.results import EpochData +from fltk.util.tensor_converter import convert_distributed_data_into_numpy + +logging.basicConfig(level=logging.DEBUG) + +def _call_method(method, rref, *args, **kwargs): + return method(rref.local_value(), *args, **kwargs) + + +def _remote_method(method, rref, *args, **kwargs): + args = [method, rref] + list(args) + return rpc.rpc_sync(rref.owner(), _call_method, args=args, kwargs=kwargs) + +def _remote_method_async(method, rref, *args, **kwargs): + args = [method, rref] + list(args) + return rpc.rpc_async(rref.owner(), _call_method, args=args, kwargs=kwargs) + +class ClientRef: + ref = None + name = "" + data_size = 0 + tb_writer = None + + def __init__(self, name, ref, tensorboard_writer): + self.name = name + self.ref = ref + self.tb_writer = tensorboard_writer + + def __repr__(self): + return self.name + +class Federator: + """ + Central component of the Federated Learning System: The Federator + + The Federator is in charge of the following tasks: + - Have a copy of the global model + - Client selection + - Aggregating the client model weights/gradients + - Saving all the metrics + - Use tensorboard to report metrics + - Keep track of timing + + """ + clients: List[ClientRef] = [] + epoch_counter = 0 + client_data = {} + + def __init__(self, client_id_triple, num_epochs = 3, config=None): + log_rref = rpc.RRef(FLLogger()) + self.log_rref = log_rref + self.num_epoch = num_epochs + self.config = config + self.tb_path = config.output_location + self.ensure_path_exists(self.tb_path) + self.tb_writer = SummaryWriter(f'{self.tb_path}/{config.experiment_prefix}_federator') + self.create_clients(client_id_triple) + self.config.init_logger(logging) + + def create_clients(self, client_id_triple): + # bare_config = BareConfig(self.config.logger) + for id, rank, world_size in 
client_id_triple: + client = rpc.remote(id, Client, kwargs=dict(id=id, log_rref=self.log_rref, rank=rank, world_size=world_size, config=self.config)) + writer = SummaryWriter(f'{self.tb_path}/{self.config.experiment_prefix}_client_{id}') + self.clients.append(ClientRef(id, client, tensorboard_writer=writer)) + self.client_data[id] = [] + + def select_clients(self, n = 2): + return random_selection(self.clients, n) + + def ping_all(self): + for client in self.clients: + logging.info(f'Sending ping to {client}') + t_start = time.time() + answer = _remote_method(Client.ping, client.ref) + t_end = time.time() + duration = (t_end - t_start)*1000 + logging.info(f'Ping to {client} is {duration:.3}ms') + + def rpc_test_all(self): + for client in self.clients: + res = _remote_method_async(Client.rpc_test, client.ref) + while not res.done(): + pass + + def client_load_data(self): + for client in self.clients: + _remote_method_async(Client.init_dataloader, client.ref) + + def clients_ready(self): + all_ready = False + ready_clients = [] + while not all_ready: + responses = [] + for client in self.clients: + if client.name not in ready_clients: + responses.append((client, _remote_method_async(Client.is_ready, client.ref))) + all_ready = True + for res in responses: + result = res[1].wait() + if result: + logging.info(f'{res[0]} is ready') + ready_clients.append(res[0]) + else: + logging.info(f'Waiting for {res[0]}') + all_ready = False + + time.sleep(2) + logging.info('All clients are ready') + + def remote_run_epoch(self, epochs): + responses = [] + client_weights = [] + selected_clients = self.select_clients(self.config.clients_per_round) + for client in selected_clients: + responses.append((client, _remote_method_async(Client.run_epochs, client.ref, num_epoch=epochs))) + self.epoch_counter += epochs + for res in responses: + epoch_data, weights = res[1].wait() + self.client_data[epoch_data.client_id].append(epoch_data) + logging.info(f'{res[0]} had a loss of {epoch_data.loss}') + logging.info(f'{res[0]} had a epoch data of {epoch_data}') + # res[0].tb_writer.add_scalar('training loss', + # epoch_data.loss_train / 1000, # for every 1000 minibatches + # epoch * len(trainloader) + i) + + res[0].tb_writer.add_scalar('training loss', + epoch_data.loss_train, # for every 1000 minibatches + self.epoch_counter * res[0].data_size) + + client_weights.append(weights) + updated_model = average_nn_parameters(client_weights) + + responses = [] + for client in self.clients: + responses.append( + (client, _remote_method_async(Client.update_nn_parameters, client.ref, new_params=updated_model))) + + for res in responses: + res[1].wait() + logging.info('Weights are updated') + + # def remote_train_sync(self, epoch): + # # logging.info('Starting epoch training') + # responses = [] + # # for client in self.clients: + # # _remote_method(Client.train, client.ref, epoch=epoch) + # + # client_weights = [] + # selected_clients = self.select_clients() + # for client in selected_clients: + # responses.append((client, _remote_method_async(Client.train, client.ref, epoch=epoch))) + # + # for res in responses: + # # logging.info(f'Waiting for {res[0]}') + # loss, weights = res[1].wait() + # logging.info(f'{res[0]} had a loss of {loss}') + # client_weights.append(weights) + # # logging.info('Done with one epoch') + # updated_model = average_nn_parameters(client_weights) + # + # responses = [] + # for client in self.clients: + # responses.append((client, _remote_method_async(Client.update_nn_parameters, client.ref, 
new_params=updated_model))) + # + # for res in responses: + # res[1].wait() + # logging.info('Weights are updated') + + def update_client_data_sizes(self): + # logging.info('Starting epoch testing') + responses = [] + # for client in self.clients: + # _remote_method(Client.train, client.ref, epoch=epoch) + + for client in self.clients: + responses.append((client, _remote_method_async(Client.get_client_datasize, client.ref))) + + for res in responses: + # logging.info(f'Waiting for {res[0]}') + res[0].data_size = res[1].wait() + # logging.info(f'{res[0]} had a result of accuracy={accuracy}, loss={loss}, class_precision={class_precision}, class_recall={class_recall}') + logging.info(f'{res[0]} had a result of datasize={res[0].data_size}') + # logging.info('Done with one epoch') + + def remote_test_sync(self): + # logging.info('Starting epoch testing') + responses = [] + # for client in self.clients: + # _remote_method(Client.train, client.ref, epoch=epoch) + + for client in self.clients: + responses.append((client, _remote_method_async(Client.test, client.ref))) + + for res in responses: + # logging.info(f'Waiting for {res[0]}') + accuracy, loss, class_precision, class_recall = res[1].wait() + # logging.info(f'{res[0]} had a result of accuracy={accuracy}, loss={loss}, class_precision={class_precision}, class_recall={class_recall}') + logging.info(f'{res[0]} had a result of accuracy={accuracy}') + # logging.info('Done with one epoch') + + def save_epoch_data(self): + file_output = f'./{self.config.output_location}' + self.ensure_path_exists(file_output) + for key in self.client_data: + filename = f'{file_output}/{key}_epochs.csv' + logging.info(f'Saving data at {filename}') + with open(filename, "w") as f: + w = DataclassWriter(f, self.client_data[key], EpochData) + w.write() + + def ensure_path_exists(self, path): + Path(path).mkdir(parents=True, exist_ok=True) + + def run(self): + """ + Main loop of the Federator + :return: + """ + + # Init clients + # # Make sure the clients have loaded all the data + self.client_load_data() + self.ping_all() + self.clients_ready() + self.update_client_data_sizes() + + epoch_to_run = self.num_epoch + addition = 0 + epoch_to_run = self.config.epochs + epoch_size = self.config.epochs_per_cycle + for epoch in range(epoch_to_run): + # print(f'Running epoch {epoch}') + # # Run all epoch + # for epoch in range(1, 20): + print(f'Running epoch {epoch}') + self.remote_run_epoch(epoch_size) + addition += 1 + + # self.remote_train_sync(epoch) + # self.remote_test_sync() + # rpc.shutdown() + logging.info('Printing client data') + print(self.client_data) + + logging.info(f'Saving data') + self.save_epoch_data() + logging.info(f'Federator is stopping') + diff --git a/fltk/launch.py b/fltk/launch.py new file mode 100644 index 00000000..5aaa5a43 --- /dev/null +++ b/fltk/launch.py @@ -0,0 +1,68 @@ +import os +import sys +import torch.distributed.rpc as rpc +import logging + +import yaml +import argparse + +import torch.multiprocessing as mp +from fltk.federator import Federator +from fltk.util.base_config import BareConfig + +logging.basicConfig(level=logging.DEBUG) + + +def run_ps(rpc_ids_triple, args): + print(f'Starting the federator...') + fed = Federator(rpc_ids_triple, config=args) + fed.run() + +def run_single(rank, world_size, host = None, args = None): + logging.info(f'Starting with rank={rank} and world size={world_size}') + if host: + os.environ['MASTER_ADDR'] = host + else: + os.environ['MASTER_ADDR'] = '0.0.0.0' + os.environ['MASTER_PORT'] = '5000' + 
os.environ['GLOO_SOCKET_IFNAME'] = 'wlo1' + os.environ['TP_SOCKET_IFNAME'] = 'wlo1' + logging.info(f'Starting with host={os.environ["MASTER_ADDR"]} and port={os.environ["MASTER_PORT"]}') + options = rpc.TensorPipeRpcBackendOptions( + num_worker_threads=16, + rpc_timeout=0, # infinite timeout + init_method=f'tcp://{os.environ["MASTER_ADDR"]}:{os.environ["MASTER_PORT"]}' + ) + + if rank != 0: + logging.info(f'Starting worker {rank}') + rpc.init_rpc( + f"client{rank}", + rank=rank, + world_size=world_size, + rpc_backend_options=options, + ) + # trainer passively waiting for ps to kick off training iterations + else: + logging.info('Starting the ps') + rpc.init_rpc( + "ps", + rank=rank, + world_size=world_size, + rpc_backend_options=options + + ) + run_ps([(f"client{r}", r, world_size) for r in range(1, world_size)], args) + # block until all rpc finish + rpc.shutdown() + + +def run_spawn(config): + world_size = config.world_size + master_address = config.federator_host + mp.spawn( + run_single, + args=(world_size, master_address, config), + nprocs=world_size, + join=True + ) \ No newline at end of file diff --git a/fltk/nets/__init__.py b/fltk/nets/__init__.py new file mode 100644 index 00000000..432dbca9 --- /dev/null +++ b/fltk/nets/__init__.py @@ -0,0 +1,6 @@ +from .cifar_10_cnn import Cifar10CNN +from .cifar_100_resnet import Cifar100ResNet +from .fashion_mnist_cnn import FashionMNISTCNN +from .fashion_mnist_resnet import FashionMNISTResNet +from .cifar_10_resnet import Cifar10ResNet +from .cifar_100_vgg import Cifar100VGG \ No newline at end of file diff --git a/fltk/nets/cifar_100_resnet.py b/fltk/nets/cifar_100_resnet.py new file mode 100644 index 00000000..4651fe25 --- /dev/null +++ b/fltk/nets/cifar_100_resnet.py @@ -0,0 +1,146 @@ +import torch.nn as nn +import torch.nn.functional as F + +class BasicBlock(nn.Module): + """Basic Block for resnet 18 and resnet 34 + """ + + #BasicBlock and BottleNeck block + #have different output size + #we use class attribute expansion + #to distinct + expansion = 1 + + def __init__(self, in_channels, out_channels, stride=1): + super().__init__() + + #residual function + self.residual_function = nn.Sequential( + nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False), + nn.BatchNorm2d(out_channels), + nn.ReLU(inplace=True), + nn.Conv2d(out_channels, out_channels * BasicBlock.expansion, kernel_size=3, padding=1, bias=False), + nn.BatchNorm2d(out_channels * BasicBlock.expansion) + ) + + #shortcut + self.shortcut = nn.Sequential() + + #the shortcut output dimension is not the same with residual function + #use 1*1 convolution to match the dimension + if stride != 1 or in_channels != BasicBlock.expansion * out_channels: + self.shortcut = nn.Sequential( + nn.Conv2d(in_channels, out_channels * BasicBlock.expansion, kernel_size=1, stride=stride, bias=False), + nn.BatchNorm2d(out_channels * BasicBlock.expansion) + ) + + def forward(self, x): + return nn.ReLU(inplace=True)(self.residual_function(x) + self.shortcut(x)) + +class BottleNeck(nn.Module): + """Residual block for resnet over 50 layers + """ + expansion = 4 + def __init__(self, in_channels, out_channels, stride=1): + super().__init__() + self.residual_function = nn.Sequential( + nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False), + nn.BatchNorm2d(out_channels), + nn.ReLU(inplace=True), + nn.Conv2d(out_channels, out_channels, stride=stride, kernel_size=3, padding=1, bias=False), + nn.BatchNorm2d(out_channels), + nn.ReLU(inplace=True), + 
nn.Conv2d(out_channels, out_channels * BottleNeck.expansion, kernel_size=1, bias=False), + nn.BatchNorm2d(out_channels * BottleNeck.expansion), + ) + + self.shortcut = nn.Sequential() + + if stride != 1 or in_channels != out_channels * BottleNeck.expansion: + self.shortcut = nn.Sequential( + nn.Conv2d(in_channels, out_channels * BottleNeck.expansion, stride=stride, kernel_size=1, bias=False), + nn.BatchNorm2d(out_channels * BottleNeck.expansion) + ) + + def forward(self, x): + return nn.ReLU(inplace=True)(self.residual_function(x) + self.shortcut(x)) + +class Cifar100ResNet(nn.Module): + + def __init__(self, block = BasicBlock, num_block =[2, 2, 2, 2], num_classes=100): + super(Cifar100ResNet, self).__init__() + + self.in_channels = 64 + + self.conv1 = nn.Sequential( + nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False), + nn.BatchNorm2d(64), + nn.ReLU(inplace=True)) + #we use a different inputsize than the original paper + #so conv2_x's stride is 1 + self.conv2_x = self._make_layer(block, 64, num_block[0], 1) + self.conv3_x = self._make_layer(block, 128, num_block[1], 2) + self.conv4_x = self._make_layer(block, 256, num_block[2], 2) + self.conv5_x = self._make_layer(block, 512, num_block[3], 2) + self.avg_pool = nn.AdaptiveAvgPool2d((1, 1)) + self.fc = nn.Linear(512 * block.expansion, num_classes) + + def _make_layer(self, block, out_channels, num_blocks, stride): + """make resnet layers(by layer i didnt mean this 'layer' was the + same as a neuron netowork layer, ex. conv layer), one layer may + contain more than one residual block + Args: + block: block type, basic block or bottle neck block + out_channels: output depth channel number of this layer + num_blocks: how many blocks per layer + stride: the stride of the first block of this layer + Return: + return a resnet layer + """ + + # we have num_block blocks per layer, the first block + # could be 1 or 2, other blocks would always be 1 + strides = [stride] + [1] * (num_blocks - 1) + layers = [] + for stride in strides: + layers.append(block(self.in_channels, out_channels, stride)) + self.in_channels = out_channels * block.expansion + + return nn.Sequential(*layers) + + def forward(self, x): + output = self.conv1(x) + output = self.conv2_x(output) + output = self.conv3_x(output) + output = self.conv4_x(output) + output = self.conv5_x(output) + output = self.avg_pool(output) + output = output.view(output.size(0), -1) + output = self.fc(output) + + return output + +def resnet18(): + """ return a ResNet 18 object + """ + return Cifar100ResNet(BasicBlock, [2, 2, 2, 2]) + +def resnet34(): + """ return a ResNet 34 object + """ + return Cifar100ResNet(BasicBlock, [3, 4, 6, 3]) + +def resnet50(): + """ return a ResNet 50 object + """ + return Cifar100ResNet(BottleNeck, [3, 4, 6, 3]) + +def resnet101(): + """ return a ResNet 101 object + """ + return Cifar100ResNet(BottleNeck, [3, 4, 23, 3]) + +def resnet152(): + """ return a ResNet 152 object + """ + return Cifar100ResNet(BottleNeck, [3, 8, 36, 3]) diff --git a/fltk/nets/cifar_100_vgg.py b/fltk/nets/cifar_100_vgg.py new file mode 100644 index 00000000..112b969a --- /dev/null +++ b/fltk/nets/cifar_100_vgg.py @@ -0,0 +1,65 @@ +import torch +import torch.nn as nn + +cfg = { + 'A' : [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'], + 'B' : [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'], + 'D' : [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M'], + 'E' : [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 
512, 512, 512, 'M', 512, 512, 512, 512, 'M'] +} + + +def make_layers(cfg, batch_norm=False): + layers = [] + + input_channel = 3 + for l in cfg: + if l == 'M': + layers += [nn.MaxPool2d(kernel_size=2, stride=2)] + continue + + layers += [nn.Conv2d(input_channel, l, kernel_size=3, padding=1)] + + if batch_norm: + layers += [nn.BatchNorm2d(l)] + + layers += [nn.ReLU(inplace=True)] + input_channel = l + + return nn.Sequential(*layers) + +class Cifar100VGG(nn.Module): + + def __init__(self, features = make_layers(cfg['D'], batch_norm=True), num_class=100): + super(Cifar100VGG, self).__init__() + self.features = features + + self.classifier = nn.Sequential( + nn.Linear(512, 4096), + nn.ReLU(inplace=True), + nn.Dropout(), + nn.Linear(4096, 4096), + nn.ReLU(inplace=True), + nn.Dropout(), + nn.Linear(4096, num_class) + ) + + def forward(self, x): + output = self.features(x) + output = output.view(output.size()[0], -1) + output = self.classifier(output) + + return output + + +def vgg11_bn(): + return Cifar100VGG(make_layers(cfg['A'], batch_norm=True)) + +def vgg13_bn(): + return Cifar100VGG(make_layers(cfg['B'], batch_norm=True)) + +def vgg16_bn(): + return Cifar100VGG(make_layers(cfg['D'], batch_norm=True)) + +def vgg19_bn(): + return Cifar100VGG(make_layers(cfg['E'], batch_norm=True)) \ No newline at end of file diff --git a/fltk/nets/cifar_10_cnn.py b/fltk/nets/cifar_10_cnn.py new file mode 100644 index 00000000..bf4c0b2e --- /dev/null +++ b/fltk/nets/cifar_10_cnn.py @@ -0,0 +1,49 @@ +import torch +import torch.nn as nn +import torch.nn.functional as F + +class Cifar10CNN(nn.Module): + + def __init__(self): + super(Cifar10CNN, self).__init__() + + self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1) + self.bn1 = nn.BatchNorm2d(32) + self.conv2 = nn.Conv2d(32, 32, kernel_size=3, padding=1) + self.bn2 = nn.BatchNorm2d(32) + self.pool1 = nn.MaxPool2d(kernel_size=2) + + self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1) + self.bn3 = nn.BatchNorm2d(64) + self.conv4 = nn.Conv2d(64, 64, kernel_size=3, padding=1) + self.bn4 = nn.BatchNorm2d(64) + self.pool2 = nn.MaxPool2d(kernel_size=2) + + self.conv5 = nn.Conv2d(64, 128, kernel_size=3, padding=1) + self.bn5 = nn.BatchNorm2d(128) + self.conv6 = nn.Conv2d(128, 128, kernel_size=3, padding=1) + self.bn6 = nn.BatchNorm2d(128) + self.pool3 = nn.MaxPool2d(kernel_size=2) + + self.fc1 = nn.Linear(128 * 4 * 4, 128) + self.fc2 = nn.Linear(128, 10) + + def forward(self, x): + x = self.bn1(F.relu(self.conv1(x))) + x = self.bn2(F.relu(self.conv2(x))) + x = self.pool1(x) + + x = self.bn3(F.relu(self.conv3(x))) + x = self.bn4(F.relu(self.conv4(x))) + x = self.pool2(x) + + x = self.bn5(F.relu(self.conv5(x))) + x = self.bn6(F.relu(self.conv6(x))) + x = self.pool3(x) + + x = x.view(-1, 128 * 4 * 4) + + x = self.fc1(x) + x = F.softmax(self.fc2(x)) + + return x diff --git a/fltk/nets/cifar_10_resnet.py b/fltk/nets/cifar_10_resnet.py new file mode 100644 index 00000000..08a22313 --- /dev/null +++ b/fltk/nets/cifar_10_resnet.py @@ -0,0 +1,115 @@ +import torch.nn as nn +import torch.nn.functional as F + + +class BasicBlock(nn.Module): + expansion = 1 + + def __init__(self, in_planes, planes, stride=1): + super(BasicBlock, self).__init__() + self.conv1 = nn.Conv2d( + in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False) + self.bn1 = nn.BatchNorm2d(planes) + self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, + stride=1, padding=1, bias=False) + self.bn2 = nn.BatchNorm2d(planes) + + self.shortcut = nn.Sequential() + if stride != 1 or in_planes != 
self.expansion*planes: + self.shortcut = nn.Sequential( + nn.Conv2d(in_planes, self.expansion*planes, + kernel_size=1, stride=stride, bias=False), + nn.BatchNorm2d(self.expansion*planes) + ) + + def forward(self, x): + out = F.relu(self.bn1(self.conv1(x))) + out = self.bn2(self.conv2(out)) + out += self.shortcut(x) + out = F.relu(out) + return out + + +class Bottleneck(nn.Module): + expansion = 4 + + def __init__(self, in_planes, planes, stride=1): + super(Bottleneck, self).__init__() + self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False) + self.bn1 = nn.BatchNorm2d(planes) + self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, + stride=stride, padding=1, bias=False) + self.bn2 = nn.BatchNorm2d(planes) + self.conv3 = nn.Conv2d(planes, self.expansion * + planes, kernel_size=1, bias=False) + self.bn3 = nn.BatchNorm2d(self.expansion*planes) + + self.shortcut = nn.Sequential() + if stride != 1 or in_planes != self.expansion*planes: + self.shortcut = nn.Sequential( + nn.Conv2d(in_planes, self.expansion*planes, + kernel_size=1, stride=stride, bias=False), + nn.BatchNorm2d(self.expansion*planes) + ) + + def forward(self, x): + out = F.relu(self.bn1(self.conv1(x))) + out = F.relu(self.bn2(self.conv2(out))) + out = self.bn3(self.conv3(out)) + out += self.shortcut(x) + out = F.relu(out) + return out + + +class Cifar10ResNet(nn.Module): + def __init__(self, block = BasicBlock, num_blocks =[2, 2, 2, 2], num_classes=10): + super(Cifar10ResNet, self).__init__() + self.in_planes = 64 + + self.conv1 = nn.Conv2d(3, 64, kernel_size=3, + stride=1, padding=1, bias=False) + self.bn1 = nn.BatchNorm2d(64) + self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1) + self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2) + self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2) + self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2) + self.linear = nn.Linear(512*block.expansion, num_classes) + + def _make_layer(self, block, planes, num_blocks, stride): + strides = [stride] + [1]*(num_blocks-1) + layers = [] + for stride in strides: + layers.append(block(self.in_planes, planes, stride)) + self.in_planes = planes * block.expansion + return nn.Sequential(*layers) + + def forward(self, x): + out = F.relu(self.bn1(self.conv1(x))) + out = self.layer1(out) + out = self.layer2(out) + out = self.layer3(out) + out = self.layer4(out) + out = F.avg_pool2d(out, 4) + out = out.view(out.size(0), -1) + out = self.linear(out) + return out + + +def ResNet18(): + return Cifar10ResNet(BasicBlock, [2, 2, 2, 2]) + + +def ResNet34(): + return Cifar10ResNet(BasicBlock, [3, 4, 6, 3]) + + +def ResNet50(): + return Cifar10ResNet(Bottleneck, [3, 4, 6, 3]) + + +def ResNet101(): + return Cifar10ResNet(Bottleneck, [3, 4, 23, 3]) + + +def ResNet152(): + return Cifar10ResNet(Bottleneck, [3, 8, 36, 3]) \ No newline at end of file diff --git a/fltk/nets/fashion_mnist_cnn.py b/fltk/nets/fashion_mnist_cnn.py new file mode 100644 index 00000000..0c4532c4 --- /dev/null +++ b/fltk/nets/fashion_mnist_cnn.py @@ -0,0 +1,31 @@ +import torch +import torch.nn as nn +import torch.nn.functional as F + +class FashionMNISTCNN(nn.Module): + + def __init__(self): + super(FashionMNISTCNN, self).__init__() + + self.layer1 = nn.Sequential( + nn.Conv2d(1, 16, kernel_size=5, padding=2), + nn.BatchNorm2d(16), + nn.ReLU(), + nn.MaxPool2d(2)) + self.layer2 = nn.Sequential( + nn.Conv2d(16, 32, kernel_size=5, padding=2), + nn.BatchNorm2d(32), + nn.ReLU(), + nn.MaxPool2d(2)) + + self.fc = nn.Linear(7*7*32, 10) 
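+        # Fashion-MNIST inputs are 1x28x28: both 5x5 convolutions use padding=2 and keep the
+        # spatial size, while each 2x2 max-pool halves it, so layer2 emits 32 feature maps of
+        # size 7x7, which explains the 7*7*32 inputs to this final linear layer.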
+ + def forward(self, x): + x = self.layer1(x) + x = self.layer2(x) + + x = x.view(x.size(0), -1) + + x = self.fc(x) + + return x diff --git a/fltk/nets/fashion_mnist_resnet.py b/fltk/nets/fashion_mnist_resnet.py new file mode 100644 index 00000000..43e09925 --- /dev/null +++ b/fltk/nets/fashion_mnist_resnet.py @@ -0,0 +1,62 @@ +import torch +import torch.nn as nn +import torch.nn.functional as F + +class Residual(nn.Module): + def __init__(self,in_channel,num_channel,use_conv1x1=False,strides=1): + super(Residual,self).__init__() + self.relu=nn.ReLU() + self.bn1=nn.BatchNorm2d(in_channel,eps=1e-3) + self.conv1=nn.Conv2d(in_channels =in_channel,out_channels=num_channel,kernel_size=3,padding=1,stride=strides) + self.bn2=nn.BatchNorm2d(num_channel,eps=1e-3) + self.conv2=nn.Conv2d(in_channels=num_channel,out_channels=num_channel,kernel_size=3,padding=1) + if use_conv1x1: + self.conv3=nn.Conv2d(in_channels=in_channel,out_channels=num_channel,kernel_size=1,stride=strides) + else: + self.conv3=None + + + def forward(self, x): + y=self.conv1(self.relu(self.bn1(x))) + y=self.conv2(self.relu(self.bn2(y))) + # print (y.shape) + if self.conv3: + x=self.conv3(x) + # print (x.shape) + z=y+x + return z + +def ResNet_block(in_channels,num_channels,num_residuals,first_block=False): + layers=[] + for i in range(num_residuals): + if i==0 and not first_block: + layers += [Residual(in_channels,num_channels,use_conv1x1=True,strides=2)] + elif i>0 and not first_block: + layers += [Residual(num_channels,num_channels)] + else: + layers += [Residual(in_channels, num_channels)] + blk=nn.Sequential(*layers) + return blk + +class FashionMNISTResNet(nn.Module): + def __init__(self,in_channel = 1 ,num_classes = 10): + super(FashionMNISTResNet,self).__init__() + self.block1=nn.Sequential(nn.Conv2d(in_channels=in_channel,out_channels=64,kernel_size=7,stride=2,padding=3), + nn.BatchNorm2d(64), + nn.ReLU(), + nn.MaxPool2d(kernel_size=3,stride=2,padding=1)) + self.block2=nn.Sequential(ResNet_block(64,64,2,True), + ResNet_block(64,128,2), + ResNet_block(128,256,2), + ResNet_block(256,512,2)) + self.block3=nn.Sequential(nn.AvgPool2d(kernel_size=3)) + self.Dense=nn.Linear(512,10) + + + def forward(self,x): + y=self.block1(x) + y=self.block2(y) + y=self.block3(y) + y=y.view(-1,512) + y=self.Dense(y) + return y \ No newline at end of file diff --git a/fltk/nets/reddit_lstm.py b/fltk/nets/reddit_lstm.py new file mode 100644 index 00000000..4b7f71f9 --- /dev/null +++ b/fltk/nets/reddit_lstm.py @@ -0,0 +1,61 @@ +import torch.nn as nn +from fltk.nets.simple import SimpleNet +from torch.autograd import Variable + + +class RNNModel(SimpleNet): + """Container module with an encoder, a recurrent module, and a decoder.""" + + def __init__(self, name, created_time, rnn_type, ntoken, ninp, nhid, nlayers, dropout=0.5, tie_weights=False): + super(RNNModel, self).__init__(name=name, created_time=created_time) + self.drop = nn.Dropout(dropout) + self.encoder = nn.Embedding(ntoken, ninp) + if rnn_type in ['LSTM', 'GRU']: + self.rnn = getattr(nn, rnn_type)(ninp, nhid, nlayers, dropout=dropout) + else: + try: + nonlinearity = {'RNN_TANH': 'tanh', 'RNN_RELU': 'relu'}[rnn_type] + except KeyError: + raise ValueError( """An invalid option for `--model` was supplied, + options are ['LSTM', 'GRU', 'RNN_TANH' or 'RNN_RELU']""") + self.rnn = nn.RNN(ninp, nhid, nlayers, nonlinearity=nonlinearity, dropout=dropout) + self.decoder = nn.Linear(nhid, ntoken) + + # Optionally tie weights as in: + # "Using the Output Embedding to Improve Language Models" (Press 
& Wolf 2016) + # https://arxiv.org/abs/1608.05859 + # and + # "Tying Word Vectors and Word Classifiers: A Loss Framework for Language Modeling" (Inan et al. 2016) + # https://arxiv.org/abs/1611.01462 + + if tie_weights: + if nhid != ninp: + raise ValueError('When using the tied flag, nhid must be equal to emsize') + self.decoder.weight = self.encoder.weight + + self.init_weights() + + self.rnn_type = rnn_type + self.nhid = nhid + self.nlayers = nlayers + + def init_weights(self): + initrange = 0.1 + self.encoder.weight.data.uniform_(-initrange, initrange) + self.decoder.bias.data.fill_(0) + self.decoder.weight.data.uniform_(-initrange, initrange) + + def forward(self, input, hidden): + emb = self.drop(self.encoder(input)) + output, hidden = self.rnn(emb, hidden) + output = self.drop(output) + decoded = self.decoder(output.view(output.size(0)*output.size(1), output.size(2))) + return decoded.view(output.size(0), output.size(1), decoded.size(1)), hidden + + def init_hidden(self, bsz): + weight = next(self.parameters()).data + if self.rnn_type == 'LSTM': + return (Variable(weight.new(self.nlayers, bsz, self.nhid).zero_()), + Variable(weight.new(self.nlayers, bsz, self.nhid).zero_())) + else: + return Variable(weight.new(self.nlayers, bsz, self.nhid).zero_()) \ No newline at end of file diff --git a/fltk/nets/simple.py b/fltk/nets/simple.py new file mode 100644 index 00000000..7c7a5e76 --- /dev/null +++ b/fltk/nets/simple.py @@ -0,0 +1,89 @@ +import argparse +import torch +import torch.nn as nn +import torch.nn.functional as F +import torch.optim as optim +from torchvision import datasets, transforms +from torch.autograd import Variable +import numpy as np +import datetime + + +class SimpleNet(nn.Module): + def __init__(self, name=None, created_time=None): + super(SimpleNet, self).__init__() + self.created_time = created_time + self.name=name + + + + def visualize(self, vis, epoch, acc, loss=None, eid='main', is_poisoned=False, name=None): + if name is None: + name = self.name + '_poisoned' if is_poisoned else self.name + vis.line(X=np.array([epoch]), Y=np.array([acc]), name=name, win='vacc_{0}'.format(self.created_time), env=eid, + update='append' if vis.win_exists('vacc_{0}'.format(self.created_time), env=eid) else None, + opts=dict(showlegend=True, title='Accuracy_{0}'.format(self.created_time), + width=700, height=400)) + if loss is not None: + vis.line(X=np.array([epoch]), Y=np.array([loss]), name=name, env=eid, + win='vloss_{0}'.format(self.created_time), + update='append' if vis.win_exists('vloss_{0}'.format(self.created_time), env=eid) else None, + opts=dict(showlegend=True, title='Loss_{0}'.format(self.created_time), width=700, height=400)) + + return + + + + def train_vis(self, vis, epoch, data_len, batch, loss, eid='main', name=None, win='vtrain'): + + vis.line(X=np.array([(epoch-1)*data_len+batch]), Y=np.array([loss]), + env=eid, + name=f'{name}' if name is not None else self.name, win=f'{win}_{self.created_time}', + update='append' if vis.win_exists(f'{win}_{self.created_time}', env=eid) else None, + opts=dict(showlegend=True, width=700, height=400, title='Train loss_{0}'.format(self.created_time))) + + + + def save_stats(self, epoch, loss, acc): + self.stats['epoch'].append(epoch) + self.stats['loss'].append(loss) + self.stats['acc'].append(acc) + + + def copy_params(self, state_dict, coefficient_transfer=100): + + own_state = self.state_dict() + + for name, param in state_dict.items(): + if name in own_state: + shape = param.shape + # no cuda + # random_tensor = 
(torch.cuda.FloatTensor(shape).random_(0, 100) <= coefficient_transfer).type( + # torch.cuda.FloatTensor) + random_tensor = (torch.FloatTensor(shape).random_(0, 100) <= coefficient_transfer).type( + torch.FloatTensor) + negative_tensor = (random_tensor*-1)+1 + # own_state[name].copy_(param) + own_state[name].copy_(param.clone()) + + + + +class SimpleMnist(SimpleNet): + def __init__(self, name=None, created_time=None): + super(SimpleMnist, self).__init__(name, created_time) + self.conv1 = nn.Conv2d(1, 10, kernel_size=5) + self.conv2 = nn.Conv2d(10, 20, kernel_size=5) + self.conv2_drop = nn.Dropout2d() + self.fc1 = nn.Linear(320, 50) + self.fc2 = nn.Linear(50, 10) + + + def forward(self, x): + x = F.relu(F.max_pool2d(self.conv1(x), 2)) + x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) + x = x.view(-1, 320) + x = F.relu(self.fc1(x)) + x = F.dropout(x, training=self.training) + x = self.fc2(x) + return F.log_softmax(x, dim=1) \ No newline at end of file diff --git a/fltk/schedulers/__init__.py b/fltk/schedulers/__init__.py new file mode 100644 index 00000000..d5bf41f5 --- /dev/null +++ b/fltk/schedulers/__init__.py @@ -0,0 +1 @@ +from .min_lr_step import MinCapableStepLR diff --git a/fltk/schedulers/min_lr_step.py b/fltk/schedulers/min_lr_step.py new file mode 100644 index 00000000..6a44fdcb --- /dev/null +++ b/fltk/schedulers/min_lr_step.py @@ -0,0 +1,48 @@ +class MinCapableStepLR: + + def __init__(self, logger, optimizer, step_size, gamma, min_lr): + """ + :param logger: logger + :type logger: loguru.logger + :param optimizer: + :type optimizer: torch.optim + :param step_size: # of epochs between LR updates + :type step_size: int + :param gamma: multiplication factor for LR update + :type gamma: float + :param min_lr: minimum learning rate + :type min_lr: float + """ + self.logger = logger + + self.optimizer = optimizer + self.step_size = step_size + self.gamma = gamma + self.min_lr = min_lr + + self.epoch_idx = 0 + + def step(self): + """ + Adjust the learning rate as necessary. + """ + self.increment_epoch_index() + + if self.is_time_to_update_lr(): + self.logger.debug("Updating LR for optimizer") + + self.update_lr() + + def is_time_to_update_lr(self): + return self.epoch_idx % self.step_size == 0 + + def update_lr(self): + if self.optimizer.param_groups[0]['lr'] * self.gamma >= self.min_lr: + self.optimizer.param_groups[0]['lr'] *= self.gamma + else: + self.logger.warning("Updating LR would place it below min LR. Skipping LR update.") + + self.logger.debug("New LR: {}".format(self.optimizer.param_groups[0]['lr'])) + + def increment_epoch_index(self): + self.epoch_idx += 1 diff --git a/fltk/strategy/__init__.py b/fltk/strategy/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fltk/strategy/aggregation.py b/fltk/strategy/aggregation.py new file mode 100644 index 00000000..96dc866f --- /dev/null +++ b/fltk/strategy/aggregation.py @@ -0,0 +1,36 @@ + + + +def average_nn_parameters(parameters): + """ + Averages passed parameters. 
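+    All parameter dictionaries are weighted equally (an unweighted mean); fed_average_nn_parameters
+    below computes the data-size-weighted FedAvg variant.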
+ :param parameters: nn model named parameters + :type parameters: list + """ + new_params = {} + for name in parameters[0].keys(): + new_params[name] = sum([param[name].data for param in parameters]) / len(parameters) + + return new_params + +def fed_average_nn_parameters(parameters, sizes): + new_params = {} + sum_size = 0 + + # print('size'+ str(sizes)) + + for client in parameters: + for name in parameters[client].keys(): + try: + new_params[name].data += (parameters[client][name].data * sizes[client]) + except: + new_params[name] = (parameters[client][name].data * sizes[client]) + # print('first agg') + sum_size += sizes[client] + + for name in new_params: + new_params[name].data /= sum_size + + # new_params = [new_params[name].data / sum_size for name in new_params.keys()] + + return new_params \ No newline at end of file diff --git a/fltk/strategy/client_selection.py b/fltk/strategy/client_selection.py new file mode 100644 index 00000000..34900ce8 --- /dev/null +++ b/fltk/strategy/client_selection.py @@ -0,0 +1,4 @@ +import numpy as np + +def random_selection(clients, n): + return np.random.choice(clients, n, replace=False) \ No newline at end of file diff --git a/fltk/util/__init__.py b/fltk/util/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fltk/util/arguments.py b/fltk/util/arguments.py new file mode 100644 index 00000000..7945113d --- /dev/null +++ b/fltk/util/arguments.py @@ -0,0 +1,293 @@ +import torch.nn.functional as F + +import torch +import json + +# Setting the seed for Torch +import yaml + +from fltk.nets import Cifar10CNN, FashionMNISTCNN, Cifar100ResNet, FashionMNISTResNet, Cifar10ResNet, Cifar100VGG + +SEED = 1 +torch.manual_seed(SEED) + +class Arguments: + + def __init__(self, logger): + self.logger = logger + + self.batch_size = 10 + self.test_batch_size = 1000 + self.epochs = 1 + self.lr = 0.001 + self.momentum = 0.9 + self.cuda = False + self.shuffle = False + self.log_interval = 10 + self.kwargs = {} + self.contribution_measurement_round = 1 + self.contribution_measurement_metric = 'Influence' + + self.scheduler_step_size = 50 + self.scheduler_gamma = 0.5 + self.min_lr = 1e-10 + + self.round_worker_selection_strategy = None + self.round_worker_selection_strategy_kwargs = None + + self.save_model = False + self.save_temp_model = False + self.save_epoch_interval = 1 + self.save_model_path = "models" + self.epoch_save_start_suffix = "start" + self.epoch_save_end_suffix = "end" + self.get_poison_effort = 'half' + self.num_workers = 50 + # self.num_poisoned_workers = 10 + + self.rank = 0 + self.world_size = 0 + self.data_sampler = None + self.distributed = False + self.available_nets = { + "Cifar100ResNet" : Cifar100ResNet, + "Cifar100VGG" : Cifar100VGG, + "Cifar10CNN" : Cifar10CNN, + "Cifar10ResNet" : Cifar10ResNet, + "FashionMNISTCNN" : FashionMNISTCNN, + "FashionMNISTResNet" : FashionMNISTResNet + + } + self.net = None + self.set_net_by_name('Cifar10CNN') + # self.net = FashionMNISTCNN + # self.net = Cifar100ResNet + # self.net = FashionMNISTResNet + # self.net = Cifar10ResNet + # self.net = Cifar10ResNet + self.dataset_name = 'cifar10' + self.train_data_loader_pickle_path = { + 'cifar10': 'data_loaders/cifar10/train_data_loader.pickle', + 'fashion-mnist': 'data_loaders/fashion-mnist/train_data_loader.pickle', + 'cifar100': 'data_loaders/cifar100/train_data_loader.pickle', + } + + self.test_data_loader_pickle_path = { + 'cifar10': 'data_loaders/cifar10/test_data_loader.pickle', + 'fashion-mnist': 
'data_loaders/fashion-mnist/test_data_loader.pickle', + 'cifar100': 'data_loaders/cifar100/test_data_loader.pickle', + } + + # self.train_data_loader_pickle_path = "data_loaders/cifar10/train_data_loader.pickle" + # self.test_data_loader_pickle_path = "data_loaders/cifar10/test_data_loader.pickle" + + # self.train_data_loader_pickle_path = "data_loaders/fashion-mnist/train_data_loader.pickle" + # self.test_data_loader_pickle_path = "data_loaders/fashion-mnist/test_data_loader.pickle" + + # self.train_data_loader_pickle_path = "data_loaders/cifar100/train_data_loader.pickle" + # self.test_data_loader_pickle_path = "data_loaders/cifar100/test_data_loader.pickle" + + self.loss_function = torch.nn.CrossEntropyLoss + + self.default_model_folder_path = "default_models" + + self.data_path = "data" + + def get_distributed(self): + return self.distributed + + def get_rank(self): + return self.rank + + def get_world_size(self): + return self.world_size + + def set_sampler(self, sampler): + self.data_sampler = sampler + + def get_sampler(self): + return self.data_sampler + + def get_round_worker_selection_strategy(self): + return self.round_worker_selection_strategy + + def get_round_worker_selection_strategy_kwargs(self): + return self.round_worker_selection_strategy_kwargs + + def set_round_worker_selection_strategy_kwargs(self, kwargs): + self.round_worker_selection_strategy_kwargs = kwargs + + def set_client_selection_strategy(self, strategy): + self.round_worker_selection_strategy = strategy + + def get_data_path(self): + return self.data_path + + def get_epoch_save_start_suffix(self): + return self.epoch_save_start_suffix + + def get_epoch_save_end_suffix(self): + return self.epoch_save_end_suffix + + def get_dataloader_list(self): + return list(self.train_data_loader_pickle_path.keys()) + + def get_nets_list(self): + return list(self.available_nets.keys()) + + + def set_train_data_loader_pickle_path(self, path, name='cifar10'): + self.train_data_loader_pickle_path[name] = path + + def get_train_data_loader_pickle_path(self): + return self.train_data_loader_pickle_path[self.dataset_name] + + def set_test_data_loader_pickle_path(self, path, name='cifar10'): + self.test_data_loader_pickle_path[name] = path + + def get_test_data_loader_pickle_path(self): + return self.test_data_loader_pickle_path[self.dataset_name] + + def set_net_by_name(self, name: str): + self.net = self.available_nets[name] + # net_dict = { + # 'cifar10-cnn': Cifar10CNN, + # 'fashion-mnist-cnn': FashionMNISTCNN, + # 'cifar100-resnet': Cifar100ResNet, + # 'fashion-mnist-resnet': FashionMNISTResNet, + # 'cifar10-resnet': Cifar10ResNet, + # 'cifar100-vgg': Cifar100VGG, + # } + # self.net = net_dict[name] + + def get_cuda(self): + return self.cuda + + def get_scheduler_step_size(self): + return self.scheduler_step_size + + def get_scheduler_gamma(self): + return self.scheduler_gamma + + def get_min_lr(self): + return self.min_lr + + def get_default_model_folder_path(self): + return self.default_model_folder_path + + def get_num_epochs(self): + return self.epochs + + def set_num_poisoned_workers(self, num_poisoned_workers): + self.num_poisoned_workers = num_poisoned_workers + + def set_num_workers(self, num_workers): + self.num_workers = num_workers + + def set_model_save_path(self, save_model_path): + self.save_model_path = save_model_path + + def get_logger(self): + return self.logger + + def get_loss_function(self): + return self.loss_function + + def get_net(self): + return self.net + + def get_num_workers(self): + return 
self.num_workers + + def get_num_poisoned_workers(self): + return self.num_poisoned_workers + + def get_poison_effort(self): + return self.get_poison_effort + + def get_learning_rate(self): + return self.lr + + def get_momentum(self): + return self.momentum + + def get_shuffle(self): + return self.shuffle + + def get_batch_size(self): + return self.batch_size + + def get_test_batch_size(self): + return self.test_batch_size + + def get_log_interval(self): + return self.log_interval + + def get_save_model_folder_path(self): + return self.save_model_path + + def get_learning_rate_from_epoch(self, epoch_idx): + lr = self.lr * (self.scheduler_gamma ** int(epoch_idx / self.scheduler_step_size)) + + if lr < self.min_lr: + self.logger.warning("Updating LR would place it below min LR. Skipping LR update.") + + return self.min_lr + + self.logger.debug("LR: {}".format(lr)) + + return lr + + def get_contribution_measurement_round(self): + return self.contribution_measurement_round + + def get_contribution_measurement_metric(self): + return self.contribution_measurement_metric + + def should_save_model(self, epoch_idx): + """ + Returns true/false models should be saved. + + :param epoch_idx: current training epoch index + :type epoch_idx: int + """ + if not self.save_model: + return False + + if epoch_idx == 1 or epoch_idx % self.save_epoch_interval == 0: + return True + + def log(self): + """ + Log this arguments object to the logger. + """ + self.logger.debug("Arguments: {}", str(self)) + + def __str__(self): + return "\nBatch Size: {}\n".format(self.batch_size) + \ + "Test Batch Size: {}\n".format(self.test_batch_size) + \ + "Epochs: {}\n".format(self.epochs) + \ + "Learning Rate: {}\n".format(self.lr) + \ + "Momentum: {}\n".format(self.momentum) + \ + "CUDA Enabled: {}\n".format(self.cuda) + \ + "Shuffle Enabled: {}\n".format(self.shuffle) + \ + "Log Interval: {}\n".format(self.log_interval) + \ + "Scheduler Step Size: {}\n".format(self.scheduler_step_size) + \ + "Scheduler Gamma: {}\n".format(self.scheduler_gamma) + \ + "Scheduler Minimum Learning Rate: {}\n".format(self.min_lr) + \ + "Client Selection Strategy: {}\n".format(self.round_worker_selection_strategy) + \ + "Client Selection Strategy Arguments: {}\n".format(json.dumps(self.round_worker_selection_strategy_kwargs, indent=4, sort_keys=True)) + \ + "Model Saving Enabled: {}\n".format(self.save_model) + \ + "Model Saving Interval: {}\n".format(self.save_epoch_interval) + \ + "Model Saving Path (Relative): {}\n".format(self.save_model_path) + \ + "Epoch Save Start Prefix: {}\n".format(self.epoch_save_start_suffix) + \ + "Epoch Save End Suffix: {}\n".format(self.epoch_save_end_suffix) + \ + "Number of Clients: {}\n".format(self.num_workers) + \ + "Number of Poisoned Clients: {}\n".format(self.num_poisoned_workers) + \ + "NN: {}\n".format(self.net) + \ + "Train Data Loader Path: {}\n".format(self.train_data_loader_pickle_path) + \ + "Test Data Loader Path: {}\n".format(self.test_data_loader_pickle_path) + \ + "Loss Function: {}\n".format(self.loss_function) + \ + "Default Model Folder Path: {}\n".format(self.default_model_folder_path) + \ + "Data Path: {}\n".format(self.data_path) + \ + "Dataset Name: {}\n".format(self.dataset_name) \ No newline at end of file diff --git a/fltk/util/base_config.py b/fltk/util/base_config.py new file mode 100644 index 00000000..3cc8486a --- /dev/null +++ b/fltk/util/base_config.py @@ -0,0 +1,313 @@ +import torch +import json + +from fltk.nets import Cifar10CNN, FashionMNISTCNN, Cifar100ResNet, 
FashionMNISTResNet, Cifar10ResNet, Cifar100VGG + +SEED = 1 +torch.manual_seed(SEED) + +class BareConfig: + + def __init__(self): + # self.logger = logger + + self.batch_size = 10 + self.test_batch_size = 1000 + self.epochs = 1 + self.lr = 0.001 + self.momentum = 0.9 + self.cuda = False + self.shuffle = False + self.log_interval = 10 + self.kwargs = {} + self.contribution_measurement_round = 1 + self.contribution_measurement_metric = 'Influence' + + self.scheduler_step_size = 50 + self.scheduler_gamma = 0.5 + self.min_lr = 1e-10 + + self.round_worker_selection_strategy = None + self.round_worker_selection_strategy_kwargs = None + + self.save_model = False + self.save_temp_model = False + self.save_epoch_interval = 1 + self.save_model_path = "models" + self.epoch_save_start_suffix = "start" + self.epoch_save_end_suffix = "end" + self.get_poison_effort = 'half' + self.num_workers = 50 + # self.num_poisoned_workers = 10 + + self.federator_host = '0.0.0.0' + self.rank = 0 + self.world_size = 0 + self.data_sampler = None + self.distributed = False + self.available_nets = { + "Cifar100ResNet": Cifar100ResNet, + "Cifar100VGG": Cifar100VGG, + "Cifar10CNN": Cifar10CNN, + "Cifar10ResNet": Cifar10ResNet, + "FashionMNISTCNN": FashionMNISTCNN, + "FashionMNISTResNet": FashionMNISTResNet + + } + self.net = None + self.set_net_by_name('Cifar10CNN') + self.dataset_name = 'cifar10' + self.train_data_loader_pickle_path = { + 'cifar10': 'data_loaders/cifar10/train_data_loader.pickle', + 'fashion-mnist': 'data_loaders/fashion-mnist/train_data_loader.pickle', + 'cifar100': 'data_loaders/cifar100/train_data_loader.pickle', + } + + self.test_data_loader_pickle_path = { + 'cifar10': 'data_loaders/cifar10/test_data_loader.pickle', + 'fashion-mnist': 'data_loaders/fashion-mnist/test_data_loader.pickle', + 'cifar100': 'data_loaders/cifar100/test_data_loader.pickle', + } + self.loss_function = torch.nn.CrossEntropyLoss + self.default_model_folder_path = "default_models" + self.data_path = "data" + + + ########### + # Methods # + ########### + + def merge_yaml(self, cfg = {}): + """ + total_epochs: 20 + epochs_per_cycle: 2 + wait_for_clients: true + net: Cifar10CNN + dataset: cifar10 + experiment_prefix: 'experiment' + output_location: 'output' + tensor_board_active: true + :param yaml_config: + :return: + """ + if 'total_epochs' in cfg: + self.epochs = cfg['total_epochs'] + if 'epochs_per_cycle' in cfg: + self.epochs_per_cycle = cfg['epochs_per_cycle'] + if 'wait_for_clients' in cfg: + self.wait_for_clients = cfg['wait_for_clients'] + if 'net' in cfg: + self.set_net_by_name(cfg['net']) + if 'dataset' in cfg: + self.dataset_name = cfg['dataset'] + if 'experiment_prefix' in cfg: + self.experiment_prefix = cfg['experiment_prefix'] + if 'output_location' in cfg: + self.output_location = cfg['output_location'] + if 'tensor_board_active' in cfg: + self.tensor_board_active = cfg['tensor_board_active'] + if 'clients_per_round' in cfg: + self.clients_per_round = cfg['clients_per_round'] + if 'system' in cfg: + if 'clients' in cfg['system']: + if 'amount' in cfg['system']['clients']: + self.world_size = cfg['system']['clients']['amount'] + 1 + + if 'system' in cfg: + if 'federator' in cfg['system']: + if 'hostname' in cfg['system']['federator']: + self.federator_host = cfg['system']['federator']['hostname'] + + + def init_logger(self, logger): + self.logger = logger + + def get_distributed(self): + return self.distributed + + def get_rank(self): + return self.rank + + def get_world_size(self): + return self.world_size + + def 
set_sampler(self, sampler): + self.data_sampler = sampler + + def get_sampler(self): + return self.data_sampler + + def get_round_worker_selection_strategy(self): + return self.round_worker_selection_strategy + + def get_round_worker_selection_strategy_kwargs(self): + return self.round_worker_selection_strategy_kwargs + + def set_round_worker_selection_strategy_kwargs(self, kwargs): + self.round_worker_selection_strategy_kwargs = kwargs + + def set_client_selection_strategy(self, strategy): + self.round_worker_selection_strategy = strategy + + def get_data_path(self): + return self.data_path + + def get_epoch_save_start_suffix(self): + return self.epoch_save_start_suffix + + def get_epoch_save_end_suffix(self): + return self.epoch_save_end_suffix + + def get_dataloader_list(self): + return list(self.train_data_loader_pickle_path.keys()) + + def get_nets_list(self): + return list(self.available_nets.keys()) + + def set_train_data_loader_pickle_path(self, path, name='cifar10'): + self.train_data_loader_pickle_path[name] = path + + def get_train_data_loader_pickle_path(self): + return self.train_data_loader_pickle_path[self.dataset_name] + + def set_test_data_loader_pickle_path(self, path, name='cifar10'): + self.test_data_loader_pickle_path[name] = path + + def get_test_data_loader_pickle_path(self): + return self.test_data_loader_pickle_path[self.dataset_name] + + def set_net_by_name(self, name: str): + self.net = self.available_nets[name] + + def get_cuda(self): + return self.cuda + + def get_scheduler_step_size(self): + return self.scheduler_step_size + + def get_scheduler_gamma(self): + return self.scheduler_gamma + + def get_min_lr(self): + return self.min_lr + + def get_default_model_folder_path(self): + return self.default_model_folder_path + + def get_num_epochs(self): + return self.epochs + + def set_num_poisoned_workers(self, num_poisoned_workers): + self.num_poisoned_workers = num_poisoned_workers + + def set_num_workers(self, num_workers): + self.num_workers = num_workers + + def set_model_save_path(self, save_model_path): + self.save_model_path = save_model_path + + def get_logger(self): + return self.logger + + def get_loss_function(self): + return self.loss_function + + def get_net(self): + return self.net + + def get_num_workers(self): + return self.num_workers + + def get_num_poisoned_workers(self): + return self.num_poisoned_workers + + def get_poison_effort(self): + return self.get_poison_effort + + def get_learning_rate(self): + return self.lr + + def get_momentum(self): + return self.momentum + + def get_shuffle(self): + return self.shuffle + + def get_batch_size(self): + return self.batch_size + + def get_test_batch_size(self): + return self.test_batch_size + + def get_log_interval(self): + return self.log_interval + + def get_save_model_folder_path(self): + return self.save_model_path + + def get_learning_rate_from_epoch(self, epoch_idx): + lr = self.lr * (self.scheduler_gamma ** int(epoch_idx / self.scheduler_step_size)) + + if lr < self.min_lr: + self.logger.warning("Updating LR would place it below min LR. Skipping LR update.") + + return self.min_lr + + self.logger.debug("LR: {}".format(lr)) + + return lr + + def get_contribution_measurement_round(self): + return self.contribution_measurement_round + + def get_contribution_measurement_metric(self): + return self.contribution_measurement_metric + + def should_save_model(self, epoch_idx): + """ + Returns true/false models should be saved. 
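+        Saving only happens when save_model is enabled, on the first epoch and on every
+        save_epoch_interval-th epoch thereafter.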
+ + :param epoch_idx: current training epoch index + :type epoch_idx: int + """ + if not self.save_model: + return False + + if epoch_idx == 1 or epoch_idx % self.save_epoch_interval == 0: + return True + + def log(self): + """ + Log this arguments object to the logger. + """ + self.logger.debug("Arguments: {}", str(self)) + + def __str__(self): + return "\nBatch Size: {}\n".format(self.batch_size) + \ + "Test Batch Size: {}\n".format(self.test_batch_size) + \ + "Epochs: {}\n".format(self.epochs) + \ + "Learning Rate: {}\n".format(self.lr) + \ + "Momentum: {}\n".format(self.momentum) + \ + "CUDA Enabled: {}\n".format(self.cuda) + \ + "Shuffle Enabled: {}\n".format(self.shuffle) + \ + "Log Interval: {}\n".format(self.log_interval) + \ + "Scheduler Step Size: {}\n".format(self.scheduler_step_size) + \ + "Scheduler Gamma: {}\n".format(self.scheduler_gamma) + \ + "Scheduler Minimum Learning Rate: {}\n".format(self.min_lr) + \ + "Client Selection Strategy: {}\n".format(self.round_worker_selection_strategy) + \ + "Client Selection Strategy Arguments: {}\n".format( + json.dumps(self.round_worker_selection_strategy_kwargs, indent=4, sort_keys=True)) + \ + "Model Saving Enabled: {}\n".format(self.save_model) + \ + "Model Saving Interval: {}\n".format(self.save_epoch_interval) + \ + "Model Saving Path (Relative): {}\n".format(self.save_model_path) + \ + "Epoch Save Start Prefix: {}\n".format(self.epoch_save_start_suffix) + \ + "Epoch Save End Suffix: {}\n".format(self.epoch_save_end_suffix) + \ + "Number of Clients: {}\n".format(self.num_workers) + \ + "Number of Poisoned Clients: {}\n".format(self.num_poisoned_workers) + \ + "NN: {}\n".format(self.net) + \ + "Train Data Loader Path: {}\n".format(self.train_data_loader_pickle_path) + \ + "Test Data Loader Path: {}\n".format(self.test_data_loader_pickle_path) + \ + "Loss Function: {}\n".format(self.loss_function) + \ + "Default Model Folder Path: {}\n".format(self.default_model_folder_path) + \ + "Data Path: {}\n".format(self.data_path) + \ + "Dataset Name: {}\n".format(self.dataset_name) \ No newline at end of file diff --git a/fltk/util/data_loader_utils.py b/fltk/util/data_loader_utils.py new file mode 100644 index 00000000..0244e0d2 --- /dev/null +++ b/fltk/util/data_loader_utils.py @@ -0,0 +1,90 @@ +import numpy +from torch.utils.data import DataLoader + +import os +import pickle +import random +from ..datasets import Dataset + +def generate_data_loaders_from_distributed_dataset(distributed_dataset, batch_size): + """ + Generate data loaders from a distributed dataset. + + :param distributed_dataset: Distributed dataset + :type distributed_dataset: list(tuple) + :param batch_size: batch size for data loader + :type batch_size: int + """ + data_loaders = [] + for worker_training_data in distributed_dataset: + data_loaders.append(Dataset.get_data_loader_from_data(batch_size, worker_training_data[0], worker_training_data[1], shuffle=True)) + + return data_loaders + +def load_train_data_loader(logger, args): + """ + Loads the training data DataLoader object from a file if available. 
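+    The pickle file is looked up via args.get_train_data_loader_pickle_path(); a
+    FileNotFoundError is raised when it does not exist.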
+ + :param logger: loguru.Logger + :param args: Arguments + """ + if os.path.exists(args.get_train_data_loader_pickle_path()): + dl = load_data_loader_from_file(logger, args.get_train_data_loader_pickle_path()) + return dl + else: + logger.error("Couldn't find train data loader stored in file") + + raise FileNotFoundError("Couldn't find train data loader stored in file") + +def generate_train_loader(args, dataset): + train_dataset = dataset.get_train_dataset() + X, Y = shuffle_data(args, train_dataset) + + return dataset.get_data_loader_from_data(args.get_batch_size(), X, Y) + +def load_test_data_loader(logger, args): + """ + Loads the test data DataLoader object from a file if available. + + :param logger: loguru.Logger + :param args: Arguments + """ + if os.path.exists(args.get_test_data_loader_pickle_path()): + return load_data_loader_from_file(logger, args.get_test_data_loader_pickle_path()) + else: + logger.error("Couldn't find test data loader stored in file") + + raise FileNotFoundError("Couldn't find train data loader stored in file") + +def load_data_loader_from_file(logger, filename) -> DataLoader: + """ + Loads DataLoader object from a file if available. + + :param logger: loguru.Logger + :param filename: string + """ + logger.info("Loading data loader from file: {}".format(filename)) + + with open(filename, "rb") as f: + return load_saved_data_loader(f) + +def generate_test_loader(args, dataset): + test_dataset = dataset.get_test_dataset() + X, Y = shuffle_data(args, test_dataset) + + return dataset.get_data_loader_from_data(args.get_test_batch_size(), X, Y) + +def shuffle_data(args, dataset): + data = list(zip(dataset[0], dataset[1])) + random.shuffle(data) + X, Y = zip(*data) + X = numpy.asarray(X) + Y = numpy.asarray(Y) + + return X, Y + +def load_saved_data_loader(file_obj): + return pickle.load(file_obj) + +def save_data_loader_to_file(data_loader, file_obj): + pickle.dump(data_loader, file_obj) diff --git a/fltk/util/default_models.py b/fltk/util/default_models.py new file mode 100644 index 00000000..eda04fb3 --- /dev/null +++ b/fltk/util/default_models.py @@ -0,0 +1,47 @@ + +import os +import torch +import logging +logging.basicConfig(level=logging.DEBUG) +from fltk.nets import Cifar10CNN, FashionMNISTCNN, Cifar100ResNet, FashionMNISTResNet, Cifar10ResNet, Cifar100VGG +from fltk.util.arguments import Arguments + +if __name__ == '__main__': + args = Arguments(logging) + if not os.path.exists(args.get_default_model_folder_path()): + os.mkdir(args.get_default_model_folder_path()) + + # --------------------------------- + # ----------- Cifar10CNN ---------- + # --------------------------------- + full_save_path = os.path.join(args.get_default_model_folder_path(), "Cifar10CNN.model") + torch.save(Cifar10CNN().state_dict(), full_save_path) + # --------------------------------- + # --------- Cifar10ResNet --------- + # --------------------------------- + full_save_path = os.path.join(args.get_default_model_folder_path(), "Cifar10ResNet.model") + torch.save(Cifar10ResNet().state_dict(), full_save_path) + + # --------------------------------- + # -------- FashionMNISTCNN -------- + # --------------------------------- + full_save_path = os.path.join(args.get_default_model_folder_path(), "FashionMNISTCNN.model") + torch.save(FashionMNISTCNN().state_dict(), full_save_path) + + # --------------------------------- + # ------ FashionMNISTResNet ------- + # --------------------------------- + full_save_path = os.path.join(args.get_default_model_folder_path(), "FashionMNISTResNet.model") 
+ torch.save(FashionMNISTResNet().state_dict(), full_save_path) + + # --------------------------------- + # ----------- Cifar100CNN --------- + # --------------------------------- + full_save_path = os.path.join(args.get_default_model_folder_path(), "Cifar100ResNet.model") + torch.save(Cifar100ResNet().state_dict(), full_save_path) + + # --------------------------------- + # ----------- Cifar100VGG --------- + # --------------------------------- + full_save_path = os.path.join(args.get_default_model_folder_path(), "Cifar100VGG.model") + torch.save(Cifar100VGG().state_dict(), full_save_path) \ No newline at end of file diff --git a/fltk/util/fed_avg.py b/fltk/util/fed_avg.py new file mode 100644 index 00000000..e60d1684 --- /dev/null +++ b/fltk/util/fed_avg.py @@ -0,0 +1,12 @@ +def average_nn_parameters(parameters): + """ + Averages passed parameters. + + :param parameters: nn model named parameters + :type parameters: list + """ + new_params = {} + for name in parameters[0].keys(): + new_params[name] = sum([param[name].data for param in parameters]) / len(parameters) + + return new_params diff --git a/fltk/util/generate_data_distribution.py b/fltk/util/generate_data_distribution.py new file mode 100644 index 00000000..34135c0b --- /dev/null +++ b/fltk/util/generate_data_distribution.py @@ -0,0 +1,70 @@ +import pathlib +import os +import logging + +from fltk.datasets import CIFAR10Dataset, FashionMNISTDataset, CIFAR100Dataset +from fltk.util.arguments import Arguments +from fltk.util.data_loader_utils import generate_train_loader, generate_test_loader, save_data_loader_to_file + +logging.basicConfig(level=logging.DEBUG) + + +if __name__ == '__main__': + args = Arguments(logging) + + # --------------------------------- + # ------------ CIFAR10 ------------ + # --------------------------------- + dataset = CIFAR10Dataset(args) + TRAIN_DATA_LOADER_FILE_PATH = "data_loaders/cifar10/train_data_loader.pickle" + TEST_DATA_LOADER_FILE_PATH = "data_loaders/cifar10/test_data_loader.pickle" + + if not os.path.exists("data_loaders/cifar10"): + pathlib.Path("data_loaders/cifar10").mkdir(parents=True, exist_ok=True) + + train_data_loader = generate_train_loader(args, dataset) + test_data_loader = generate_test_loader(args, dataset) + + with open(TRAIN_DATA_LOADER_FILE_PATH, "wb") as f: + save_data_loader_to_file(train_data_loader, f) + + with open(TEST_DATA_LOADER_FILE_PATH, "wb") as f: + save_data_loader_to_file(test_data_loader, f) + + # --------------------------------- + # --------- Fashion-MNIST --------- + # --------------------------------- + dataset = FashionMNISTDataset(args) + TRAIN_DATA_LOADER_FILE_PATH = "data_loaders/fashion-mnist/train_data_loader.pickle" + TEST_DATA_LOADER_FILE_PATH = "data_loaders/fashion-mnist/test_data_loader.pickle" + + if not os.path.exists("data_loaders/fashion-mnist"): + pathlib.Path("data_loaders/fashion-mnist").mkdir(parents=True, exist_ok=True) + + train_data_loader = generate_train_loader(args, dataset) + test_data_loader = generate_test_loader(args, dataset) + + with open(TRAIN_DATA_LOADER_FILE_PATH, "wb") as f: + save_data_loader_to_file(train_data_loader, f) + + with open(TEST_DATA_LOADER_FILE_PATH, "wb") as f: + save_data_loader_to_file(test_data_loader, f) + + # --------------------------------- + # ------------ CIFAR100 ----------- + # --------------------------------- + dataset = CIFAR100Dataset(args) + TRAIN_DATA_LOADER_FILE_PATH = "data_loaders/cifar100/train_data_loader.pickle" + TEST_DATA_LOADER_FILE_PATH = 
"data_loaders/cifar100/test_data_loader.pickle" + + if not os.path.exists("data_loaders/cifar100"): + pathlib.Path("data_loaders/cifar100").mkdir(parents=True, exist_ok=True) + + train_data_loader = generate_train_loader(args, dataset) + test_data_loader = generate_test_loader(args, dataset) + + with open(TRAIN_DATA_LOADER_FILE_PATH, "wb") as f: + save_data_loader_to_file(train_data_loader, f) + + with open(TEST_DATA_LOADER_FILE_PATH, "wb") as f: + save_data_loader_to_file(test_data_loader, f) diff --git a/fltk/util/iid_equal.py b/fltk/util/iid_equal.py new file mode 100644 index 00000000..c47bcc16 --- /dev/null +++ b/fltk/util/iid_equal.py @@ -0,0 +1,19 @@ +import torch + +def distribute_batches_equally(train_data_loader, num_workers): + """ + Gives each worker the same number of batches of training data. + + :param train_data_loader: Training data loader + :type train_data_loader: torch.utils.data.DataLoader + :param num_workers: number of workers + :type num_workers: int + """ + distributed_dataset = [[] for i in range(num_workers)] + + for batch_idx, (data, target) in enumerate(train_data_loader): + worker_idx = batch_idx % num_workers + + distributed_dataset[worker_idx].append((data, target)) + + return distributed_dataset diff --git a/fltk/util/label_replacement.py b/fltk/util/label_replacement.py new file mode 100644 index 00000000..1b09d18c --- /dev/null +++ b/fltk/util/label_replacement.py @@ -0,0 +1,12 @@ +def apply_class_label_replacement(X, Y, replacement_method): + """ + Replace class labels using the replacement method + + :param X: data features + :type X: numpy.Array() + :param Y: data labels + :type Y: numpy.Array() + :param replacement_method: Method to update targets + :type replacement_method: method + """ + return (X, replacement_method(Y, set(Y))) diff --git a/fltk/util/log.py b/fltk/util/log.py new file mode 100644 index 00000000..a80661a3 --- /dev/null +++ b/fltk/util/log.py @@ -0,0 +1,9 @@ +import logging + +from torch.distributed import rpc + +class FLLogger: + @staticmethod + @rpc.functions.async_execution + def log(arg1, node_id, log_line, report_time): + logging.info(f'[{node_id}: {report_time}]: {log_line}') \ No newline at end of file diff --git a/fltk/util/results.py b/fltk/util/results.py new file mode 100644 index 00000000..af560479 --- /dev/null +++ b/fltk/util/results.py @@ -0,0 +1,20 @@ +from dataclasses import dataclass +from typing import Any + +@dataclass +class EpochData: + epoch_id: int + duration_train: int + duration_test: int + loss_train: float + accuracy: float + loss: float + class_precision: Any + class_recall: Any + client_id: str = None + + def to_csv_line(self): + delimeter = ',' + values = self.__dict__.values() + values = [str(x) for x in values] + return delimeter.join(values) diff --git a/fltk/util/tensor_converter.py b/fltk/util/tensor_converter.py new file mode 100644 index 00000000..f5f7abee --- /dev/null +++ b/fltk/util/tensor_converter.py @@ -0,0 +1,20 @@ +import numpy + +def convert_distributed_data_into_numpy(distributed_dataset): + """ + Converts a distributed dataset (returned by a data distribution method) from Tensors into numpy arrays. 
+ + :param distributed_dataset: Distributed dataset + :type distributed_dataset: list(tuple) + """ + converted_distributed_dataset = [] + + for worker_idx in range(len(distributed_dataset)): + worker_training_data = distributed_dataset[worker_idx] + + X_ = numpy.array([tensor.numpy() for batch in worker_training_data for tensor in batch[0]]) + Y_ = numpy.array([tensor.numpy() for batch in worker_training_data for tensor in batch[1]]) + + converted_distributed_dataset.append((X_, Y_)) + + return converted_distributed_dataset diff --git a/setup.py b/setup.py new file mode 100644 index 00000000..55bbcd75 --- /dev/null +++ b/setup.py @@ -0,0 +1,35 @@ +from setuptools import setup, find_packages +from fltk import __version__ +setup( + name="fltk", + author="Bart Cox", + author_email="b.a.cox@tudelft.nl", + maintainer="Bart Cox", + maintainer_email="b.a.cox@tudelft.nl", + description="Federated Learning Toolkit", + packages=find_packages(), + version=__version__, + entry_points={ + "console_scripts": [ + "fltk = fltk.__main__:main", + ] + }, + include_package_data=True, + data_files=[('share/tudelft/fltk/configs', ['configs/experiment.yaml'])], + install_requires= + [ + 'tqdm==4.49.0', + 'scikit-learn==0.23.2', + 'pandas==1.1.2', + 'numpy>=1.20.0', + 'torch==1.7.1', + 'torchvision==0.8.2', + 'scipy==1.4.1', + 'h5py==2.10.0', + 'requests', + 'pyyaml', + 'torchsummary', + 'dataclass-csv', + 'tensorboard' + ] +)
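As a rough sketch of how the pieces in this patch fit together, the snippet below shows a hypothetical single-machine launch: it loads configs/experiment.yaml into a BareConfig via merge_yaml and hands the result to run_spawn, which spawns world_size processes in which rank 0 runs the Federator and the remaining ranks act as clients. This is only an illustration under those assumptions (the YAML contains the keys handled by merge_yaml, and everything runs on one host); the packaged entry point remains the fltk console script defined in setup.py, and extra setup such as logger initialisation on the config may still be needed in practice.

    import yaml

    from fltk.util.base_config import BareConfig
    from fltk.launch import run_spawn

    if __name__ == '__main__':
        # Fold the experiment description into the default configuration.
        with open('configs/experiment.yaml') as config_file:
            config = BareConfig()
            config.merge_yaml(yaml.safe_load(config_file))

        # Spawn config.world_size processes: rank 0 initialises RPC as "ps" and runs the
        # Federator loop; ranks 1..world_size-1 register as "client<rank>" and wait for
        # remote training/test calls from the federator.
        run_spawn(config)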