diff --git a/Dockerfile b/Dockerfile
index 47a53e91..bee5f1be 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,6 +1,5 @@
 FROM ubuntu:20.04
-
 MAINTAINER Jeroen Galjaard

 # Run build without interactive dialogue
@@ -14,7 +13,7 @@ WORKDIR /opt/federation-lab

 # Update the Ubuntu software repository and fetch packages
 RUN apt-get update \
-    && apt-get install -y curl python3 python3-pip net-tools iproute2
+    && apt-get install -y curl python3 python3-pip

 # Add Pre-downloaded models (otherwise needs be run every-time)
 ADD data/ data/
diff --git a/experiments/example_cuda/descr.yaml b/experiments/example_cuda/descr.yaml
index b42db5f4..91d5bfa3 100644
--- a/experiments/example_cuda/descr.yaml
+++ b/experiments/example_cuda/descr.yaml
@@ -12,7 +12,7 @@ profiling_time: 100
 warmup_round: false
 output_location: 'output/example_cuda'
 tensor_board_active: true
-clients_per_round: 2
+clients_per_round: 1
 node_groups:
   slow: [1, 1]
   medium: [2, 2]
diff --git a/fltk/__main__.py b/fltk/__main__.py
index f6e41d36..cccb8d20 100644
--- a/fltk/__main__.py
+++ b/fltk/__main__.py
@@ -2,6 +2,7 @@
 import json
 import logging
 from pathlib import Path
+from typing import Optional, Any

 from fltk.launch import launch_extractor, launch_client, launch_single, \
     launch_remote, launch_cluster
@@ -22,6 +23,12 @@
 }


+def _save_get(args, param) -> Optional[Any]:
+    if args is not None and hasattr(args, param):
+        return args.__dict__[param]
+    return None
+
+
 def __main__():
     parser = argparse.ArgumentParser(prog='fltk',
                                      description='Experiment launcher for the Federated Learning Testbed (fltk)')
@@ -33,17 +40,32 @@ def __main__():
     """
     args = parser.parse_args()

+    config = None
+    try:
+        with open(args.config, 'r') as config_file:
+            config: DistributedConfig = DistributedConfig.from_dict(json.load(config_file))
+            config.config_path = Path(args.config)
+    except:
+        pass
+    arg_path, conf_path = None, None
+    try:
+        arg_path = Path(args.path)
+    except Exception as e:
+        print('No argument path is provided.')
+    try:
+        conf_path = Path(args.config)
+    except Exception as e:
+        print('No configuration path is provided.')
-    with open(args.config, 'r') as config_file:
-        config: DistributedConfig = DistributedConfig.from_dict(json.load(config_file))
-        config.config_path = Path(args.config)
-
-    arg_path = Path(args.path)
-    conf_path = Path(args.config)
-
-    # Lookup execution mode and call function to start subroutine
-    __run_op_dict[args.action](arg_path, conf_path, rank=args.rank, parser=parser, nic=args.nic, host=args.host,
-                               prefix=args.prefix, args=args)
+    # TODO: move kwargs into function as extractor
+    __run_op_dict[args.action](arg_path, conf_path,
+                               rank=_save_get(args, 'rank'),
+                               parser=parser,
+                               nic=_save_get(args, 'nic'),
+                               host=_save_get(args, 'host'),
+                               prefix=_save_get(args, 'prefix'),
+                               args=args,
+                               config=config)

     exit(0)
diff --git a/fltk/core/distributed/client.py b/fltk/core/distributed/client.py
index 0ec20f22..28582633 100644
--- a/fltk/core/distributed/client.py
+++ b/fltk/core/distributed/client.py
@@ -17,7 +17,7 @@
 from fltk.util.results import EpochData


-class Client(DistNode):
+class DistClient(DistNode):
     """
     TODO: Combine with Client and differentiate between Federated and Distributed Learnign through better inheritance.
