From 74eccb2f0dc6b3c19350c62247babcd6c5a15526 Mon Sep 17 00:00:00 2001 From: Jeroen Galjaard Date: Tue, 31 Aug 2021 13:31:27 +0200 Subject: [PATCH] Make execution compatible with PytorchJobs --- .env | 2 + configs/tasks/example_arrival_config.json | 9 +- fltk/__main__.py | 22 +- fltk/client.py | 164 +++++++++------ fltk/datasets/__init__.py | 1 - fltk/datasets/cifar10.py | 45 ++-- fltk/datasets/cifar100.py | 7 +- fltk/datasets/data_distribution/__init__.py | 1 - fltk/datasets/data_distribution/iid_equal.py | 17 -- fltk/datasets/dataset.py | 203 +++++++++---------- fltk/datasets/distributed/__init__.py | 4 - fltk/datasets/distributed/cifar10.py | 58 ------ fltk/datasets/distributed/cifar100.py | 90 -------- fltk/datasets/distributed/dataset.py | 43 ---- fltk/datasets/distributed/fashion_mnist.py | 62 ------ fltk/datasets/fashion_mnist.py | 7 +- fltk/launch.py | 68 +++---- fltk/nets/cifar_100_resnet.py | 7 +- fltk/nets/cifar_10_cnn.py | 8 +- fltk/nets/cifar_10_resnet.py | 2 +- fltk/nets/util/utils.py | 7 +- fltk/schedulers/min_lr_step.py | 7 +- fltk/util/arguments.py | 162 +-------------- fltk/util/config/__init__.py | 2 + fltk/util/config/arguments.py | 111 ++++++++++ fltk/util/config/base_config.py | 66 +++--- 26 files changed, 433 insertions(+), 742 deletions(-) delete mode 100644 fltk/datasets/data_distribution/__init__.py delete mode 100644 fltk/datasets/data_distribution/iid_equal.py delete mode 100644 fltk/datasets/distributed/__init__.py delete mode 100644 fltk/datasets/distributed/cifar10.py delete mode 100644 fltk/datasets/distributed/cifar100.py delete mode 100644 fltk/datasets/distributed/dataset.py delete mode 100644 fltk/datasets/distributed/fashion_mnist.py create mode 100644 fltk/util/config/arguments.py diff --git a/.env b/.env index e69de29b..b21ef940 100644 --- a/.env +++ b/.env @@ -0,0 +1,2 @@ +MASTER_ADDR=localhost +MASTER_PORT=5000 diff --git a/configs/tasks/example_arrival_config.json b/configs/tasks/example_arrival_config.json index
0ce6fab3..6ab5d10d 100644 --- a/configs/tasks/example_arrival_config.json +++ b/configs/tasks/example_arrival_config.json @@ -12,10 +12,11 @@ "action": "train" }, "hyperParameters": { - "batchSize": "128", - "maxEpoch": "5", - "learningRate": "0.01", - "learningrateDecay": "0.0002" + "batchSize": 128, + "maxEpoch": 10, + "learningRate": 0.01, + "learningrateDecay": 0.0002, + "decayStepSize": 50 }, "classProbability": 0.1, "priorities": [ diff --git a/fltk/__main__.py b/fltk/__main__.py index bfab5dc1..0c923c97 100644 --- a/fltk/__main__.py +++ b/fltk/__main__.py @@ -5,23 +5,15 @@ from dotenv import load_dotenv from fltk.launch import launch_client, launch_orchestrator +from fltk.util.config.arguments import create_client_parser, create_cluster_parser, extract_learning_parameters from fltk.util.config.base_config import BareConfig def main(): parser = ArgumentParser(description='Experiment launcher for the Federated Learning Testbed') subparsers = parser.add_subparsers(dest="mode") - - cluster_parser = subparsers.add_parser('cluster') - cluster_parser.add_argument('config', type=str) - - client_parser = subparsers.add_parser('client') - # Option to override rank, by default provided by PytorchJob in Kubeflow. - client_parser.add_argument('--rank', type=int) - # Option to override default nic, by default is 'eth0' in containers. - client_parser.add_argument('--nic', type=str, default=None) - # Option to override 'master' host name, by default provided by PytorchJob in Kubeflow. - client_parser.add_argument('--host', type=str, default=None) + create_client_parser(subparsers) + create_cluster_parser(subparsers) """ To create your own parser mirror the construction in the 'client_parser' object. 
@@ -39,6 +31,8 @@ def main(): elif arguments.mode == 'client': logging.info("Starting in client mode") client_start(arguments, config) + logging.info("Stopping client...") + exit(0) else: print("Provided mode is not supported...") exit(1) @@ -49,13 +43,13 @@ def cluster_start(args: Namespace, configuration: BareConfig): Function to to launch Orchestrator for execution with provided configurations. Currently this assumes that a single Orchestrator is started that manages all the resources in the cluster. """ - logging.info("Starting in ") launch_orchestrator(args=args, config=configuration) def client_start(args: Namespace, configuration: BareConfig): - logging.info("Starting in client mode.") - launch_client(args=args, config=configuration) + learning_params = extract_learning_parameters(args) + task_id = args.task_id + launch_client(task_id, config=configuration, learning_params=learning_params) if __name__ == "__main__": diff --git a/fltk/client.py b/fltk/client.py index a9d9a379..166733d6 100644 --- a/fltk/client.py +++ b/fltk/client.py @@ -1,6 +1,7 @@ import datetime import logging from pathlib import Path +from typing import Union, List import torch import torch.distributed as dist @@ -9,48 +10,73 @@ from fltk.nets.util.evaluation import calculate_class_precision, calculate_class_recall from fltk.nets.util.utils import save_model, load_model_from_file +from fltk.schedulers import MinCapableStepLR +from fltk.util.config.arguments import LearningParameters from fltk.util.config.base_config import BareConfig from fltk.util.results import EpochData -logging.basicConfig(level=logging.DEBUG) - class Client: - def __init__(self, rank, task_id, config: BareConfig = None): + def __init__(self, rank: int, task_id: int, world_size: int, config: BareConfig = None, + learning_params: LearningParameters = None): """ - @param rank: - @type rank: - @param task_id: - @type task_id: + @param rank: PyTorch rank provided by KubeFlow setup. 
+ @type rank: int + @param task_id: String identifier representing the UID of the training task + @type task_id: str @param config: @type config: + @param learning_params: + @type learning_params: LearningParameters """ self._logger = logging.getLogger(f'Client-{rank}-{task_id}') self._logger.info("Initializing learning client") self._logger.debug(f"Configuration received: {config}") self._id = rank + self._world_size = world_size self._task_id = task_id - self.args = config + + self.config = config + self.learning_params = learning_params + + # Create model and dataset + self.loss_function = self.learning_params.get_loss()() + self.dataset = self.learning_params.get_dataset_class()(self.config, self.learning_params, self._id, + self._world_size) + self.model = self.learning_params.get_model_class()() self.device = self._init_device() - self.dataset = self.args.get_dataset() - self.model = self.args.get_model() - self.loss_function = self.args.get_loss_function() - self.optimizer: torch.nn.Module - self.scheduler = self.args.get_scheduler() - def prepare_learner(self, distributed: bool = False, backend=None): + self.optimizer = None + self.scheduler = None + + def prepare_learner(self, distributed: bool = False, backend: Union[str, dist.Backend] = None): + """ + Function to prepare the learner, i.e. load all the necessary data into memory. + @param distributed: + @type distributed: + @param backend: + @type backend: + @return: + @rtype: + """ self._logger.info(f"Preparing learner model with distributed={distributed}") self.model.to(self.device) if distributed: dist.init_process_group(backend) self.model = torch.nn.parallel.DistributedDataParallel(self.model) - self.optimizer = self.args.get_optimizer(self.model.parameters(), - self.args) + # Currently it is assumed to use an SGD optimizer. 
**kwargs need to be used to launch this properly + self.optimizer = self.learning_params.get_optimizer()(self.model.parameters(), + lr=self.learning_params.learning_rate, + momentum=0.9) + self.scheduler = MinCapableStepLR(self.optimizer, + self.config.get_scheduler_step_size(), + self.config.get_scheduler_gamma(), + self.config.get_min_lr()) - def _init_device(self, cuda_device: torch.device = torch.device('cuda:0')): + def _init_device(self, cuda_device: torch.device = torch.device('cpu')): """ Initialize Torch to use available devices. Either prepares CUDA device, or disables CUDA during execution to run with CPU only inference/training. @@ -60,7 +86,7 @@ def _init_device(self, cuda_device: torch.device = torch.device('cuda:0')): @return: @rtype: """ - if self.args.cuda and torch.cuda.is_available(): + if self.config.cuda_enabled() and torch.cuda.is_available(): return torch.device(cuda_device) else: # Force usage of CPU @@ -75,42 +101,52 @@ def load_default_model(self): """ model_file = Path(f'{self.model.__name__}.model') - default_model_path = Path(self.args.get_default_model_folder_path()).joinpath(model_file) + default_model_path = Path(self.config.get_default_model_folder_path()).joinpath(model_file) load_model_from_file(self.model, default_model_path) - def train(self, epoch): + def train(self, epoch, log_interval: int = 100): """ + Function to start training, regardless of DistributedDataParallel (DPP) or local training. DDP will account for + synchronization of nodes. If extension requires to make use of torch.distributed.send and torch.distributed.recv + (for example for customized training or Federated Learning), additional torch.distributed.barrier calls might + be required to launch. + :param epoch: Current epoch # :type epoch: int + @param log_interval: Iteration interval at which to log. 
+ @type log_interval: int """ - running_loss = 0.0 final_running_loss = 0.0 - - for i, (inputs, labels) in enumerate(self.dataset.get_train_loader(), 0): - inputs, labels = inputs.to(self.device), labels.to(self.device) - + self.model.train() + for i, (inputs, labels) in enumerate(self.dataset.get_train_loader()): # zero the parameter gradients self.optimizer.zero_grad() - # forward + backward + optimize - + # Forward through the net to train outputs = self.model(inputs) + # Calculate the loss loss = self.loss_function(outputs, labels) + + # Update weights, DPP will account for synchronization of the weights. loss.backward() self.optimizer.step() + running_loss += float(loss.detach().item()) - del loss, outputs - if i % self.args.get_log_interval() == 0: - self.args.get_logger().info( - '[%d, %5d] loss: %.3f' % (epoch, i, running_loss / self.args.get_log_interval())) - final_running_loss = running_loss / self.args.get_log_interval() + if i % log_interval == 0: + self._logger.info('[%d, %5d] loss: %.3f' % (epoch, i, running_loss / log_interval)) + final_running_loss = running_loss / log_interval running_loss = 0.0 self.scheduler.step() - # save model - if self.args.should_save_model(epoch): - self.save_model(epoch, self.args.get_epoch_save_end_suffix()) + + # Save model + if self.config.should_save_model(epoch): + # Note that currently this is not supported in the Framework. However, the creation of a ReadWriteMany + # PVC in the deployment charts, and mounting this in the appropriate directory, would resolve this issue. + # This can be done by copying the setup of the PVC used to record the TensorBoard information (used by + # logger created by the rank==0 node during the training process (i.e. to keep track of process). 
+ self.save_model(epoch) return final_running_loss @@ -124,8 +160,10 @@ def test(self): with torch.no_grad(): for (images, labels) in self.dataset.get_test_loader(): images, labels = images.to(self.device), labels.to(self.device) - + dist.reduce outputs = self.model(images) + # Currently the FLTK framework assumes that a classification task is performed (hence max). + # Future work may add support for non-classification training. _, predicted = torch.max(outputs.data, 1) total += labels.size(0) correct += (predicted == labels).sum().item() @@ -141,50 +179,48 @@ def test(self): class_precision = calculate_class_precision(confusion_mat) class_recall = calculate_class_recall(confusion_mat) - self.args.get_logger().debug('Test set: Accuracy: {}/{} ({:.0f}%)'.format(correct, total, accuracy)) - self.args.get_logger().debug('Test set: Loss: {}'.format(loss)) - self.args.get_logger().debug("Classification Report:\n" + classification_report(targets_, pred_)) - self.args.get_logger().debug("Confusion Matrix:\n" + str(confusion_mat)) - self.args.get_logger().debug("Class precision: {}".format(str(class_precision))) - self.args.get_logger().debug("Class recall: {}".format(str(class_recall))) + self._logger.debug('Test set: Accuracy: {}/{} ({:.0f}%)'.format(correct, total, accuracy)) + self._logger.debug('Test set: Loss: {}'.format(loss)) + self._logger.debug("Classification Report:\n" + classification_report(targets_, pred_)) + self._logger.debug("Confusion Matrix:\n" + str(confusion_mat)) + self._logger.debug("Class precision: {}".format(str(class_precision))) + self._logger.debug("Class recall: {}".format(str(class_recall))) return accuracy, loss, class_precision, class_recall - def run_epochs(self): + def run_epochs(self) -> List[EpochData]: """ - Function to run epochs wit + Function to run epochs with """ - num_epochs = self.config.epochs + max_epoch = self.learning_params.max_epoch + 1 start_time_train = datetime.datetime.now() - # Make epochs 1 index. 
- for epoch in range(1, num_epochs + 1): - loss = self.train(epoch) + epoch_results = [] + for epoch in range(1, max_epoch): + train_loss = self.train(epoch) if self._id == 0: # Let only the 'master node' work on training. Possibly DDP can be used # to have a distributed test loader as well to speed up (would require - # aggregration of data. + # aggregation of data. # Example https://github.com/fabio-deep/Distributed-Pytorch-Boilerplate/blob/0206247150720ca3e287e9531cb20ef68dc9a15f/src/datasets.py#L271-L303. - accuracy, loss, class_precision, class_recall = self.test() + elapsed_time_train = datetime.datetime.now() - start_time_train + train_time_ms = int(elapsed_time_train.total_seconds() * 1000) - elapsed_time_train = datetime.datetime.now() - start_time_train - train_time_ms = int(elapsed_time_train.total_seconds() * 1000) + start_time_test = datetime.datetime.now() + accuracy, test_loss, class_precision, class_recall = self.test() - start_time_test = datetime.datetime.now() - accuracy, test_loss, class_precision, class_recall = self.test() - elapsed_time_test = datetime.datetime.now() - start_time_test - test_time_ms = int(elapsed_time_test.total_seconds() * 1000) + elapsed_time_test = datetime.datetime.now() - start_time_test + test_time_ms = int(elapsed_time_test.total_seconds() * 1000) - data = EpochData(train_time_ms, test_time_ms, loss, accuracy, test_loss, class_precision, - class_recall, client_id=self._id) + data = EpochData(train_time_ms, test_time_ms, train_loss, accuracy, test_loss, class_precision, + class_recall, client_id=self._id) + epoch_results.append(data) + return epoch_results - def save_model(self, epoch, suffix): + def save_model(self, epoch): """ Move function to utils directory. Saves the model if necessary. """ - self.args.get_logger().debug(f"Saving model to flat file storage. 
Saved at epoch #{epoch}") - - save_model(self.model, epoch, self.args.get_save_model_folder_path(), self.args) - - + self._logger.debug(f"Saving model to flat file storage. Saved at epoch #{epoch}") + save_model(self.model, self.config.get_save_model_folder_path(), epoch) diff --git a/fltk/datasets/__init__.py b/fltk/datasets/__init__.py index e0abbd66..9f6677e8 100644 --- a/fltk/datasets/__init__.py +++ b/fltk/datasets/__init__.py @@ -1,4 +1,3 @@ -from .distributed import * from .cifar10 import CIFAR10Dataset from .cifar100 import CIFAR100Dataset from .fashion_mnist import FashionMNISTDataset \ No newline at end of file diff --git a/fltk/datasets/cifar10.py b/fltk/datasets/cifar10.py index 82e375e4..8b6d763a 100644 --- a/fltk/datasets/cifar10.py +++ b/fltk/datasets/cifar10.py @@ -1,17 +1,16 @@ -from .dataset import Dataset +from torch.utils.data import DataLoader, DistributedSampler from torchvision import datasets from torchvision import transforms -from torch.utils.data import DataLoader, DistributedSampler +from .dataset import Dataset -class CIFAR10Dataset(Dataset): - def __init__(self, args): - super(CIFAR10Dataset, self).__init__(args) +class CIFAR10Dataset(Dataset): - def load_train_dataset(self): - self.get_args().get_logger().debug("Loading CIFAR10 train data") + def __init__(self, config, learning_param, rank: int = 0, world_size: int = None): + super(CIFAR10Dataset, self).__init__(config, learning_param, rank, world_size) + def load_train_dataset(self, rank: int = 0, world_size: int = None): normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) transform = transforms.Compose([ transforms.RandomHorizontalFlip(), @@ -20,32 +19,22 @@ def load_train_dataset(self): normalize ]) - train_dataset = datasets.CIFAR10(root=self.get_args().get_data_path(), train=True, download=True, transform=transform) - sampler = DistributedSampler(train_dataset, rank=self.args.get_rank(), num_replicas=self.args.get_world_size()) if 
self.args.get_distributed() else None - train_loader = DataLoader(train_dataset, batch_size=len(train_dataset), sampler=sampler) - self.args.set_sampler(sampler) + train_dataset = datasets.CIFAR10(root=self.config.get_data_path(), train=True, download=True, transform=transform) + sampler = DistributedSampler(train_dataset, rank=rank, num_replicas=self.world_size) if self.world_size else None + train_loader = DataLoader(train_dataset, batch_size=self.learning_params.batch_size, sampler=sampler, + shuffle=(sampler is None)) - train_data = self.get_tuple_from_data_loader(train_loader) - - self.get_args().get_logger().debug("Finished loading CIFAR10 train data") - - return train_data + return train_loader def load_test_dataset(self): - self.get_args().get_logger().debug("Loading CIFAR10 test data") - normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) transform = transforms.Compose([ transforms.ToTensor(), normalize ]) - test_dataset = datasets.CIFAR10(root=self.get_args().get_data_path(), train=False, download=True, transform=transform) - sampler = DistributedSampler(test_dataset, rank=self.args.get_rank(), num_replicas=self.args.get_world_size()) if self.args.get_distributed() else None - test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), sampler=sampler) - self.args.set_sampler(sampler) - - test_data = self.get_tuple_from_data_loader(test_loader) - - self.get_args().get_logger().debug("Finished loading CIFAR10 test data") - - return test_data + test_dataset = datasets.CIFAR10(root=self.config.get_data_path(), train=False, download=True, + transform=transform) + sampler = DistributedSampler(test_dataset, rank=self.rank, + num_replicas=self.world_size) if self.world_size else None + test_loader = DataLoader(test_dataset, batch_size=self.learning_params.batch_size, sampler=sampler) + return test_loader diff --git a/fltk/datasets/cifar100.py b/fltk/datasets/cifar100.py index 186a98dc..11d61e8f 100644 --- 
a/fltk/datasets/cifar100.py +++ b/fltk/datasets/cifar100.py @@ -3,10 +3,13 @@ from torchvision import transforms from torch.utils.data import DataLoader +from ..util.config.base_config import BareConfig + + class CIFAR100Dataset(Dataset): - def __init__(self, args): - super(CIFAR100Dataset, self).__init__(args) + def __init__(self, config: BareConfig, rank: int = 0, world_size: int = None): + super(CIFAR100Dataset, self).__init__(config, rank, world_size) def load_train_dataset(self): self.get_args().get_logger().debug("Loading CIFAR100 train data") diff --git a/fltk/datasets/data_distribution/__init__.py b/fltk/datasets/data_distribution/__init__.py deleted file mode 100644 index ab7c6369..00000000 --- a/fltk/datasets/data_distribution/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .iid_equal import distribute_batches_equally diff --git a/fltk/datasets/data_distribution/iid_equal.py b/fltk/datasets/data_distribution/iid_equal.py deleted file mode 100644 index 05ab1a4b..00000000 --- a/fltk/datasets/data_distribution/iid_equal.py +++ /dev/null @@ -1,17 +0,0 @@ -def distribute_batches_equally(train_data_loader, num_workers): - """ - Gives each worker the same number of batches of training data. 
- - :param train_data_loader: Training data loader - :type train_data_loader: torch.utils.data.DataLoader - :param num_workers: number of workers - :type num_workers: int - """ - distributed_dataset = [[] for _ in range(num_workers)] - - for batch_idx, (data, target) in enumerate(train_data_loader): - worker_idx = batch_idx % num_workers - - distributed_dataset[worker_idx].append((data, target)) - - return distributed_dataset diff --git a/fltk/datasets/dataset.py b/fltk/datasets/dataset.py index 62d4ca04..00f3f7e3 100644 --- a/fltk/datasets/dataset.py +++ b/fltk/datasets/dataset.py @@ -1,115 +1,100 @@ from abc import abstractmethod + +import torch from torch.utils.data import DataLoader from torch.utils.data import TensorDataset -import torch -import numpy - -from fltk.util.arguments import Arguments class Dataset: - """ - TODO: Look into RPC memory leaks occuring due to https://github.com/pytorch/pytorch/issues/61920 - """ - def __init__(self, args: Arguments): - self.args = args - self.train_dataset = self.load_train_dataset() - self.test_dataset = self.load_test_dataset() - - def get_args(self): - """ - Returns the arguments. - - :return: Arguments - """ - return self.args - - def get_train_dataset(self): - """ - Returns the train dataset. - - :return: tuple - """ - return self.train_dataset - - def get_test_dataset(self): - """ - Returns the test dataset. - - :return: tuple - """ - return self.test_dataset - - @abstractmethod - def load_train_dataset(self): - """ - Loads & returns the training dataset. - - :return: tuple - """ - raise NotImplementedError("load_train_dataset() isn't implemented") - - @abstractmethod - def load_test_dataset(self): - """ - Loads & returns the test dataset. - - :return: tuple - """ - raise NotImplementedError("load_test_dataset() isn't implemented") - - def get_train_loader(self, batch_size, **kwargs): - """ - Return the data loader for the train dataset. 
- - :param batch_size: batch size of data loader - :type batch_size: int - :return: torch.utils.data.DataLoader - """ - return Dataset.get_data_loader_from_data(batch_size, self.train_dataset[0], self.train_dataset[1], **kwargs) - - def get_test_loader(self, batch_size, **kwargs): - """ - Return the data loader for the test dataset. - - :param batch_size: batch size of data loader - :type batch_size: int - :return: torch.utils.data.DataLoader - """ - return Dataset.get_data_loader_from_data(batch_size, self.test_dataset[0], self.test_dataset[1], **kwargs) - - @staticmethod - def get_data_loader_from_data(batch_size, X, Y, **kwargs): - """ - Get a data loader created from a given set of data. - - :param batch_size: batch size of data loader - :type batch_size: int - :param X: data features - :type X: numpy.Array() - :param Y: data labels - :type Y: numpy.Array() - :return: torch.utils.data.DataLoader - """ - X_torch = torch.from_numpy(X).float() - - if "classification_problem" in kwargs and kwargs["classification_problem"] == False: - Y_torch = torch.from_numpy(Y).float() - else: - Y_torch = torch.from_numpy(Y).long() - dataset = TensorDataset(X_torch, Y_torch) - - kwargs.pop("classification_problem", None) - - return DataLoader(dataset, batch_size=batch_size, **kwargs) - - @staticmethod - def get_tuple_from_data_loader(data_loader): - """ - Get a tuple representation of the data stored in a data loader. 
- - :param data_loader: data loader to get data from - :type data_loader: torch.utils.data.DataLoader - :return: tuple - """ - return (next(iter(data_loader))[0].numpy(), next(iter(data_loader))[1].numpy()) + """ + TODO: Look into RPC memory leaks occuring due to https://github.com/pytorch/pytorch/issues/61920 + """ + + def __init__(self, config, learning_params, rank: int, world_size: int): + self.config = config + self.learning_params = learning_params + + self.rank = rank + self.world_size = world_size + + self.train_loader = self.load_train_dataset() + self.test_loader = self.load_test_dataset() + + def get_train_dataset(self): + """ + Returns the train dataset. + + :return: tuple + """ + return self.train_loader + + def get_test_dataset(self): + """ + Returns the test dataset. + + :return: tuple + """ + return self.test_loader + + @abstractmethod + def load_train_dataset(self): + """ + Loads & returns the training dataset. + + :return: tuple + """ + raise NotImplementedError("load_train_dataset() isn't implemented") + + @abstractmethod + def load_test_dataset(self): + """ + Loads & returns the test dataset. + + :return: tuple + """ + raise NotImplementedError("load_test_dataset() isn't implemented") + + def get_train_loader(self, **kwargs): + """ + Return the data loader for the train dataset. + + :param batch_size: batch size of data loader + :type batch_size: int + :return: torch.utils.data.DataLoader + """ + return self.train_loader + + def get_test_loader(self, **kwargs): + """ + Return the data loader for the test dataset. + + :param batch_size: batch size of data loader + :type batch_size: int + :return: torch.utils.data.DataLoader + """ + return self.test_loader + + @staticmethod + def get_data_loader_from_data(batch_size, X, Y, **kwargs): + """ + Get a data loader created from a given set of data. 
+ + :param batch_size: batch size of data loader + :type batch_size: int + :param X: data features + :type X: numpy.Array() + :param Y: data labels + :type Y: numpy.Array() + :return: torch.utils.data.DataLoader + """ + X_torch = torch.from_numpy(X).float() + + if "classification_problem" in kwargs and kwargs["classification_problem"] == False: + Y_torch = torch.from_numpy(Y).float() + else: + Y_torch = torch.from_numpy(Y).long() + dataset = TensorDataset(X_torch, Y_torch) + + kwargs.pop("classification_problem", None) + + return DataLoader(dataset, batch_size=batch_size, **kwargs) diff --git a/fltk/datasets/distributed/__init__.py b/fltk/datasets/distributed/__init__.py deleted file mode 100644 index 8175430d..00000000 --- a/fltk/datasets/distributed/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .dataset import DistDataset -from .cifar10 import DistCIFAR10Dataset -from .cifar100 import DistCIFAR100Dataset -from .fashion_mnist import DistFashionMNISTDataset diff --git a/fltk/datasets/distributed/cifar10.py b/fltk/datasets/distributed/cifar10.py deleted file mode 100644 index f1942410..00000000 --- a/fltk/datasets/distributed/cifar10.py +++ /dev/null @@ -1,58 +0,0 @@ -import logging - -from torch.utils.data import DataLoader -from torchvision import datasets -from torchvision import transforms - -from fltk.datasets.distributed.dataset import DistDataset -from fltk.strategy.data_samplers import get_sampler - - -class DistCIFAR10Dataset(DistDataset): - - def __init__(self, args): - super(DistCIFAR10Dataset, self).__init__(args) - self.get_args().get_logger().debug(f"Instantiated CIFAR10 train data.") - self.init_train_dataset() - self.init_test_dataset() - - def init_train_dataset(self): - dist_loader_text = "distributed" if self.args.get_distributed() else "" - self.get_args().get_logger().debug(f"Loading '{dist_loader_text}' CIFAR10 train data") - normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) - transform = 
transforms.Compose([ - transforms.RandomHorizontalFlip(), - transforms.RandomCrop(32, 4), - transforms.ToTensor(), - normalize - ]) - - self.train_dataset = datasets.CIFAR10(root=self.get_args().get_data_path(), train=True, download=True, - transform=transform, - target_transform=None if not self.pill else self.pill.poison_targets()) - - self.train_sampler = get_sampler(self.train_dataset, self.args) - self.train_loader = DataLoader(self.train_dataset, batch_size=16, sampler=self.train_sampler) - logging.info("this client gets {} samples".format(len(self.train_sampler))) - - def init_test_dataset(self): - self.get_args().get_logger().debug("Loading CIFAR10 test data") - - normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) - transform = transforms.Compose([ - transforms.ToTensor(), - normalize - ]) - # TODO: decide on whether to poison test or not target_transform=None if not self.pill else self.pill.poison_targets() - self.test_dataset = datasets.CIFAR10(root=self.get_args().get_data_path(), train=False, download=True, - transform=transform) - self.test_sampler = get_sampler(self.test_dataset, self.args) - self.test_loader = DataLoader(self.test_dataset, batch_size=16, sampler=self.test_sampler) - - def __del__(self): - del self.train_dataset - del self.train_sampler - del self.train_loader - del self.test_dataset - del self.test_sampler - del self.test_loader diff --git a/fltk/datasets/distributed/cifar100.py b/fltk/datasets/distributed/cifar100.py deleted file mode 100644 index 408439d6..00000000 --- a/fltk/datasets/distributed/cifar100.py +++ /dev/null @@ -1,90 +0,0 @@ -from torchvision import datasets -from torchvision import transforms -from torch.utils.data import DataLoader, DistributedSampler - -from fltk.datasets.distributed.dataset import DistDataset -from fltk.strategy.data_samplers import get_sampler - - -class DistCIFAR100Dataset(DistDataset): - - def __init__(self, args): - super(DistCIFAR100Dataset, 
self).__init__(args) - self.init_train_dataset() - self.init_test_dataset() - - def init_train_dataset(self): - dist_loader_text = "distributed" if self.args.get_distributed() else "" - self.get_args().get_logger().debug(f"Loading '{dist_loader_text}' CIFAR100 train data") - normalize = transforms.Normalize(mean=[0.507, 0.487, 0.441], std=[0.267, 0.256, 0.276]) - transform = transforms.Compose([ - transforms.RandomHorizontalFlip(), - transforms.RandomCrop(32, 4), - transforms.ToTensor(), - normalize - ]) - self.train_dataset = datasets.CIFAR100(root=self.get_args().get_data_path(), train=True, download=True, - transform=transform, target_transform=self.pill.poison_targets()) - self.train_sampler = get_sampler(self.train_dataset, self.args) - self.train_loader = DataLoader(self.train_dataset, batch_size=16, sampler=self.train_sampler) - - def init_test_dataset(self): - dist_loader_text = "distributed" if self.args.get_distributed() else "" - self.get_args().get_logger().debug(f"Loading '{dist_loader_text}' CIFAR100 test data") - - normalize = transforms.Normalize(mean=[0.507, 0.487, 0.441], std=[0.267, 0.256, 0.276]) - transform = transforms.Compose([ - transforms.ToTensor(), - normalize - ]) - self.test_dataset = datasets.CIFAR100(root=self.get_args().get_data_path(), train=False, download=True, - transform=transform) - self.test_sampler = get_sampler(self.test_dataset, self.args) - self.test_loader = DataLoader(self.test_dataset, batch_size=16, sampler=self.test_sampler) - - - def load_train_dataset(self): - dist_loader_text = "distributed" if self.args.get_distributed() else "" - self.get_args().get_logger().debug(f"Loading '{dist_loader_text}' CIFAR100 train data") - - normalize = transforms.Normalize(mean=[0.507, 0.487, 0.441], std=[0.267, 0.256, 0.276]) - transform = transforms.Compose([ - transforms.RandomHorizontalFlip(), - transforms.RandomCrop(32, 4), - transforms.ToTensor(), - normalize - ]) - - train_dataset = 
datasets.CIFAR100(root=self.get_args().get_data_path(), train=True, download=True, - transform=transform) - sampler = get_sampler(self.test_dataset, self.args) - - train_loader = DataLoader(train_dataset, batch_size=len(train_dataset), sampler=sampler) - self.args.set_sampler(sampler) - - train_data = self.get_tuple_from_data_loader(train_loader) - dist_loader_text = "distributed" if self.args.get_distributed() else "" - self.get_args().get_logger().debug(f"Finished loading '{dist_loader_text}' CIFAR100 train data") - - return train_data - - def load_test_dataset(self): - self.get_args().get_logger().debug("Loading CIFAR100 test data") - - normalize = transforms.Normalize(mean=[0.507, 0.487, 0.441], std=[0.267, 0.256, 0.276]) - transform = transforms.Compose([ - transforms.ToTensor(), - normalize - ]) - test_dataset = datasets.CIFAR100(root=self.get_args().get_data_path(), train=False, download=True, - transform=transform) - sampler = get_sampler(self.test_dataset, self.args) - test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), sampler=sampler) - self.args.set_sampler(sampler) - - test_data = self.get_tuple_from_data_loader(test_loader) - - self.get_args().get_logger().debug("Finished loading CIFAR10 test data") - - return test_data - diff --git a/fltk/datasets/distributed/dataset.py b/fltk/datasets/distributed/dataset.py deleted file mode 100644 index b60651d8..00000000 --- a/fltk/datasets/distributed/dataset.py +++ /dev/null @@ -1,43 +0,0 @@ -from abc import abstractmethod - -from fltk.util.arguments import Arguments - - -class DistDataset: - train_sampler = None - test_sampler = None - train_dataset = None - test_dataset = None - train_loader = None - test_loader = None - - def __init__(self, args: Arguments): - self.args = args - - def get_args(self): - """ - Returns the arguments during initialization of the method - - :return: Arguments - """ - return self.args - - def get_train_loader(self): - return self.train_loader - - def 
get_test_loader(self): - return self.test_loader - - def get_train_sampler(self): - return self.train_sampler - - def get_test_sampler(self): - return self.test_sampler - - @abstractmethod - def init_train_dataset(self): - raise NotImplementedError("load_train_dataset() isn't implemented") - - @abstractmethod - def init_test_dataset(self): - raise NotImplementedError("load_train_dataset() isn't implemented") diff --git a/fltk/datasets/distributed/fashion_mnist.py b/fltk/datasets/distributed/fashion_mnist.py deleted file mode 100644 index d431ea56..00000000 --- a/fltk/datasets/distributed/fashion_mnist.py +++ /dev/null @@ -1,62 +0,0 @@ -from torch.utils.data import DataLoader -from torchvision import datasets -from torchvision import transforms - -from fltk.datasets.distributed import DistDataset -from fltk.strategy.data_samplers import get_sampler -from fltk.util.poison.poisonpill import PoisonPill - - -class DistFashionMNISTDataset(DistDataset): - - def __init__(self, args, pill: PoisonPill = None): - super(DistFashionMNISTDataset, self).__init__(args, pill) - self.init_train_dataset() - self.init_test_dataset() - - def init_train_dataset(self): - dist_loader_text = "distributed" if self.args.get_distributed() else "" - self.get_args().get_logger().debug(f"Loading '{dist_loader_text}' Fashion MNIST train data") - - self.train_dataset = datasets.FashionMNIST(root=self.get_args().get_data_path(), train=True, download=True, - transform=transforms.Compose([transforms.ToTensor()]), - target_transform=None if not self.pill else self.pill.poison_targets()) - self.train_sampler = get_sampler(self.train_dataset, self.args) - self.train_loader = DataLoader(self.train_dataset, batch_size=16, sampler=self.train_sampler) - - def init_test_dataset(self): - dist_loader_text = "distributed" if self.args.get_distributed() else "" - self.get_args().get_logger().debug(f"Loading '{dist_loader_text}' Fashion MNIST test data") - self.test_dataset = 
datasets.FashionMNIST(root=self.get_args().get_data_path(), train=False, download=True, - transform=transforms.Compose([transforms.ToTensor()]), - target_transform=None if not self.pill else self.pill.poison_targets()) - self.test_sampler = get_sampler(self.test_dataset, self.args) - self.test_loader = DataLoader(self.test_dataset, batch_size=16, sampler=self.test_sampler) - - def load_train_dataset(self): - self.get_args().get_logger().debug("Loading Fashion MNIST train data") - - train_dataset = datasets.FashionMNIST(self.get_args().get_data_path(), train=True, download=True, - transform=transforms.Compose([transforms.ToTensor()]), - target_transform=None if not self.pill else self.pill.poison_targets()) - train_loader = DataLoader(train_dataset, batch_size=len(train_dataset)) - - train_data = self.get_tuple_from_data_loader(train_loader) - - self.get_args().get_logger().debug("Finished loading Fashion MNIST train data") - - return train_data - - def load_test_dataset(self): - self.get_args().get_logger().debug("Loading Fashion MNIST test data") - - # TODO: target_transform=None if not self.pill else self.pill.poison_targets() - test_dataset = datasets.FashionMNIST(self.get_args().get_data_path(), train=False, download=True, - transform=transforms.Compose([transforms.ToTensor()])) - test_loader = DataLoader(test_dataset, batch_size=len(test_dataset)) - - test_data = self.get_tuple_from_data_loader(test_loader) - - self.get_args().get_logger().debug("Finished loading Fashion MNIST test data") - - return test_data diff --git a/fltk/datasets/fashion_mnist.py b/fltk/datasets/fashion_mnist.py index 0f851cfa..2538c3a7 100644 --- a/fltk/datasets/fashion_mnist.py +++ b/fltk/datasets/fashion_mnist.py @@ -3,10 +3,13 @@ from torchvision import transforms from torch.utils.data import DataLoader +from ..util.config.base_config import BareConfig + + class FashionMNISTDataset(Dataset): - def __init__(self, args): - super(FashionMNISTDataset, self).__init__(args) + def 
__init__(self, config: BareConfig, rank: int = 0, world_size: int = None): + super(FashionMNISTDataset, self).__init__(config, rank, world_size) def load_train_dataset(self): self.get_args().get_logger().debug("Loading Fashion MNIST train data") diff --git a/fltk/launch.py b/fltk/launch.py index 12da8c3c..870fe78e 100644 --- a/fltk/launch.py +++ b/fltk/launch.py @@ -6,67 +6,59 @@ import torch.distributed as dist from fltk.client import Client -from fltk.orchestrator import run_orchestrator, Orchestrator +from fltk.orchestrator import Orchestrator from fltk.util.cluster.client import ClusterManager +from fltk.util.config.arguments import LearningParameters from fltk.util.config.base_config import BareConfig from fltk.util.task.generator.arrival_generator import ExperimentGenerator logging.basicConfig(level=logging.INFO) -def is_distributed(): - return dist.is_available() and dist.is_initialized() - +def is_distributed() -> bool: + """ + Function to check whether distributed execution is needed. -def launch_client(task_id=None, rank=None, options=None, args: Namespace = None, config: BareConfig = None): + Note: the WORLD_SIZE environmental variable needs to be set for this to work (larger than 1). + PytorchJobs launched from KubeFlow automatically set this property. + @return: Indicator for distributed execution. 
+ @rtype: bool """ + world_size = int(os.environ.get('WORLD_SIZE', 1)) + leader_port = int(os.environ.get('MASTER_PORT', 5000)) + leader_address = os.environ.get('MASTER_ADDR', 'localhost') + logging.info(f"Training with WS: {world_size} connecting to: {leader_address}:{leader_port}") + return dist.is_available() and world_size > 1 - @param host: - @type host: - @param rank: - @type rank: - @param options: - @type options: - @param args: - @type args: + +def launch_client(task_id, config: BareConfig = None, learning_params: LearningParameters = None): + """ + @param task_id: + @type task_id: @param config: Configuration for components, needed for spinning up components of the Orchestrator. @type config: BareConfig + @param learning_params: + @type: LearningParameters @return: @rtype: """ - - # prepare_environment(host, nic) - logging.info(f'Starting with host={os.environ["MASTER_ADDR"]} and port={os.environ["MASTER_PORT"]}') + rank, world_size, backend = 0, None, None distributed = is_distributed() if distributed: - rank, world_size = dist.get_rank(), dist.get_world_size() - logging.info(f'Starting with rank={rank} and world size={world_size}') - - client = Client(rank, task_id, config) + rank = dist.get_rank() + world_size = dist.get_world_size() backend = dist.get_backend() - client.prepare_learner(distributed, backend) - client.run_epochs() - else: - """ - Currently on only DistributedDataParallel is supported. If you want, you can implement a different - approach, although it is advised to tinker with the DistributedDataParallel execution, as this - greatly simplifies the forward and backward computations using AllReduce under the hood. - - For more information, refer to the Kubeflow PyTorch-Operator and PyTorch Distributed documentation. 
- """ - print("Non DistributedDataParallel execution is not (yet) supported!") + logging.info(f'Starting Creating client with {rank}') + client = Client(rank, task_id, world_size, config, learning_params) + client.prepare_learner(distributed, backend) + epoch_data = client.run_epochs() + print(epoch_data) -def launch_orchestrator(host=None, rank=None, options=None, args: Namespace = None, config: BareConfig = None): +def launch_orchestrator(args: Namespace = None, config: BareConfig = None): """ Default runner for the Orchestrator that is based on KubeFlow - @param host: - @type host: - @param rank: - @type rank: - @param options: - @type options: @param args: @type args: @param config: Configuration for components, needed for spinning up components of the Orchestrator. diff --git a/fltk/nets/cifar_100_resnet.py b/fltk/nets/cifar_100_resnet.py index ae5d9cd0..95063de5 100644 --- a/fltk/nets/cifar_100_resnet.py +++ b/fltk/nets/cifar_100_resnet.py @@ -1,5 +1,4 @@ -from types import Union - +from typing import Union import torch.nn as nn @@ -72,8 +71,10 @@ def forward(self, x): class Cifar100ResNet(nn.Module): - def __init__(self, block=Union[BasicBlock, Bottleneck], num_block=[2, 2, 2, 2], num_classes=100): + def __init__(self, block: Union[BasicBlock, Bottleneck], num_block=None, num_classes=100): super(Cifar100ResNet, self).__init__() + if num_block is None: + num_block = [2, 2, 2, 2] self.in_channels = 64 diff --git a/fltk/nets/cifar_10_cnn.py b/fltk/nets/cifar_10_cnn.py index d0d382b6..d9825e25 100644 --- a/fltk/nets/cifar_10_cnn.py +++ b/fltk/nets/cifar_10_cnn.py @@ -1,7 +1,7 @@ +import torch import torch.nn as nn import torch.nn.functional as F - class Cifar10CNN(nn.Module): def __init__(self): @@ -26,6 +26,8 @@ def __init__(self): self.pool3 = nn.MaxPool2d(kernel_size=2) self.fc1 = nn.Linear(128 * 4 * 4, 128) + + self.softmax = nn.Softmax() self.fc2 = nn.Linear(128, 10) def forward(self, x): @@ -44,6 +46,6 @@ def forward(self, x): x = x.view(-1, 128 * 4 * 4) x 
= self.fc1(x) - x = F.softmax(self.fc2(x)) + x = self.softmax(self.fc2(x)) - return x + return x \ No newline at end of file diff --git a/fltk/nets/cifar_10_resnet.py b/fltk/nets/cifar_10_resnet.py index 53344e8e..6482447a 100644 --- a/fltk/nets/cifar_10_resnet.py +++ b/fltk/nets/cifar_10_resnet.py @@ -1,4 +1,4 @@ -from types import Union +from typing import Union import torch.nn as nn import torch.nn.functional as F diff --git a/fltk/nets/util/utils.py b/fltk/nets/util/utils.py index 7fbd100f..b36ae51f 100644 --- a/fltk/nets/util/utils.py +++ b/fltk/nets/util/utils.py @@ -9,7 +9,6 @@ from fltk.util.config.base_config import BareConfig from fltk.util.results import EpochData - def flatten_params(model_description: Union[torch.nn.Module, OrderedDict]): """ flattens all parameters into a single column vector. Returns the dictionary to recover them @@ -86,13 +85,11 @@ def load_model_from_file(model: torch.nn.Module, model_file_path: Path) -> None: raise FileExistsError(f"Cannot load model file {model_file_path} into {model}...") -def save_model(model: torch.nn.Module, directory: str, epoch, config: BareConfig): +def save_model(model: torch.nn.Module, directory: str, epoch: int): """ Saves the model if necessary. """ - config.get_logger().debug(f"Saving model to flat file storage. 
Save #{model.__class__}") - - full_save_path = f"./{directory}/{config.net}_{epoch}.pth" + full_save_path = f"./{directory}/{model.__class__.__name__}_{epoch}.pth" torch.save(model.state_dict(), full_save_path) diff --git a/fltk/schedulers/min_lr_step.py b/fltk/schedulers/min_lr_step.py index 6a44fdcb..cecef6dd 100644 --- a/fltk/schedulers/min_lr_step.py +++ b/fltk/schedulers/min_lr_step.py @@ -1,6 +1,9 @@ +import logging + + class MinCapableStepLR: - def __init__(self, logger, optimizer, step_size, gamma, min_lr): + def __init__(self, optimizer, step_size, gamma, min_lr): """ :param logger: logger :type logger: loguru.logger @@ -13,7 +16,7 @@ def __init__(self, logger, optimizer, step_size, gamma, min_lr): :param min_lr: minimum learning rate :type min_lr: float """ - self.logger = logger + self.logger = logging.getLogger('MinCapableStepLR') self.optimizer = optimizer self.step_size = step_size diff --git a/fltk/util/arguments.py b/fltk/util/arguments.py index 7945113d..e86f38ec 100644 --- a/fltk/util/arguments.py +++ b/fltk/util/arguments.py @@ -1,21 +1,11 @@ -import torch.nn.functional as F - import torch -import json - -# Setting the seed for Torch -import yaml - -from fltk.nets import Cifar10CNN, FashionMNISTCNN, Cifar100ResNet, FashionMNISTResNet, Cifar10ResNet, Cifar100VGG SEED = 1 torch.manual_seed(SEED) class Arguments: - def __init__(self, logger): - self.logger = logger - + def __init__(self): self.batch_size = 10 self.test_batch_size = 1000 self.epochs = 1 @@ -24,13 +14,8 @@ def __init__(self, logger): self.cuda = False self.shuffle = False self.log_interval = 10 - self.kwargs = {} - self.contribution_measurement_round = 1 - self.contribution_measurement_metric = 'Influence' - self.scheduler_step_size = 50 - self.scheduler_gamma = 0.5 - self.min_lr = 1e-10 + self.round_worker_selection_strategy = None self.round_worker_selection_strategy_kwargs = None @@ -41,30 +26,14 @@ def __init__(self, logger): self.save_model_path = "models" 
self.epoch_save_start_suffix = "start" self.epoch_save_end_suffix = "end" - self.get_poison_effort = 'half' - self.num_workers = 50 - # self.num_poisoned_workers = 10 - self.rank = 0 - self.world_size = 0 + self.data_sampler = None self.distributed = False - self.available_nets = { - "Cifar100ResNet" : Cifar100ResNet, - "Cifar100VGG" : Cifar100VGG, - "Cifar10CNN" : Cifar10CNN, - "Cifar10ResNet" : Cifar10ResNet, - "FashionMNISTCNN" : FashionMNISTCNN, - "FashionMNISTResNet" : FashionMNISTResNet - } self.net = None self.set_net_by_name('Cifar10CNN') - # self.net = FashionMNISTCNN - # self.net = Cifar100ResNet - # self.net = FashionMNISTResNet - # self.net = Cifar10ResNet - # self.net = Cifar10ResNet + self.dataset_name = 'cifar10' self.train_data_loader_pickle_path = { 'cifar10': 'data_loaders/cifar10/train_data_loader.pickle', @@ -78,14 +47,6 @@ def __init__(self, logger): 'cifar100': 'data_loaders/cifar100/test_data_loader.pickle', } - # self.train_data_loader_pickle_path = "data_loaders/cifar10/train_data_loader.pickle" - # self.test_data_loader_pickle_path = "data_loaders/cifar10/test_data_loader.pickle" - - # self.train_data_loader_pickle_path = "data_loaders/fashion-mnist/train_data_loader.pickle" - # self.test_data_loader_pickle_path = "data_loaders/fashion-mnist/test_data_loader.pickle" - - # self.train_data_loader_pickle_path = "data_loaders/cifar100/train_data_loader.pickle" - # self.test_data_loader_pickle_path = "data_loaders/cifar100/test_data_loader.pickle" self.loss_function = torch.nn.CrossEntropyLoss @@ -93,32 +54,6 @@ def __init__(self, logger): self.data_path = "data" - def get_distributed(self): - return self.distributed - - def get_rank(self): - return self.rank - - def get_world_size(self): - return self.world_size - - def set_sampler(self, sampler): - self.data_sampler = sampler - - def get_sampler(self): - return self.data_sampler - - def get_round_worker_selection_strategy(self): - return self.round_worker_selection_strategy - - def 
get_round_worker_selection_strategy_kwargs(self): - return self.round_worker_selection_strategy_kwargs - - def set_round_worker_selection_strategy_kwargs(self, kwargs): - self.round_worker_selection_strategy_kwargs = kwargs - - def set_client_selection_strategy(self, strategy): - self.round_worker_selection_strategy = strategy def get_data_path(self): return self.data_path @@ -187,26 +122,6 @@ def set_num_workers(self, num_workers): def set_model_save_path(self, save_model_path): self.save_model_path = save_model_path - def get_logger(self): - return self.logger - - def get_loss_function(self): - return self.loss_function - - def get_net(self): - return self.net - - def get_num_workers(self): - return self.num_workers - - def get_num_poisoned_workers(self): - return self.num_poisoned_workers - - def get_poison_effort(self): - return self.get_poison_effort - - def get_learning_rate(self): - return self.lr def get_momentum(self): return self.momentum @@ -223,71 +138,4 @@ def get_test_batch_size(self): def get_log_interval(self): return self.log_interval - def get_save_model_folder_path(self): - return self.save_model_path - - def get_learning_rate_from_epoch(self, epoch_idx): - lr = self.lr * (self.scheduler_gamma ** int(epoch_idx / self.scheduler_step_size)) - - if lr < self.min_lr: - self.logger.warning("Updating LR would place it below min LR. Skipping LR update.") - - return self.min_lr - - self.logger.debug("LR: {}".format(lr)) - - return lr - - def get_contribution_measurement_round(self): - return self.contribution_measurement_round - - def get_contribution_measurement_metric(self): - return self.contribution_measurement_metric - - def should_save_model(self, epoch_idx): - """ - Returns true/false models should be saved. 
- - :param epoch_idx: current training epoch index - :type epoch_idx: int - """ - if not self.save_model: - return False - - if epoch_idx == 1 or epoch_idx % self.save_epoch_interval == 0: - return True - - def log(self): - """ - Log this arguments object to the logger. - """ - self.logger.debug("Arguments: {}", str(self)) - - def __str__(self): - return "\nBatch Size: {}\n".format(self.batch_size) + \ - "Test Batch Size: {}\n".format(self.test_batch_size) + \ - "Epochs: {}\n".format(self.epochs) + \ - "Learning Rate: {}\n".format(self.lr) + \ - "Momentum: {}\n".format(self.momentum) + \ - "CUDA Enabled: {}\n".format(self.cuda) + \ - "Shuffle Enabled: {}\n".format(self.shuffle) + \ - "Log Interval: {}\n".format(self.log_interval) + \ - "Scheduler Step Size: {}\n".format(self.scheduler_step_size) + \ - "Scheduler Gamma: {}\n".format(self.scheduler_gamma) + \ - "Scheduler Minimum Learning Rate: {}\n".format(self.min_lr) + \ - "Client Selection Strategy: {}\n".format(self.round_worker_selection_strategy) + \ - "Client Selection Strategy Arguments: {}\n".format(json.dumps(self.round_worker_selection_strategy_kwargs, indent=4, sort_keys=True)) + \ - "Model Saving Enabled: {}\n".format(self.save_model) + \ - "Model Saving Interval: {}\n".format(self.save_epoch_interval) + \ - "Model Saving Path (Relative): {}\n".format(self.save_model_path) + \ - "Epoch Save Start Prefix: {}\n".format(self.epoch_save_start_suffix) + \ - "Epoch Save End Suffix: {}\n".format(self.epoch_save_end_suffix) + \ - "Number of Clients: {}\n".format(self.num_workers) + \ - "Number of Poisoned Clients: {}\n".format(self.num_poisoned_workers) + \ - "NN: {}\n".format(self.net) + \ - "Train Data Loader Path: {}\n".format(self.train_data_loader_pickle_path) + \ - "Test Data Loader Path: {}\n".format(self.test_data_loader_pickle_path) + \ - "Loss Function: {}\n".format(self.loss_function) + \ - "Default Model Folder Path: {}\n".format(self.default_model_folder_path) + \ - "Data Path: 
{}\n".format(self.data_path) + \ - "Dataset Name: {}\n".format(self.dataset_name) \ No newline at end of file + diff --git a/fltk/util/config/__init__.py b/fltk/util/config/__init__.py index e69de29b..5073a6c5 100644 --- a/fltk/util/config/__init__.py +++ b/fltk/util/config/__init__.py @@ -0,0 +1,2 @@ +from .arguments import * +from .base_config import * diff --git a/fltk/util/config/arguments.py b/fltk/util/config/arguments.py new file mode 100644 index 00000000..40f20a9a --- /dev/null +++ b/fltk/util/config/arguments.py @@ -0,0 +1,111 @@ +from argparse import Namespace +from dataclasses import dataclass +from typing import List, Tuple, Type + +import torch.nn + +from fltk.datasets import CIFAR10Dataset, FashionMNISTDataset +from fltk.datasets.dataset import Dataset +from fltk.nets import Cifar100ResNet, Cifar100VGG, Cifar10CNN, Cifar10ResNet, FashionMNISTCNN, FashionMNISTResNet + +CLIENT_ARGS: List[Tuple[str, str, str, type]] = \ + [("model", "md", "Which model to train", str), + ("dataset", "ds", "Which dataset to train the model on", str), + ("batch_size", "bs", + "Number that are 'batched' together in a single forward/backward pass during the optimization steps.", int), + ("max_epoch", "ep", + "Maximum number of times that the 'training' set instances can be used during the optimization steps", int), + ("learning_rate", "lr", "Factor to limit the step size that is taken during each gradient descent step.", float), + ("decay", 'dc', + "Rate at which the learning rate decreases (i.e. 
the optimization takes smaller steps)", float), +    ("loss", 'ls', "Loss function to use for optimization steps", str), +    ("optimizer", 'op', "Which optimizer to use during the training process", str) +    ] + + +@dataclass(frozen=True) +class LearningParameters: +    model: str +    dataset: str +    batch_size: int +    max_epoch: int +    learning_rate: float +    learning_decay: float +    loss: str +    optimizer: str + +    _available_nets = { +        "Cifar100ResNet": Cifar100ResNet, +        "Cifar100VGG": Cifar100VGG, +        "Cifar10CNN": Cifar10CNN, +        "Cifar10ResNet": Cifar10ResNet, +        "FashionMNISTCNN": FashionMNISTCNN, +        "FashionMNISTResNet": FashionMNISTResNet +    } + +    _available_data = { +        "Cifar10": CIFAR10Dataset, +        "Cifar100": CIFAR10Dataset, +        "FashionMnist": FashionMNISTDataset +    } + +    _available_loss = { +        "CrossEntropy": torch.nn.CrossEntropyLoss +    } + +    _available_optimizer = { +        "Adam": torch.optim.Adam +    } + + +    def get_model_class(self) -> Type[torch.nn.Module]: +        return self._available_nets.get(self.model) + +    def get_dataset_class(self) -> Type[Dataset]: +        return self._available_data.get(self.dataset) + +    def get_loss(self): +        return self._available_loss.get(self.loss) + +    def get_optimizer(self) -> Type[torch.optim.Optimizer]: +        return self._available_optimizer.get(self.optimizer) + + +def extract_learning_parameters(args: Namespace) -> LearningParameters: +    """ +    Function to extract the learning hyper-parameters from the Namespace object for the passed arguments. +    @param args: Namespace environment for running the Client. +    @type args: Namespace +    @return: Parsed learning parameters. 
+ @rtype: LearningParameters + """ + model = args.model + dataset = args.dataset + batch_size = args.batch_size + epoch = args.max_epoch + lr = args.learning_rate + decay = args.decay + loss = args.loss + optimizer = args.optimizer + return LearningParameters(model, dataset, batch_size, epoch, lr, decay, loss, optimizer) + + +def create_client_parser(subparsers) -> None: + client_parser = subparsers.add_parser('client') + client_parser.add_argument('config', type=str) + client_parser.add_argument('task_id', type=str) + # Option to override rank, by default provided by PytorchJob in Kubeflow. + client_parser.add_argument('--rank', type=int, default=None) + # Option to override default nic, by default is 'eth0' in containers. + client_parser.add_argument('--nic', type=str, default=None) + # Option to override 'master' host name, by default provided by PytorchJob in Kubeflow. + client_parser.add_argument('--host', type=str, default=None) + + # Add hyper-parameters + for long, short, hlp, tpe in CLIENT_ARGS: + client_parser.add_argument(f'-{short}', f'--{long}', type=tpe, help=hlp, required=True) + + +def create_cluster_parser(subparsers) -> None: + cluster_parser = subparsers.add_parser('cluster') + cluster_parser.add_argument('config', type=str) diff --git a/fltk/util/config/base_config.py b/fltk/util/config/base_config.py index d98d089e..e64cc38c 100644 --- a/fltk/util/config/base_config.py +++ b/fltk/util/config/base_config.py @@ -14,6 +14,10 @@ class GeneralNetConfig: epoch_save_start_suffix: str = 'cloud_experiment' epoch_save_end_suffix: str = 'cloud_experiment' + scheduler_step_size = 50 + scheduler_gamma = 0.5 + min_lr = 1e-10 + @dataclass_json @dataclass(frozen=True) @@ -54,6 +58,10 @@ class ExecutionConfig: experiment_prefix: str = "experiment" cuda: bool = False + default_model_folder_path = "default_models" + epoch_save_end_suffix = "epoch_end" + save_model_path = "models" + data_path = "data" @dataclass_json @@ -85,44 +93,28 @@ class BareConfig(object): 
execution_config: ExecutionConfig cluster_config: ClusterConfig = field(metadata=config(field_name="cluster")) - # # TODO: Move to external class/object - # self.train_data_loader_pickle_path = { - # 'cifar10': 'data_loaders/cifar10/train_data_loader.pickle', - # 'fashion-mnist': 'data_loaders/fashion-mnist/train_data_loader.pickle', - # 'cifar100': 'data_loaders/cifar100/train_data_loader.pickle', - # } - # - # self.test_data_loader_pickle_path = { - # 'cifar10': 'data_loaders/cifar10/test_data_loader.pickle', - # 'fashion-mnist': 'data_loaders/fashion-mnist/test_data_loader.pickle', - # 'cifar100': 'data_loaders/cifar100/test_data_loader.pickle', - # } - # - # # TODO: Make part of different configuration - # self.loss_function = torch.nn.CrossEntropyLoss - # - # self.default_model_folder_path = "default_models" - # self.data_path = "data" - - def get_dataloader_list(self): - """ - @deprecated - @return: - @rtype: - """ - return list(self.train_data_loader_pickle_path.keys()) + def get_scheduler_step_size(self) -> int: + return self.execution_config.general_net.scheduler_step_size + + def get_scheduler_gamma(self): + return self.execution_config.general_net.scheduler_gamma - def set_train_data_loader_pickle_path(self, path, name='cifar10'): - self.train_data_loader_pickle_path[name] = path + def get_min_lr(self): + return self.execution_config.general_net.min_lr - def get_train_data_loader_pickle_path(self): - return self.train_data_loader_pickle_path[self.dataset_name] + def get_data_path(self): + return self.execution_config.data_path - def set_test_data_loader_pickle_path(self, path, name='cifar10'): - self.test_data_loader_pickle_path[name] = path + def get_default_model_folder_path(self): + return self.execution_config.default_model_folder_path - def get_test_data_loader_pickle_path(self): - return self.test_data_loader_pickle_path[self.dataset_name] + def cuda_enabled(self) -> bool: + """ + Function to check CUDA availability independent of BareConfig 
structure. +        @return: +        @rtype: +        """ +        return self.execution_config.cuda def should_save_model(self, epoch_idx): """ @@ -133,3 +125,9 @@ def should_save_model(self, epoch_idx): """ return self.execution_config.general_net.save_model and ( epoch_idx == 1 or epoch_idx % self.execution_config.general_net.save_epoch_interval == 0) + +    def get_epoch_save_end_suffix(self) -> str: +        return self.execution_config.epoch_save_end_suffix + +    def get_save_model_folder_path(self): +        return self.execution_config.save_model_path