From 4e451db71cde6e170f7cd430b5b87839b4044889 Mon Sep 17 00:00:00 2001 From: JMGaljaard Date: Mon, 12 Sep 2022 17:18:26 +0200 Subject: [PATCH] Clean up documentation and remove Singleton meta-class --- fltk/datasets/__init__.py | 22 +-- fltk/util/cluster/client.py | 4 +- fltk/util/config/__init__.py | 30 +++- fltk/util/config/arguments.py | 140 ++++++++------- fltk/util/config/definitions/aggregate.py | 1 + fltk/util/config/definitions/data_sampler.py | 1 + fltk/util/config/definitions/dataset.py | 32 +++- .../config/definitions/experiment_type.py | 1 + fltk/util/config/definitions/logging.py | 1 + fltk/util/config/definitions/loss.py | 11 +- fltk/util/config/definitions/net.py | 14 +- fltk/util/config/definitions/optim.py | 1 + fltk/util/config/definitions/orchestrator.py | 34 ++-- fltk/util/config/distributed_config.py | 166 +++++++++++------- fltk/util/config/experiment_config.py | 154 ++++++++-------- fltk/util/config/learner_config.py | 78 ++++---- fltk/util/singleton.py | 15 -- fltk/util/task/generator/arrival_generator.py | 4 +- 18 files changed, 408 insertions(+), 301 deletions(-) delete mode 100644 fltk/util/singleton.py diff --git a/fltk/datasets/__init__.py b/fltk/datasets/__init__.py index aec90089..b4ecc748 100644 --- a/fltk/datasets/__init__.py +++ b/fltk/datasets/__init__.py @@ -2,24 +2,4 @@ from fltk.datasets.cifar100 import CIFAR100Dataset from fltk.datasets.fashion_mnist import FashionMNISTDataset from fltk.datasets.mnist import MNIST -from fltk.util.config.definitions import Dataset - - -def get_train_loader_path(name: Dataset) -> str: - paths = { - Dataset.cifar10: 'data_loaders/cifar10/train_data_loader.pickle', - Dataset.fashion_mnist: 'data_loaders/fashion-mnist/train_data_loader.pickle', - Dataset.cifar100: 'data_loaders/cifar100/train_data_loader.pickle', - Dataset.mnist: 'data_loaders/mnist/train_data_loader.pickle', - } - return paths[name] - - -def get_test_loader_path(name: Dataset) -> str: - paths = { - Dataset.cifar10: 'data_loaders/cifar10/test_data_loader.pickle', - Dataset.fashion_mnist: 'data_loaders/fashion-mnist/test_data_loader.pickle', - Dataset.cifar100: 'data_loaders/cifar100/test_data_loader.pickle', - Dataset.mnist: 'data_loaders/mnist/test_data_loader.pickle', - } - return paths[name] +from fltk.datasets.dataset import Dataset diff --git a/fltk/util/cluster/client.py b/fltk/util/cluster/client.py index 12ebcdb5..62af8306 100644 --- a/fltk/util/cluster/client.py +++ b/fltk/util/cluster/client.py @@ -1,5 +1,6 @@ from __future__ import annotations +import abc import logging import time from collections import defaultdict @@ -16,7 +17,6 @@ V1VolumeMount, V1Toleration, V1Volume, V1PersistentVolumeClaimVolumeSource, V1ConfigMapVolumeSource from fltk.util.cluster.conversion import Convert -from fltk.util.singleton import Singleton from fltk.util.task.arrival_task import DistributedArrivalTask, ArrivalTask, FederatedArrivalTask if TYPE_CHECKING: @@ -164,7 +164,7 @@ def __monitor_pods(self) -> None: self._logger.debug(self._resource_lookup) -class ClusterManager(metaclass=Singleton): +class ClusterManager(abc.ABC): """ Object with basic monitoring functionality. This shows how the information of different Pods in a cluster can be requested and parsed. 
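With the Singleton metaclass removed, ClusterManager (and ArrivalGenerator later in this patch) are plain abc.ABC subclasses: constructing them twice now yields two distinct objects, and the caller is responsible for creating one instance and sharing it. A small self-contained sketch of the behavioural difference, with the metaclass reproduced from the deleted fltk/util/singleton.py at the end of this patch and illustrative class names:

    import threading

    class Singleton(type):
        """Metaclass removed by this patch, reproduced here for comparison."""
        _lock = threading.Lock()
        _instances = {}

        def __call__(cls, *args, **kwargs):
            if cls not in cls._instances:
                with cls._lock:
                    if cls not in cls._instances:
                        cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
            return cls._instances[cls]

    class WithMeta(metaclass=Singleton):
        pass

    class Plain:
        pass

    assert WithMeta() is WithMeta()    # old behaviour: repeated construction returns one shared instance
    assert Plain() is not Plain()      # new behaviour: callers construct once and pass the object around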
Currently, it mainly exists to start the ResourceWatchDog, which now only keeps track of the diff --git a/fltk/util/config/__init__.py b/fltk/util/config/__init__.py index 5b8990f2..1e234f5f 100644 --- a/fltk/util/config/__init__.py +++ b/fltk/util/config/__init__.py @@ -11,6 +11,17 @@ def retrieve_config_network_params(conf: FedLearnerConfig, nic=None, host=None): + """ + + Args: + conf: FedLearnerConfig: + nic: (Default value = None) + host: (Default value = None) + + Returns: + str: NIC to use. + str: host to use. + """ if hasattr(conf, 'system'): system_attr = getattr(conf, 'system') if 'federator' in system_attr: @@ -22,6 +33,15 @@ def retrieve_config_network_params(conf: FedLearnerConfig, nic=None, host=None): def get_distributed_config(args, alt_path: str = None) -> Optional[DistributedConfig]: + """ + + Args: + args: + alt_path: str: (Default value = None) + + Returns: + Optional[DistributedConfig]: When provided, DistributedConfig from Path specified during startup. + """ if args: config_path = args.config else: @@ -39,8 +59,14 @@ def get_distributed_config(args, alt_path: str = None) -> Optional[DistributedCo def get_learning_param_config(args, alt_path: str = None) -> Optional[DistLearnerConfig]: - """ - Retrieve learning parameter configuration from Disk for distributed learning experiments. + """Retrieve learning parameter configuration from Disk for distributed learning experiments. + + Args: + args: + alt_path: str: (Default value = None) + + Returns: + """ if args: config_path = args.experiment_config diff --git a/fltk/util/config/arguments.py b/fltk/util/config/arguments.py index e60aee89..496d6fed 100644 --- a/fltk/util/config/arguments.py +++ b/fltk/util/config/arguments.py @@ -3,25 +3,30 @@ import torch.distributed as dist -def _create_extractor_parser(subparsers): +def _create_extractor_parser(subparsers) -> None: """ Helper function to add extractor arguments. - @param subparsers: Subparser to add arguments to. - @type subparsers: Any - @return: None - @rtype: None + + Args: + subparsers (Any): Subparser to add arguments to. + + Returns: + None + """ extractor_parser = subparsers.add_parser('extractor') extractor_parser.add_argument('config', type=str) def _create_client_parser(subparsers) -> None: - """ - Helper function to add client arguments. - @param subparsers: Subparser to add arguments to. - @type subparsers: Any - @return: None - @rtype: None + """Helper function to add client arguments. + + Args: + subparsers(Any): Subparser to add arguments to. + + Returns: + None + """ client_parser = subparsers.add_parser('client') client_parser.add_argument('experiment_config', type=str, help="Experiment specific config (yaml).") @@ -34,12 +39,14 @@ def _create_client_parser(subparsers) -> None: def _create_cluster_parser(subparsers) -> None: - """ - Helper function to add cluster execution arguments. - @param subparsers: Subparser to add arguments to. - @type subparsers: Any - @return: None - @rtype: None + """Helper function to add cluster execution arguments. + + Args: + subparsers(Any): Subparser to add arguments to. + + Returns: + None + """ cluster_parser = subparsers.add_parser('cluster') cluster_parser.add_argument('config', type=str) @@ -48,12 +55,14 @@ def _create_cluster_parser(subparsers) -> None: def _create_container_util_parser(subparsers) -> None: - """ - Helper function to add container util execution arguments. - @param subparsers: Subparser to add arguments to. 
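The loaders above (get_distributed_config, get_learning_param_config) can also be called outside the normal CLI entry point, for example from a notebook or test. A minimal sketch, assuming a cluster configuration file exists at the placeholder path used below:

    from fltk.util.config import get_distributed_config

    # args=None forces the fallback to alt_path; the JSON path is a placeholder.
    dist_config = get_distributed_config(args=None, alt_path='configs/example_cloud_experiment.json')
    if dist_config is not None:
        print(dist_config.get_duration(), dist_config.cuda_enabled())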
- @type subparsers: Any - @return: None - @rtype: None + """Helper function to add container util execution arguments. + + Args: + subparsers(Any): Subparser to add arguments to. + + Returns: + None + """ util_docker_parser = subparsers.add_parser('util-docker') util_docker_parser.add_argument('name', type=str) @@ -61,37 +70,43 @@ def _create_container_util_parser(subparsers) -> None: def _create_util_parser(subparsers): - """ - Helper function to add util generation execution arguments. - @param subparsers: Subparser to add arguments to. - @type subparsers: Any - @return: None - @rtype: None + """Helper function to add util generation execution arguments. + + Args: + subparsers(Any): Subparser to add arguments to. + + Returns: + None: None + """ util_generate_parser = subparsers.add_parser('util-generate') util_generate_parser.add_argument('path', type=str) def _create_util_run_parser(subparsers) -> None: - """ - Helper function to add util run execution arguments. - @param subparsers: Subparser to add arguments to. - @type subparsers: Any - @return: None - @rtype: None + """Helper function to add util run execution arguments. + + Args: + subparsers(Any): Subparser to add arguments to. + + Returns: + None: None + """ util_run_parser = subparsers.add_parser('util-run') util_run_parser.add_argument('path', type=str) def _create_remote_parser(subparsers) -> None: - """ - Helper function to add remote Federated Learning execution arguments. Supports both Docker and K8s execution + """Helper function to add remote Federated Learning execution arguments. Supports both Docker and K8s execution using optional (positional) arguments. - @param subparsers: Subparser to add arguments to. - @type subparsers: Any - @return: None - @rtype: None + + Args: + subparsers(Any): Subparser to add arguments to. + + Returns: + None: None + """ remote_parser = subparsers.add_parser('remote') add_default_arguments(remote_parser) @@ -102,24 +117,29 @@ def _create_remote_parser(subparsers) -> None: def _create_single_parser(subparsers) -> None: - """ - Helper function to add Local single machine execution arguments. - @param subparsers: Subparser to add arguments to. - @type subparsers: Any - @return: None - @rtype: None + """Helper function to add Local single machine execution arguments. + + Args: + subparsers(Any): Subparser to add arguments to. + + Returns: + None: None + """ single_machine_parser = subparsers.add_parser('single') add_default_arguments(single_machine_parser) def add_default_arguments(*parsers): - """ - Helper function to add default arguments shared between executions. - @param parsers: Subparser to add arguments to. - @type subparsers: Any - @return: None - @rtype: None + """Helper function to add default arguments shared between executions. + + Args: + parsers: Subparser to add arguments to. + *parsers: + + Returns: + None: None + """ for parser in parsers: parser.add_argument('config', type=str, help='') @@ -127,12 +147,14 @@ def add_default_arguments(*parsers): def create_all_subparsers(subparsers): - """ - Helper function to add all subparsers to an argparse object. - @param subparsers: Subparser to add arguments to. - @type subparsers: Any - @return: None - @rtype: ArgumentParser + """Helper function to add all subparsers to an argparse object. + + Args: + subparsers(Any): Subparser to add arguments to. 
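Taken together, the helpers in this module are meant to be attached to a single top-level parser. A usage sketch, in which the program name and the dispatch field are illustrative rather than prescribed by this module:

    import argparse

    from fltk.util.config.arguments import create_all_subparsers

    parser = argparse.ArgumentParser(prog='fltk')
    subparsers = parser.add_subparsers(dest='action', required=True)
    create_all_subparsers(subparsers)

    # e.g. invoked as: python launch.py single configs/example.yaml
    args = parser.parse_args()
    print(args.action)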
+ + Returns: + ArgumentParser: None + """ _create_extractor_parser(subparsers) _create_client_parser(subparsers) diff --git a/fltk/util/config/definitions/aggregate.py b/fltk/util/config/definitions/aggregate.py index 3c48a3ea..a13bd3de 100644 --- a/fltk/util/config/definitions/aggregate.py +++ b/fltk/util/config/definitions/aggregate.py @@ -3,6 +3,7 @@ @unique class Aggregations(Enum): + """Enum for Provided aggregation Types.""" avg = 'Avg' fedavg = 'FedAvg' sum = 'Sum' diff --git a/fltk/util/config/definitions/data_sampler.py b/fltk/util/config/definitions/data_sampler.py index 46d8acbf..01072285 100644 --- a/fltk/util/config/definitions/data_sampler.py +++ b/fltk/util/config/definitions/data_sampler.py @@ -3,6 +3,7 @@ @unique class DataSampler(Enum): + """Enum for provided datasampler (Federated) Types.""" uniform = "uniform" q_sampler = "q sampler" limit_labels = "limit labels" diff --git a/fltk/util/config/definitions/dataset.py b/fltk/util/config/definitions/dataset.py index 99eabb99..cf9ada10 100644 --- a/fltk/util/config/definitions/dataset.py +++ b/fltk/util/config/definitions/dataset.py @@ -1,30 +1,44 @@ +from typing import Type + from aenum import unique, Enum -from re import T +from fltk import datasets from fltk.datasets import CIFAR10Dataset, CIFAR100Dataset, FashionMNISTDataset, MNIST @unique class Dataset(Enum): + """Enum for provided dataset Types.""" cifar10 = 'cifar10' cifar100 = 'cifar100' fashion_mnist = 'fashion-mnist' mnist = 'mnist' @classmethod - def _missing_name_(cls, name: str) -> T: + def _missing_name_(cls, name: str) -> "Dataset": + """Helper function in case name could not be looked up (to support older configurations). + + Args: + name (str): Name of Type to be looked up. + + Returns: + Dataset: Corresponding Enum instance, if name is recognized from lower case. + + """ for member in cls: if member.name.lower() == name.lower(): return member -def get_dist_dataset(name: Dataset): - """ - Function to retrieve distributed dataset (Distributed Learning Experiment). - @param name: Definition name of the datset. - @type name: Dataset - @return: - @rtype: +def get_dist_dataset(name: Dataset) -> Type[datasets.Dataset]: + """Function to retrieve (distributed) dataset, for Distributed Learning Experiments. + + Args: + name (Dataset): Definition (Enum) of the dataset configurated. + + Returns: + Type[datasets.Dataset]: Class reference to requested dataset. 
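Continuing below with the lookup table, get_dist_dataset() resolves a Dataset definition to the dataset class itself rather than an instance; the _missing_name_ hook above additionally tolerates differently-cased names from older configuration files. An illustrative lookup:

    from fltk.util.config.definitions.dataset import Dataset, get_dist_dataset

    definition = Dataset('mnist')                  # value lookup from a configuration string
    dataset_class = get_dist_dataset(definition)   # class reference, not an instance
    print(dataset_class.__name__)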
+ """ __lookup = { Dataset.cifar10: CIFAR10Dataset, diff --git a/fltk/util/config/definitions/experiment_type.py b/fltk/util/config/definitions/experiment_type.py index 64e22400..dabdd0b6 100644 --- a/fltk/util/config/definitions/experiment_type.py +++ b/fltk/util/config/definitions/experiment_type.py @@ -2,5 +2,6 @@ class ExperimentType(Enum): + """ """ FEDERATED = 'federated' DISTRIBUTED = 'distributed' diff --git a/fltk/util/config/definitions/logging.py b/fltk/util/config/definitions/logging.py index ebde5c1a..dcda88ba 100644 --- a/fltk/util/config/definitions/logging.py +++ b/fltk/util/config/definitions/logging.py @@ -2,6 +2,7 @@ class LogLevel(Enum): + """ """ CRITICAL = 50 FATAL = CRITICAL ERROR = 40 diff --git a/fltk/util/config/definitions/loss.py b/fltk/util/config/definitions/loss.py index 51d515ed..5e926f30 100644 --- a/fltk/util/config/definitions/loss.py +++ b/fltk/util/config/definitions/loss.py @@ -13,6 +13,7 @@ @unique class Loss(Enum): + """ """ l1_loss = 'L1Loss' mse_loss = 'MSELoss' cross_entropy_loss = 'CrossEntropyLoss' @@ -36,9 +37,15 @@ class Loss(Enum): def get_loss_function(request: Union[str, Loss]) -> Type[_Loss]: - """ - Mapper function to map a request to a loss function. As fallback behavior the request is evaluated + """Mapper function to map a request to a loss function. As fallback behavior the request is evaluated using the Python interpreter to try to load an existing implementation dynamically. + + Args: + request (Union[str, Loss]): Configured loss function to retrieve. + + Returns: + Type[_Loss]: Class reference to loss function implementation of PyTorch. + """ __lookup_dict: Dict[Loss, Type[_Loss]] = { Loss.l1_loss: torch.nn.L1Loss, diff --git a/fltk/util/config/definitions/net.py b/fltk/util/config/definitions/net.py index 31e85ea3..b6f18ebe 100644 --- a/fltk/util/config/definitions/net.py +++ b/fltk/util/config/definitions/net.py @@ -1,7 +1,10 @@ +from typing import Optional + from aenum import unique, Enum @unique class Nets(Enum): + """ """ cifar100_resnet = "Cifar100ResNet" cifar100_vgg = "Cifar100VGG" cifar10_cnn = "Cifar10CNN" @@ -11,7 +14,16 @@ class Nets(Enum): mnist_cnn = 'MNISTCNN' @classmethod - def _missing_name_(cls, name: str): + def _missing_name_(cls, name: str) -> Optional["Nets"]: + """Helper function to get lower/higher-case configured network, to allow for case-insensitive lookup. + + Args: + name (str): Name of network to lookup in case of missing lookup. + + Returns: + Optional[Nets]: Of name is not part of defined networks, else reference to class implementing network. + + """ for member in cls: if member.name.lower() == name.lower(): return member diff --git a/fltk/util/config/definitions/optim.py b/fltk/util/config/definitions/optim.py index 2d64a9ac..6863c369 100644 --- a/fltk/util/config/definitions/optim.py +++ b/fltk/util/config/definitions/optim.py @@ -3,6 +3,7 @@ @unique class Optimizations(Enum): + """ """ adam = 'Adam' adam_w = 'AdamW' sgd = 'SGD' diff --git a/fltk/util/config/definitions/orchestrator.py b/fltk/util/config/definitions/orchestrator.py index d22da1eb..7667bb7b 100644 --- a/fltk/util/config/definitions/orchestrator.py +++ b/fltk/util/config/definitions/orchestrator.py @@ -15,19 +15,24 @@ @unique class OrchestratorType(Enum): + """ """ BATCH = 'batch' SIMULATED = 'simulated' def get_orchestrator(config: DistributedConfig, cluster_manager: ClusterManager, arrival_generator: ArrivalGenerator) -> Orchestrator: - """ - Retrieve Orchestrator type given a Distributed (experiment) configuration. 
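As an aside on the loss mapper introduced earlier in this patch (fltk/util/config/definitions/loss.py): an enum member resolves to the corresponding PyTorch class, and per its docstring a plain string falls back to dynamic evaluation. A usage sketch:

    from fltk.util.config.definitions.loss import Loss, get_loss_function

    loss_class = get_loss_function(Loss.cross_entropy_loss)   # resolves to torch.nn.CrossEntropyLoss
    criterion = loss_class()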
This allows for defining the + """Retrieve Orchestrator type given a Distributed (experiment) configuration. This allows for defining the type of experiment (Batch or Simulated arrivals) once, and letting the Orchestrator implementation make sure that the tasks are scheduled correctly. - @param config: Distributed (cluster) configuration object for experiments. - @type config: DistributedConfig - @return: Type of Orchestrator as requested by configuration object. - @rtype: Type[Orchestrator] + + Args: + config (DistributedConfig): Distributed (cluster) configuration object for experiments. + cluster_manager (ClusterManager): + arrival_generator (ArrivalGenerator): + + Returns: + Type[Orchestrator]: Type of Orchestrator as requested by configuration object. + """ __lookup = { OrchestratorType.BATCH: BatchOrchestrator, @@ -39,14 +44,15 @@ def get_orchestrator(config: DistributedConfig, cluster_manager: ClusterManager, def get_arrival_generator(config: DistributedConfig, experiment: str) -> ArrivalGenerator: - """ - Retrieval function to create generator functions - @param config: Distributed (cluster) configuration with general configuration. - @type config: DistributedConfig - @param experiment: Experiment name. - @type experiment: str - @return: ArrivalGenerator initialized with the experiment Path. - @rtype: ArrivalGenerator + """Retrieval function to create generator functions + + Args: + config (DistributedConfig): Distributed (cluster) configuration with general configuration. + experiment (str): Experiment name. + + Returns: + ArrivalGenerator: ArrivalGenerator object initialized with the experiment Path. + """ __lookup = { OrchestratorType.BATCH: SequentialArrivalGenerator, diff --git a/fltk/util/config/distributed_config.py b/fltk/util/config/distributed_config.py index 8da9da82..a6de0bda 100644 --- a/fltk/util/config/distributed_config.py +++ b/fltk/util/config/distributed_config.py @@ -17,6 +17,7 @@ @dataclass_json @dataclass class GeneralNetConfig: + """ """ save_model: bool = False save_temp_model: bool = False save_epoch_interval: int = 1 @@ -28,27 +29,29 @@ class GeneralNetConfig: @dataclass_json @dataclass(frozen=True) class ReproducibilityConfig: - """ - Dataclass object to hold experiment configuration settings related to reproducibility of experiments. - """ + """Dataclass object to hold experiment configuration settings related to reproducibility of experiments.""" seeds: List[int] @dataclass_json @dataclass(frozen=True) class TensorboardConfig: + """ """ active: bool record_dir: str def prepare_log_dir(self, working_dir: Path = None): - """ - Function to create logging directory used by TensorBoard. When running in a cluster, this function should not be + """Function to create logging directory used by TensorBoard. When running in a cluster, this function should not be used, as the TensorBoard instance that is started simultaneously with the Orchestrator. - @param working_dir: Current working directory, by default PWD is assumed at which the Python interpreter is + + Args: + working_dir(Path): Current working directory, by default PWD is assumed at which the Python interpreter is started. 
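For local runs, the directory preparation described above can be exercised directly; both values below are placeholders, and per the docstring this should not be used in-cluster:

    from pathlib import Path

    from fltk.util.config.distributed_config import TensorboardConfig

    tb_config = TensorboardConfig(active=True, record_dir='logging/tensorboard')
    tb_config.prepare_log_dir(working_dir=Path.cwd())   # creates the record directory if it is missing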
- @type working_dir: Path - @return: None - @rtype: None + working_dir: Path: (Default value = None) + + Returns: + None: None + """ dir_to_check = Path(self.record_dir) if working_dir: @@ -60,6 +63,7 @@ def prepare_log_dir(self, working_dir: Path = None): @dataclass_json @dataclass class ExecutionConfig: + """ """ general_net: GeneralNetConfig = field(metadata=config(field_name="net")) reproducibility: ReproducibilityConfig tensorboard: TensorboardConfig @@ -77,6 +81,7 @@ class ExecutionConfig: @dataclass_json @dataclass class OrchestratorConfig: + """ """ orchestrator_type: OrchestratorType parallel_execution: bool = True @@ -84,6 +89,7 @@ class OrchestratorConfig: @dataclass_json @dataclass class ClientConfig: + """ """ prefix: str tensorboard_active: bool @@ -91,33 +97,39 @@ class ClientConfig: @dataclass_json @dataclass class ClusterConfig: + """ """ orchestrator: OrchestratorConfig client: ClientConfig namespace: str = 'test' image: str = 'fltk:latest' def load_incluster_namespace(self): - """ - Function to retrieve information from teh cluster itself provided by K8s. - @return: None - @rtype: None + """Function to retrieve information from teh cluster itself provided by K8s. + + Args: + + Returns: + None: None + """ with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f: current_namespace = f.read() self.namespace = current_namespace def load_incluster_image(self): - """ - Function to load the in-cluster image. The fltk-values.yaml file in charts is expected to have (at least) the + """Function to load the in-cluster image. The fltk-values.yaml file in charts is expected to have (at least) the following contents. The default Helm chart contains the necessary options to set this correctly. - + provider: domain: gcr.io projectName: imageName: fltk:latest - @return: None - @rtype: None + Args: + + Returns: + None: None + """ self.image = os.environ.get('IMAGE_NAME') @@ -125,41 +137,55 @@ def load_incluster_image(self): @dataclass_json @dataclass class DistributedConfig: - """ - Configuration Dataclass for shared configurations between experiments. This regards your general setup, describing + """Configuration Dataclass for shared configurations between experiments. This regards your general setup, describing elements like the utilization of CUDA accelerators, format of logging file names, whether to save experiment data and the likes. + + Args: + + Returns: + """ execution_config: ExecutionConfig cluster_config: ClusterConfig = field(metadata=config(field_name="cluster")) config_path: Optional[Path] = None def get_duration(self) -> int: - """ - Function to get execution duration of an experiment. - @return: Integer representation of seconds for which the experiments must be run. - @rtype: int + """Function to get execution duration of an experiment. + + Args: + + Returns: + int: Integer representation of seconds for which the experiments must be run. + """ return self.execution_config.duration def get_log_dir(self): - """ - Function to get the logging directory from the configuration. - @return: path object to the logging directory. - @rtype: Path + """Function to get the logging directory from the configuration. + + Args: + + Returns: + Path: path object to the logging directory. 
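The @dataclass_json decorators above are what allow these sections to be decoded from the cluster configuration file; they can also be built programmatically, for example in tests. A sketch with placeholder values:

    from fltk.util.config.definitions.orchestrator import OrchestratorType
    from fltk.util.config.distributed_config import ClientConfig, ClusterConfig, OrchestratorConfig

    cluster = ClusterConfig(
        orchestrator=OrchestratorConfig(orchestrator_type=OrchestratorType.BATCH),
        client=ClientConfig(prefix='client', tensorboard_active=False),
        namespace='test',
        image='fltk:latest',
    )
    print(cluster.namespace, cluster.orchestrator.parallel_execution)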
+ """ return self.execution_config.log_path def get_log_path(self, experiment_id: str, client_id: int, learn_params: DistLearnerConfig) -> Path: - """ - Function to get the logging path that corresponds to a specific experiment, client and network that has been + """Function to get the logging path that corresponds to a specific experiment, client and network that has been deployed as learning task. - @param experiment_id: Unique experiment ID (should be provided by the Orchestrator). - @type experiment_id: str - @param client_id: Rank of the client. - @type client_id: int - @return: Path representation of the directory/path should be logged by the training process. - @rtype: Path + + Args: + experiment_id(str): Unique experiment ID (should be provided by the Orchestrator). + client_id(int): Rank of the client. + experiment_id: str: + client_id: int: + learn_params: DistLearnerConfig: + + Returns: + Path: Path representation of the directory/path should be logged by the training process. + """ base_log = Path(self.execution_config.tensorboard.record_dir) model, dataset, replication = learn_params.model, learn_params.dataset, learn_params.replication @@ -167,54 +193,70 @@ def get_log_path(self, experiment_id: str, client_id: int, learn_params: DistLea return base_log.joinpath(experiment_dir) def get_data_path(self) -> Path: - """ - Function to get the data path from config. - @return: Path representation to where data can be written. - @rtype: Path + """Function to get the data path from config. + + Args: + + Returns: + Path: Path representation to where data can be written. + """ return Path(self.execution_config.data_path) def get_default_model_folder_path(self) -> Path: - """ - @deprecated Function to get the default model folder path from FedLearningConfig, needed for non-default training in the + """@deprecated Function to get the default model folder path from FedLearningConfig, needed for non-default training in the FLTK framework. - @return: Path representation of model path. - @rtype: Path + + Args: + + Returns: + Path: Path representation of model path. + """ return Path(self.execution_config.default_model_folder_path) def cuda_enabled(self) -> bool: - """ - Function to check CUDA availability independent of BareConfig structure. - @return: True when CUDA should be used, False otherwise. - @rtype: bool + """Function to check CUDA availability independent of BareConfig structure. + + Args: + + Returns: + bool: True when CUDA should be used, False otherwise. + """ return self.execution_config.cuda def should_save_model(self, epoch_idx) -> bool: - """ - @deprecated Returns true/false models should be saved. + """@deprecated Returns true/false models should be saved. + + Args: + epoch_idx(int): current training epoch index + + Returns: + bool: Boolean indication of whether the model should be saved - @param epoch_idx: current training epoch index - @type epoch_idx: int - @return: Boolean indication of whether the model should be saved - @rtype: bool """ return self.execution_config.general_net.save_model and ( epoch_idx == 1 or epoch_idx % self.execution_config.general_net.save_epoch_interval == 0) def get_epoch_save_end_suffix(self) -> str: - """ - Function to gather the end suffix for saving after running an epoch. - @return: Suffix for saving epoch data. - @rtype: str + """Function to gather the end suffix for saving after running an epoch. + + Args: + + Returns: + str: Suffix for saving epoch data. 
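The checkpointing rule in should_save_model() above saves on the first epoch and then on every save_epoch_interval-th epoch. A stand-alone illustration, assuming save_model=True and an interval of 10:

    save_epoch_interval = 10
    saved_epochs = [epoch for epoch in range(1, 31) if epoch == 1 or epoch % save_epoch_interval == 0]
    print(saved_epochs)   # [1, 10, 20, 30]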
+ """ return self.execution_config.epoch_save_end_suffix def get_save_model_folder_path(self) -> Path: - """ - Function to get save path for a model. - @return: Path to where the model should be saved. - @rtype: Path + """Function to get save path for a model. + + Args: + + Returns: + Path: Path to where the model should be saved. + """ return Path(self.execution_config.save_model_path) diff --git a/fltk/util/config/experiment_config.py b/fltk/util/config/experiment_config.py index 63d3eecd..8d7fc496 100644 --- a/fltk/util/config/experiment_config.py +++ b/fltk/util/config/experiment_config.py @@ -10,10 +10,13 @@ def _none_factory() -> None: - """ - Helper function to construct a default 'None' value. - @return: Default None value. - @rtype: None + """Helper function to construct a default 'None' value. + + Args: + + Returns: + None: Default None value. + """ return None @@ -21,9 +24,7 @@ def _none_factory() -> None: @dataclass_json(letter_case=LetterCase.CAMEL) @dataclass(frozen=True) class OptimizerConfig: - """ - Dataclass containing learning Optimizer parameters for learning tasks. - """ + """Dataclass containing learning Optimizer parameters for learning tasks.""" type: Optional[Optimizations] = None momentum: Optional[Union[float, Tuple[float]]] = None betas: Optional[Union[float, Tuple[float]]] = None @@ -33,9 +34,7 @@ class OptimizerConfig: @dataclass_json(letter_case=LetterCase.CAMEL) @dataclass(frozen=True) class SchedulerConfig: - """ - Dataclass containing learning rate scheduler configuration. - """ + """Dataclass containing learning rate scheduler configuration.""" scheduler_step_size: Optional[int] = None scheduler_gamma: Optional[float] = None min_lr: Optional[float] = field(metadata=config(field_name="minimumLearningRate"), default_factory=_none_factory) @@ -44,9 +43,7 @@ class SchedulerConfig: @dataclass_json(letter_case=LetterCase.CAMEL) @dataclass(frozen=True) class HyperParameterConfiguration: - """ - Dataclass containing training hyper parameters for learning tasks. - """ + """Dataclass containing training hyper parameters for learning tasks.""" optimizer_config: Optional[OptimizerConfig] = field(metadata=config(field_name="optimizerConfig"), default_factory=_none_factory) scheduler_config: Optional[SchedulerConfig] = field(metadata=config(field_name="schedulerConfig"), @@ -57,28 +54,30 @@ class HyperParameterConfiguration: lr_decay: Optional[float] = field(metadata=config(field_name="learningRateDecay"), default_factory=_none_factory) total_epochs: int = None - def merge_default(self, other: Dict[str, Any]): - """ - Function to merge a HyperParameterConfiguration object with a default configuration - @param other: - @type other: - @return: - @rtype: + def merge_default(self, other: Dict[str, Any]) -> "HyperParameterConfiguration": + """Function to merge a HyperParameterConfiguration object with a default configuration. + + Args: + other (Dict[str, Any]): Non-default configuration to overwrite default HyperParameter configuration object. + + Returns: + HyperParameterConfiguration: Merged configuration objects, allowing defaults to be overwritten + """ return HyperParameterConfiguration.from_dict({**self.__dict__, **other}) # pylint: disable=no-member -def merge_optional(default_dict: Dict[str, Any], update_dict: Dict[str, Any], tpe: str): - """ - Function to merge dictionaries to add set parameters from update dictionary into default dictionary. - @param default_dict: Default configuraiton dictionary. 
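The frozen dataclasses above describe per-learner hyper-parameters; merge_default() and the merge helpers below overlay explicitly set values on such a default. A construction sketch with placeholder values:

    from fltk.util.config.definitions.optim import Optimizations
    from fltk.util.config.experiment_config import (HyperParameterConfiguration, OptimizerConfig,
                                                    SchedulerConfig)

    default_conf = HyperParameterConfiguration(
        optimizer_config=OptimizerConfig(type=Optimizations.sgd, momentum=0.9, lr=0.01),
        scheduler_config=SchedulerConfig(scheduler_step_size=50, scheduler_gamma=0.5, min_lr=1e-5),
        total_epochs=10,
    )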
- @type default_dict: dict - @param update_dict: Update configuration to be merged into default configurations. - @type update_dict: dict - @param tpe: String representation of type of learner. - @type tpe: str - @return: Result of merged dictionaries. - @rtype: dict +def merge_optional(default_dict: Dict[str, Any], update_dict: Dict[str, Any], tpe: str) -> Dict[str, Any]: + """Function to merge dictionaries to add set parameters from update dictionary into default dictionary. + + Args: + default_dict (Dict[str, Any]): Default configuration dictionary to be overwritten by update_dict. + update_dict ((Dict[str, Any])): Update configuration to be merged into default configurations. + tpe (str): String representation of type of learner. + + Returns: + Dict[str, Any]: Result of merged dictionaries. + """ default_copy = default_dict.copy() for k, v in default_copy.items(): # pylint: disable=invalid-name @@ -98,19 +97,18 @@ def merge_optional(default_dict: Dict[str, Any], update_dict: Dict[str, Any], tp def merge_optional_dataclass(default: T, update: T, data_type: Type[T], learner_type: str) -> T: - """ - Function to merge two dataclasses of same type to update a default object with an update dataclass containing + """Function to merge two dataclasses of same type to update a default object with an update dataclass containing only a few set parameters. - @param default: Default dataclass. - @type default: T - @param update: Update dataclass to merge into default. - @type update: T - @param data_type: Type of the two dataclasses. - @type data_type: Type[T] - @param learner_type: String representation of learner type. - @type learner_type: str - @return: Instance of the passed data_type. - @rtype: T + + Args: + default (T): Default dataclass. + update (T): Update dataclass to merge into default. + data_type (Type[T]): Type of the two dataclasses, to return Type T of dataclass. + learner_type (str): String representation of learner type. + + Returns: + T: Instance of the passed data_type. + """ if isinstance(update, default.__class__): merged = data_type.from_dict(merge_optional(default.to_dict(), update.to_dict(), learner_type)) # pylint: disable=no-member @@ -121,9 +119,8 @@ def merge_optional_dataclass(default: T, update: T, data_type: Type[T], learner_ @dataclass_json(letter_case=LetterCase.CAMEL) @dataclass(frozen=True) class HyperParameters: - """ - Learning HyperParameters. - + """Learning HyperParameters. + bs: Number of images that are used during each forward/backward phase. max_epoch: Number of times epochs are executed. lr: Learning rate parameter, limiting the step size in the gradient update. @@ -136,8 +133,6 @@ def __post_init__(self): """ Post init function that populates the hyperparameters of optionally configured elements of a HyperParam Configuration. - @return: - @rtype: """ for learner_type in self.configurations.keys(): @@ -149,16 +144,22 @@ def __post_init__(self): self.configurations[learner_type] = updated_conf - def get(self, tpe: str): + def get(self, tpe: str) -> "HyperParameterConfiguration": + """Get function to retrieve HyperParameterConfiguration corresponding to a type of learner. + + Args: + tpe (str): String representation of type of learner. + + Returns: + HyperParameterConfiguration: Configuration corresponding to learner. + """ return self.configurations[tpe] @dataclass_json(letter_case=LetterCase.CAMEL) @dataclass(frozen=True) class Priority: - """ - Job class priority, indicating the presedence of one arrival over another. 
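A sketch of the dataclass-level merge helper defined above, in which a sparsely filled override is merged into a default; the learner-type label is illustrative and no specific merge outcome is asserted here:

    from fltk.util.config.experiment_config import SchedulerConfig, merge_optional_dataclass

    default_sched = SchedulerConfig(scheduler_step_size=50, scheduler_gamma=0.5, min_lr=1e-4)
    update_sched = SchedulerConfig(scheduler_gamma=0.1)    # only this field is explicitly set
    merged = merge_optional_dataclass(default_sched, update_sched, SchedulerConfig, 'Master')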
- """ + """Job class priority, indicating the presedence of one arrival over another.""" priority: int probability: float @@ -166,9 +167,7 @@ class Priority: @dataclass_json(letter_case=LetterCase.CAMEL) @dataclass(frozen=True) class SystemResources: - """ - Dataclass representing SystemResources for Pods to be spawned in K8s. - """ + """Dataclass representing SystemResources for Pods to be spawned in K8s.""" cores: str memory: str @@ -176,8 +175,8 @@ class SystemResources: @dataclass_json(letter_case=LetterCase.CAMEL) @dataclass(frozen=True) class SystemParameters: - """ - System parameters to spawn pods with. + """System parameters to spawn pods with. + data_parallelism: Number of pods (distributed) that will work together on training the network. executor_cores: Number of cores assigned to each executor. executor_memory: Amount of RAM allocated to each executor. @@ -187,6 +186,15 @@ class SystemParameters: configurations: OrderedDict[str, SystemResources] def get(self, tpe: str): + """Get function to retrieve SystemParameters for Type. Currently only supports Master and Client per Kubeflow. + + Args: + tpe (str): Type of Pod/Learner to retrieve SystemResources for. + + Returns: + SystemResources: Configuration object for system resources corresponding to the requested type. + + """ if tpe in self.configurations: return self.configurations[tpe] # Fallback to default for alternative declaration. @@ -196,9 +204,7 @@ def get(self, tpe: str): @dataclass_json(letter_case=LetterCase.CAMEL) @dataclass(frozen=True) class NetworkConfiguration: - """ - Dataclass describing the network and dataset that is 'trained' for a task. - """ + """Dataclass describing the network and dataset that is 'trained' for a task.""" network: Nets dataset: Dataset loss_function: Optional[Loss] @@ -207,9 +213,7 @@ class NetworkConfiguration: @dataclass_json(letter_case=LetterCase.CAMEL) @dataclass(frozen=True) class SamplerConfiguration: - """ - Dataclass containing configuration for datasampler to be used by learners. - """ + """Dataclass containing configuration for datasampler to be used by learners.""" type: DataSampler q_value: str seed: int @@ -219,10 +223,7 @@ class SamplerConfiguration: @dataclass_json(letter_case=LetterCase.CAMEL) @dataclass(frozen=True) class LearningParameters: - """ - Dataclass containing configuration parameters for the learning process itself. This includes the Federated learning - parameters as well as some system parameters like cuda. - """ + """Dataclass containing configuration parameters for the learning process itself. This includes the Federated learning""" cuda: bool rounds: Optional[int] = None epochs_per_round: Optional[int] = None @@ -234,9 +235,7 @@ class LearningParameters: @dataclass_json(letter_case=LetterCase.CAMEL) @dataclass(frozen=True) class JobClassParameter: - """ - Dataclass describing the job specific parameters (system and hyper). - """ + """Dataclass describing the job specific parameters (system and hyper).""" network_configuration: NetworkConfiguration system_parameters: SystemParameters hyper_parameters: HyperParameters @@ -248,10 +247,9 @@ class JobClassParameter: @dataclass_json(letter_case=LetterCase.CAMEL) @dataclass(frozen=True) class JobDescription: - """ - Dataclass describing the characteristics of a Job type, as well as the corresponding arrival statistic. + """Dataclass describing the characteristics of a Job type, as well as the corresponding arrival statistic. Currently, the arrival statistics is the lambda value used in a Poisson arrival process. 
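Because the arrival statistic above is the rate of a Poisson arrival process, inter-arrival times are exponentially distributed with mean 1/lambda. A stand-alone illustration with a placeholder rate:

    import random

    arrival_rate = 2.0                                   # placeholder lambda, in jobs per time unit
    inter_arrival = random.expovariate(arrival_rate)     # draws one exponential inter-arrival time
    print(f'next arrival in {inter_arrival:.2f} time units (mean {1 / arrival_rate:.2f})')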
- + preemtible_jobs: indicates whether the jobs can be pre-emptively rescheduled by the scheduler. This is currently not implemented in FLTK, but could be added as a project (advanced). """ @@ -264,8 +262,7 @@ class JobDescription: @dataclass_json(letter_case=LetterCase.CAMEL) @dataclass(frozen=True) class ExperimentConfig: - """ - Dataclass object containing the configuration of an entire experiment, including + """Dataclass object containing the configuration of an entire experiment, including configurations for repetitions. See also JobDescription to define types of jobs, and ExperimentConfiguration to set the configuration of experiments. """ @@ -273,16 +270,13 @@ class ExperimentConfig: class ExperimentParser: # pylint: disable=too-few-public-methods - """ - Simple parser to load experiment configuration into a programmatic objects. - """ + """Simple parser to load experiment configuration into a programmatic objects.""" def __init__(self, config_path: Path): self.__config_path = config_path def parse(self) -> ExperimentConfig: - """ - Parse function to load JSON conf into JobDescription objects. Any changes to the JSON file format + """Parse function to load JSON conf into JobDescription objects. Any changes to the JSON file format should be reflected by the classes used. For more information refer to the dataclasses JSON documentation https://pypi.org/project/dataclasses-json/. """ diff --git a/fltk/util/config/learner_config.py b/fltk/util/config/learner_config.py index 00022cb2..ede6ade0 100644 --- a/fltk/util/config/learner_config.py +++ b/fltk/util/config/learner_config.py @@ -21,10 +21,15 @@ def _eval_decoder(obj: Union[str, T]) -> Union[Any, T]: - """ - Decoder function to help decoding string objects to objects using the Python interpeter. + """Decoder function to help decoding string objects to objects using the Python interpeter. If a non-string object is passed it will return the argument + Args: + obj (Union[str, T]): Object to decode during evaluation. + + Returns: + Union[Any, T]: Depending on object that was passed, instance of type T, or Any other Python object. + """ if isinstance(obj, str): return eval(obj) @@ -32,13 +37,15 @@ def _eval_decoder(obj: Union[str, T]) -> Union[Any, T]: def get_safe_loader() -> Type[yaml.SafeLoader]: - """ - Function to get a yaml SafeLoader that is capable of properly parsing yaml compatible floats. - + """Function to get a yaml SafeLoader that is capable of properly parsing yaml compatible floats. + The default yaml loader would parse a value such as `1e-10` as a string, rather than a float. - @return: SafeLoader capable of parsing scientificly notated yaml values. - @rtype: yaml.SafeLoader + Args: + + Returns: + yaml.SafeLoader: SafeLoader capable of parsing scientificly notated yaml values. + """ # Current version of yaml does not parse numbers like 1e-10 correctly, resulting in a str type. # Credits to https://stackoverflow.com/a/30462009/14661801 @@ -60,6 +67,7 @@ def get_safe_loader() -> Type[yaml.SafeLoader]: @dataclass_json @dataclass class LearnerConfig: + """ """ replication: int = field(metadata=dict(required=False, missing=-1)) batch_size: int = field(metadata=dict(required=False, missing=128)) test_batch_size: int = field(metadata=dict(required=False, missing=128)) @@ -73,6 +81,7 @@ class LearnerConfig: @dataclass_json @dataclass class FedLearnerConfig(LearnerConfig): + """ """ loss_function: Loss = Loss.cross_entropy_loss # Number of communication epochs. 
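The motivation for get_safe_loader() above can be checked in isolation: the stock SafeLoader parses scientific notation without a decimal point as a string, while the patched loader yields a float:

    import yaml

    from fltk.util.config.learner_config import get_safe_loader

    plain = yaml.load('lr: 1e-10', Loader=yaml.SafeLoader)
    fixed = yaml.load('lr: 1e-10', Loader=get_safe_loader())
    print(type(plain['lr']), type(fixed['lr']))   # <class 'str'> <class 'float'>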
rounds: int = 2 @@ -123,47 +132,58 @@ class FedLearnerConfig(LearnerConfig): output_path: Path = field(metadata=config(encoder=str, decoder=Path), default=Path('logging')) def update_rng_seed(self): + """ """ torch.manual_seed(self.rng_seed) def get_default_model_folder_path(self): + """ """ return self.default_model_folder_path def get_distributed(self): + """ """ return self.distributed def get_sampler(self): + """ """ return self.data_sampler def get_world_size(self): + """ """ return self.world_size def get_rank(self): + """ """ return self.rank def get_sampler_args(self): + """ """ return tuple(self.data_sampler_args) def get_data_path(self): + """ """ return self.data_path def get_loss_function(self) -> Type[_Loss]: + """ """ return get_loss_function(self.loss_function) @staticmethod def from_yaml(path: Path): - """ - Parse yaml file to dataclass. Re-implemented to rely on dataclasses_json to load data with tested library. - + """Parse yaml file to dataclass. Re-implemented to rely on dataclasses_json to load data with tested library. + Alternatively, running the followign code would result in loading a JSON formatted configuration file, in case you prefer to create json based configuration files. - >>> with open("configs/example.json") as f: - >>> FedLearnerConfig.from_json(f.read()) + Args: + path(Path): Path pointing to configuration yaml file. + path: Path: - @param path: Path pointing to configuration yaml file. - @type path: Path - @return: Configuration dataclass representation of the configuration file. - @rtype: FedLearnerConfig + Returns: + FedLearnerConfig: Configuration dataclass representation of the configuration file. + + Examples: + >>> with open("configs/example.json") as f: + >>> FedLearnerConfig.from_json(f.read()) """ getLogger(__name__).debug(f'Loading yaml from {path.absolute()}') safe_loader = get_safe_loader() @@ -176,9 +196,7 @@ def from_yaml(path: Path): @dataclass_json @dataclass class DistLearnerConfig(LearnerConfig): # pylint: disable=too-many-instance-attributes - """ - Class encapsulating LearningParameters, for now used under DistributedLearning. - """ + """Class encapsulating LearningParameters, for now used under DistributedLearning.""" optimizer_args: Dict[str, Any] model: Nets dataset: Dataset @@ -190,19 +208,17 @@ class DistLearnerConfig(LearnerConfig): # pylint: disable=too-many-instance-att @staticmethod def from_yaml(path: Path) -> "DistLearnerConfig": - """ - Parse yaml file to dataclass. Re-implemented to rely on dataclasses_json to load data with tested library. - - Alternatively, running the followign code would result in loading a JSON formatted configuration file, in case + """Parse yaml file to dataclass. Re-implemented to rely on dataclasses_json to load data with tested library. + + Alternatively, running the following code would result in loading a JSON formatted configuration file, in case you prefer to create json based configuration files. - >>> with open("configs/example.json") as f: - >>> DistLearnerConfig.from_json(f.read()) + Args: + path (Path): Path pointing to configuration yaml file. + + Returns: + DistLearnerConfig: Configuration dataclass representation of the configuration file. - @param path: Path pointing to configuration yaml file. - @type path: Path - @return: Configuration dataclass representation of the configuration file. 
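A loading sketch for the federated configuration class above; the yaml path is a placeholder for one of the experiment descriptions shipped with the repository:

    from pathlib import Path

    from fltk.util.config.learner_config import FedLearnerConfig

    fed_config = FedLearnerConfig.from_yaml(Path('configs/federated_example.yaml'))
    print(fed_config.rounds, fed_config.get_loss_function())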
- @rtype: FedLearnerConfig """ getLogger(__name__).debug(f'Loading yaml from {path.absolute()}') safe_loader = get_safe_loader() @@ -213,7 +229,5 @@ def from_yaml(path: Path) -> "DistLearnerConfig": def get_loss_function(self) -> Type[_Loss]: - """ - Helper function to get loss_function based on definition _or_ string. - """ + """Helper function to get loss_function based on definition _or_ string.""" return get_loss_function(self.loss) diff --git a/fltk/util/singleton.py b/fltk/util/singleton.py deleted file mode 100644 index 0e2b8402..00000000 --- a/fltk/util/singleton.py +++ /dev/null @@ -1,15 +0,0 @@ -import threading - -class Singleton(type): - """ - Helper class defining a Singleton object for Python meta-classes. - """ - _lock = threading.Lock() - _instances = {} - - def __call__(cls, *args, **kwargs): - if cls not in cls._instances: - with cls._lock: - if cls not in cls._instances: - cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) - return cls._instances[cls] diff --git a/fltk/util/task/generator/arrival_generator.py b/fltk/util/task/generator/arrival_generator.py index a7b144c2..00a768be 100644 --- a/fltk/util/task/generator/arrival_generator.py +++ b/fltk/util/task/generator/arrival_generator.py @@ -1,3 +1,4 @@ +import abc import collections import logging import multiprocessing @@ -17,12 +18,11 @@ from fltk.util.config.definitions.net import Nets from fltk.util.config.experiment_config import (HyperParameters, SystemParameters, LearningParameters, JobDescription, ExperimentParser) -from fltk.util.singleton import Singleton from fltk.util.task.train_task import TrainTask @dataclass -class ArrivalGenerator(metaclass=Singleton): # pylint: disable=too-many-instance-attributes +class ArrivalGenerator(abc.ABC): # pylint: disable=too-many-instance-attributes """ Abstract Base Class for generating arrivals in the system. These tasks must be run """
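As with ClusterManager above, ArrivalGenerator keeps only the standard ABC machinery after this change; nothing deduplicates instances anymore, so get_arrival_generator() simply constructs whichever generator the configuration asks for. A quick check, illustrative only:

    import abc

    from fltk.util.task.generator.arrival_generator import ArrivalGenerator

    assert isinstance(ArrivalGenerator, abc.ABCMeta)   # plain ABC metaclass, no custom Singleton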