""" diff --git a/fltk/datasets/distributed/dataset.py b/fltk/datasets/distributed/dataset.py index 83f4cf31..b096b494 100644 --- a/fltk/datasets/distributed/dataset.py +++ b/fltk/datasets/distributed/dataset.py @@ -1,10 +1,12 @@ from abc import abstractmethod +from typing import Any + from torch.utils.data import DataLoader from torch.utils.data import TensorDataset import torch import numpy -from fltk.util.arguments import Arguments +# from fltk.util.arguments import Arguments from fltk.util.log import getLogger @@ -17,7 +19,8 @@ class DistDataset: train_loader = None test_loader = None logger = getLogger(__name__) - def __init__(self, args: Arguments): + + def __init__(self, args: Any): self.args = args # self.train_dataset = self.load_train_dataset() # self.test_dataset = self.load_test_dataset() diff --git a/fltk/datasets/distributed/fashion_mnist.py b/fltk/datasets/distributed/fashion_mnist.py index 770f6b66..1a76fb96 100644 --- a/fltk/datasets/distributed/fashion_mnist.py +++ b/fltk/datasets/distributed/fashion_mnist.py @@ -1,4 +1,4 @@ -from fltk.datasets.distributed import DistDataset +from fltk.datasets.distributed.dataset import DistDataset from torchvision import datasets from torchvision import transforms from torch.utils.data import DataLoader, DistributedSampler diff --git a/fltk/datasets/distributed/mnist.py b/fltk/datasets/distributed/mnist.py index a4056a3c..32758658 100644 --- a/fltk/datasets/distributed/mnist.py +++ b/fltk/datasets/distributed/mnist.py @@ -1,5 +1,5 @@ from __future__ import annotations -from fltk.datasets import DistDataset +from fltk.datasets.distributed.dataset import DistDataset from torchvision import datasets, transforms from torch.utils.data import DataLoader # from fltk.strategy import get_sampler, get_augmentations, get_augmentations_tensor, UnifyingSampler diff --git a/fltk/datasets/loader_util.py b/fltk/datasets/loader_util.py index 1671882f..7e0d1264 100644 --- a/fltk/datasets/loader_util.py +++ b/fltk/datasets/loader_util.py @@ -1,9 +1,7 @@ -from fltk.datasets.distributed.mnist import DistMNISTDataset -from fltk.datasets.distributed.cifar10 import DistCIFAR10Dataset -from fltk.datasets.distributed.cifar100 import DistCIFAR100Dataset -from fltk.datasets.distributed.fashion_mnist import DistFashionMNISTDataset +from fltk.datasets.distributed import DistMNISTDataset, DistFashionMNISTDataset, DistCIFAR100Dataset, DistCIFAR10Dataset from fltk.util.definitions import Dataset + def available_datasets(): return { Dataset.cifar10: DistCIFAR10Dataset, @@ -12,6 +10,7 @@ def available_datasets(): Dataset.mnist: DistMNISTDataset } + def get_dataset(name: Dataset): return available_datasets()[name] @@ -26,11 +25,11 @@ def get_train_loader_path(name: Dataset) -> str: return paths[name] -def get_test_loader_path(name: Dataset)-> str: +def get_test_loader_path(name: Dataset) -> str: paths = { Dataset.cifar10: 'data_loaders/cifar10/test_data_loader.pickle', Dataset.fashion_mnist: 'data_loaders/fashion-mnist/test_data_loader.pickle', Dataset.cifar100: 'data_loaders/cifar100/test_data_loader.pickle', Dataset.mnist: 'data_loaders/mnist/test_data_loader.pickle', } - return paths[name] \ No newline at end of file + return paths[name] diff --git a/fltk/launch.py b/fltk/launch.py index 91e7d13d..b237a8c4 100644 --- a/fltk/launch.py +++ b/fltk/launch.py @@ -9,7 +9,7 @@ from torch.distributed import rpc from fltk.core.client import Client -from fltk.core.distributed.client import Client +from fltk.core.distributed.client import DistClient from 
 from fltk.core.distributed.orchestrator import Orchestrator
 from fltk.core.federator import Federator
@@ -70,7 +70,7 @@ def launch_distributed_client(task_id: str, config: DistributedConfig = None, le
     logging.info(f'Starting Creating client with {rank}')

-    client = Client(rank, task_id, world_size, config, learning_params)
+    client = DistClient(rank, task_id, world_size, config, learning_params)
     client.prepare_learner(distributed)
     epoch_data = client.run_epochs()
     print(epoch_data)
@@ -153,7 +153,16 @@ def launch_single(base_path: Path, config_path: Path, prefix: str = None, **kwar
     federator_node.run()


-def _retrieve_env_params(nic=None, host=None):
+def _retrieve_or_init_env(nic=None, host=None):
+    """
+    Function
+    @param nic:
+    @type nic:
+    @param host:
+    @type host:
+    @return:
+    @rtype:
+    """
     if host:
         os.environ['MASTER_ADDR'] = host
     os.environ['MASTER_PORT'] = '5000'
@@ -161,6 +170,9 @@
         os.environ['GLOO_SOCKET_IFNAME'] = nic
         os.environ['TP_SOCKET_IFNAME'] = nic

+def _retrieve_env_config():
+    rank, world_size, port = os.environ.get('RANK'), os.environ.get('WORLD_SIZE'), os.environ["MASTER_PORT"]
+    return rank, world_size, port

 def _retrieve_network_params_from_config(config: Config, nic=None, host=None):
     if hasattr(config, 'system'):
@@ -173,27 +185,33 @@ def _retrieve_network_params_from_config(config: Config, nic=None, host=None):
     return nic, host


-def launch_remote(base_path: Path, config_path: Path, rank: int, parser, nic=None, host=None, prefix: str = None,
-                  **kwargs):
+def launch_remote(base_path: Path, config_path: Path, rank: int, parser, nic=None, host=None, prefix: str = None, **kwargs):
     print(config_path, rank)
+
     config = Config.FromYamlFile(config_path)
     config.world_size = config.num_clients + 1
     config.replication_id = prefix
-    nic, host = _retrieve_network_params_from_config(config, nic, host)
     if not (nic and host):
-        print('Missing rank, host, world-size, or nic argument when in \'remote\' mode!')
+        nic, host = _retrieve_network_params_from_config(config, nic, host)
+        _retrieve_or_init_env(nic, host)
+    elif not (nic and host):
+        rank, world_size, master_port = _retrieve_env_config()
+        assert world_size == config.world_size
+    else:
+        print('Missing rank, host, world-size, checking environment!')
         parser.print_help()
         exit(1)
-    _retrieve_env_params(nic, host)
-    print(f'Starting with host={os.environ["MASTER_ADDR"]} and port={os.environ["MASTER_PORT"]} and interface={nic}')
+
+    msg = f'Starting with host={host} and port={os.environ["MASTER_PORT"]} and interface={nic}'
+    logging.log(logging.INFO, msg)
     options = rpc.TensorPipeRpcBackendOptions(
         num_worker_threads=16,
         rpc_timeout=0,  # infinite timeout
         init_method='env://',
-        _transports=["uv"]
+        _transports=["uv"]  # Use LibUV backend for async/IO interaction
     )
     if rank != 0:
-        print(f'Starting worker {rank} with world size={config.world_size}')
+        print(f'Starting worker-{rank} with world size={config.world_size}')
         rpc.init_rpc(
             f"client{rank}",
             rank=rank,
@@ -203,7 +221,7 @@ def launch_remote(base_path: Path, config_path: Path, rank: int, parser, nic=Non
         client_node = Client(f'client{rank}', rank, config.world_size, config)
         client_node.remote_registration()
     else:
-        print(f'Starting the ps with world size={config.world_size}')
+        print(f'Starting the PS (Fed) with world size={config.world_size}')
         rpc.init_rpc(
             "federator",
             rank=rank,
@@ -217,7 +235,7 @@ def launch_remote(base_path: Path, config_path: Path, rank: int, parser, nic=Non
     print('Ending program')


-def launch_cluster(arg_path, conf_path, args: Namespace = None, configuration: DistributedConfig = None, **kwargs):
+def launch_cluster(arg_path, conf_path, args: Namespace = None, config: DistributedConfig = None, **kwargs):
     """
     Function to launch Orchestrator for execution with provided configurations. Currently this assumes that a single
     Orchestrator is started that manages all the resources in the cluster.
@@ -227,5 +245,5 @@ def launch_cluster(arg_path, conf_path, args: Namespace = None, configuration: D
                         datefmt='%m-%d %H:%M')
     # Set the seed for arrivals, torch seed is mostly ignored. Set the `arrival_seed` to a different value
     # for each repetition that you want to run an experiment with.
-    configuration.set_seed()
-    launch_orchestrator(args=args, conf=configuration)
\ No newline at end of file
+    config.set_seed()
+    launch_orchestrator(args=args, conf=config)
\ No newline at end of file
diff --git a/fltk/nets/util/__init__.py b/fltk/nets/util/__init__.py
index 5e41e6c0..ad0be49d 100644
--- a/fltk/nets/util/__init__.py
+++ b/fltk/nets/util/__init__.py
@@ -1,3 +1,2 @@
-from .reproducability import init_reproducibility
-from .model import save_model, flatten_params, recover_flattened, load_model_from_file
-from .evaluation import calculate_class_recall, calculate_class_precision
\ No newline at end of file
+from fltk.nets.util.model import save_model, flatten_params, recover_flattened, load_model_from_file
+from fltk.nets.util.evaluation import calculate_class_recall, calculate_class_precision
\ No newline at end of file
diff --git a/fltk/nets/util/model.py b/fltk/nets/util/model.py
index 9f98c8ec..c6770f21 100644
--- a/fltk/nets/util/model.py
+++ b/fltk/nets/util/model.py
@@ -6,7 +6,7 @@
 import torch
 from torch.utils.tensorboard import SummaryWriter

-from fltk.util.config.base_config import BareConfig
+import fltk.util.config as config
 from fltk.util.results import EpochData


@@ -46,7 +46,7 @@ def recover_flattened(flat_params, model):
     return l


-def initialize_default_model(config: BareConfig, model_class) -> torch.nn.Module:
+def initialize_default_model(config: config.DistributedConfig, model_class) -> torch.nn.Module:
     """
     Load a default model dictionary into a torch model.
     @param model:
diff --git a/fltk/util/__init__.py b/fltk/util/__init__.py
index e69de29b..27cf03f6 100644
--- a/fltk/util/__init__.py
+++ b/fltk/util/__init__.py
@@ -0,0 +1 @@
+from fltk.util.reproducability import init_reproducibility
\ No newline at end of file
diff --git a/fltk/util/cluster/client.py b/fltk/util/cluster/client.py
index 11c289ed..547227ec 100644
--- a/fltk/util/cluster/client.py
+++ b/fltk/util/cluster/client.py
@@ -31,12 +31,12 @@ class Resource:

 @dataclass
 class BuildDescription:
-    resources: OrderedDict[str, V1ResourceRequirements]
-    typed_containers: OrderedDict[str, V1Container]
-    typed_templates: OrderedDict[str, V1PodTemplateSpec]
-    id: UUID
-    spec: V1PyTorchJobSpec
-    tolerations: List[V1Toleration]
+    resources = OrderedDict[str, V1ResourceRequirements]()
+    typed_containers = OrderedDict[str, V1Container]()
+    typed_templates = OrderedDict[str, V1PodTemplateSpec]()
+    id: Optional[UUID] = None
+    spec: Optional[V1PyTorchJobSpec] = None
+    tolerations: Optional[List[V1Toleration]] = None


 class ResourceWatchDog:
diff --git a/fltk/util/config/__init__.py b/fltk/util/config/__init__.py
index 08739bf1..e1b87972 100644
--- a/fltk/util/config/__init__.py
+++ b/fltk/util/config/__init__.py
@@ -1,2 +1,2 @@
-from .distributed_config import *
-from .config import Config
\ No newline at end of file
+from fltk.util.config.distributed_config import DistributedConfig
+from fltk.util.config.config import Config
diff --git a/fltk/util/config/arguments.py b/fltk/util/config/arguments.py
index 63a20737..dbd4f35a 100644
--- a/fltk/util/config/arguments.py
+++ b/fltk/util/config/arguments.py
@@ -172,10 +172,12 @@ def create_util_run_parser(subparsers) -> None:

 def create_remote_parser(subparsers) -> None:
     remote_parser = subparsers.add_parser('remote')
+    add_default_arguments(remote_parser)
+
+    remote_parser.add_argument('rank', type=int)
     remote_parser.add_argument('--nic', type=str, default=None)
     remote_parser.add_argument('--host', type=str, default=None)
-    add_default_arguments(remote_parser)
+

 def create_single_parser(subparsers) -> None:
diff --git a/fltk/util/config/distributed_config.py b/fltk/util/config/distributed_config.py
index 4f089a8d..1b7d0240 100644
--- a/fltk/util/config/distributed_config.py
+++ b/fltk/util/config/distributed_config.py
@@ -4,7 +4,7 @@

 from dataclasses_json import config, dataclass_json

-from fltk.nets.util.reproducability import init_reproducibility
+from fltk.util import init_reproducibility


 @dataclass_json
diff --git a/fltk/nets/util/reproducability.py b/fltk/util/reproducability.py
similarity index 94%
rename from fltk/nets/util/reproducability.py
rename to fltk/util/reproducability.py
index 217ef168..b26767bf 100644
--- a/fltk/nets/util/reproducability.py
+++ b/fltk/util/reproducability.py
@@ -4,7 +4,7 @@
 import torch


-def cuda_reproducible_backend(cuda: bool) -> None:
+def _cuda_reproducible_backend(cuda: bool) -> None:
     """
     Function to set the CUDA backend to reproducible (i.e. deterministic) or to default configuration (per
     PyTorch 1.9.1).
@@ -40,6 +40,6 @@ def init_reproducibility(torch_seed: int = 42, cuda: bool = False, numpy_seed: i
     torch.manual_seed(torch_seed)
     if cuda:
         torch.cuda.manual_seed_all(torch_seed)
-        cuda_reproducible_backend(True)
+        _cuda_reproducible_backend(True)
     np.random.seed(numpy_seed)
     os.environ['PYTHONHASHSEED'] = str(hash_seed)
diff --git a/fltk/util/show_client_distributions.py b/fltk/util/show_client_distributions.py
index c30cfbbb..96ed3b07 100644
--- a/fltk/util/show_client_distributions.py
+++ b/fltk/util/show_client_distributions.py
@@ -1,7 +1,7 @@
 import pandas as pd
 from tqdm import tqdm

-from fltk.core.distributed.client import Client
+from fltk.core.distributed.client import DistClient
 from fltk.datasets.distributed import DistCIFAR10Dataset, DistCIFAR100Dataset, DistFashionMNISTDataset, DistDataset

 import logging
@@ -108,7 +108,7 @@ def gen_distribution(name, params):
             # for i, (inputs, labels) in enumerate(dataset.get_train_loader(), 0):
             #     print(labels)
             #     print('d')
-            client = Client("test", None, rank, args.world_size, args)
+            client = DistClient("test", None, rank, args.world_size, args)
             client.init_dataloader()
             train_loader = client.dataset.get_train_loader()
             train_loader2 = dataset.get_train_loader()
diff --git a/fltk/util/task/config/__init__.py b/fltk/util/task/config/__init__.py
index e69de29b..4c0b2818 100644
--- a/fltk/util/task/config/__init__.py
+++ b/fltk/util/task/config/__init__.py
@@ -0,0 +1 @@
+from .parameter import TrainTask, ExperimentParser, SystemParameters, HyperParameters
\ No newline at end of file
diff --git a/fltk/util/task/generator/__init__.py b/fltk/util/task/generator/__init__.py
index e69de29b..700413d7 100644
--- a/fltk/util/task/generator/__init__.py
+++ b/fltk/util/task/generator/__init__.py
@@ -0,0 +1 @@
+from .arrival_generator import ArrivalGenerator
\ No newline at end of file
diff --git a/fltk/util/task/task.py b/fltk/util/task/task.py
index 7034138c..ba8e8300 100644
--- a/fltk/util/task/task.py
+++ b/fltk/util/task/task.py
@@ -3,7 +3,7 @@
 from typing import OrderedDict, Dict, List, Optional
 from uuid import UUID

-from fltk.util.task.config.parameter import SystemParameters, HyperParameters
+from fltk.util.task.config import SystemParameters, HyperParameters


 @dataclass