diff --git a/configs/cloud_experiment.yaml b/configs/cloud_experiment.yaml
deleted file mode 100644
index b4810539..00000000
--- a/configs/cloud_experiment.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-cluster:
-  orchestrator:
-    wait_for_clients: true
-    # Use the SERVICE provided by the fl-server to connect
-    service: 'fl-server.test.svc.cluster.local'
-    # Default NIC is eth0
-    nic: 'eth0'
-  worker:
-    prefix: 'client'
-execution_config:
-  experiment_prefix: 'cloud_experiment'
-  tensor_board_active: true
-  cuda: false
-  net:
-    save_model: false
-    save_temp_model: false
-    save_epoch_interval: 1
-    save_model_path: "models"
-    epoch_save_start_suffix: "start"
-    epoch_save_end_suffix: "end"
-  reproducability:
-    torch_seed: 42
-    arrival_seed: 123
diff --git a/configs/example_cloud_experiment.json b/configs/example_cloud_experiment.json
new file mode 100644
index 00000000..767deaea
--- /dev/null
+++ b/configs/example_cloud_experiment.json
@@ -0,0 +1,30 @@
+{
+  "cluster": {
+    "orchestrator": {
+      "wait_for_clients": true,
+      "service": "fl-server.test.svc.cluster.local",
+      "nic": "eth0"
+    },
+    "client": {
+      "prefix": "client",
+      "tensorboard_active": true
+    }
+  },
+  "execution_config": {
+    "experiment_prefix": "cloud_experiment",
+    "tensorboard_active": true,
+    "cuda": false,
+    "net": {
+      "save_model": false,
+      "save_temp_model": false,
+      "save_epoch_interval": 1,
+      "save_model_path": "models",
+      "epoch_save_start_suffix": "start",
+      "epoch_save_end_suffix": "end"
+    },
+    "reproducibility": {
+      "torch_seed": 42,
+      "arrival_seed": 123
+    }
+  }
+}
\ No newline at end of file
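
The YAML schema carries over to the JSON file with a few renames (worker becomes client, tensor_board_active becomes tensorboard_active, reproducability becomes reproducibility). A quick sanity check of the new file, with illustrative assertions only:

    import json

    # Load the example configuration added above and check the renamed keys.
    with open('configs/example_cloud_experiment.json') as f:
        cfg = json.load(f)

    assert cfg['cluster']['client']['prefix'] == 'client'
    assert cfg['execution_config']['reproducibility']['torch_seed'] == 42
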
diff --git a/fltk/__main__.py b/fltk/__main__.py
index b1daba16..e32b682c 100644
--- a/fltk/__main__.py
+++ b/fltk/__main__.py
@@ -3,14 +3,13 @@
 from multiprocessing.pool import ThreadPool
 from pathlib import Path
 
-import yaml
 from dotenv import load_dotenv
 
 from fltk.launch import run_single
-from fltk.util.base_config import BareConfig
+from fltk.util.config.base_config import BareConfig
 from fltk.util.cluster.client import ClusterManager
-from fltk.util.generator.arrival_generator import ExperimentGenerator
+from fltk.util.task.generator.arrival_generator import ExperimentGenerator
 
 logging.basicConfig(level=logging.INFO)
 
@@ -27,61 +26,27 @@ def main():
     subparsers = parser.add_subparsers(dest="mode")
 
     # Create single experiment parser
-    single_parser = subparsers.add_parser('single')
-    single_parser.add_argument('config', type=str)
-    single_parser.add_argument('--rank', type=int)
-    single_parser.add_argument('--nic', type=str, default=None)
-    single_parser.add_argument('--host', type=str, default=None)
-    add_default_arguments(single_parser)
-
-    # Create spawn parser
-    spawn_parser = subparsers.add_parser('spawn')
-    spawn_parser.add_argument('config', type=str)
-    add_default_arguments(spawn_parser)
-
-    # Create remote parser
-    remote_parser = subparsers.add_parser('remote')
-    remote_parser.add_argument('--rank', type=int)
-    remote_parser.add_argument('--nic', type=str, default=None)
-    remote_parser.add_argument('--host', type=str, default=None)
-    add_default_arguments(remote_parser)
-
-    # Create poisoned parser
-    poison_parser = subparsers.add_parser('poison')
-    poison_parser.add_argument('config', type=str)
-    poison_parser.add_argument('--rank', type=int)
-    poison_parser.add_argument('--nic', type=str, default=None)
-    poison_parser.add_argument('--host', type=str, default=None)
-    add_default_arguments(poison_parser)
-
-    poison_parser = subparsers.add_parser('cluster')
-    poison_parser.add_argument('config', type=str)
-    poison_parser.add_argument('--rank', type=int)
-    poison_parser.add_argument('--nic', type=str, default=None)
-    poison_parser.add_argument('--host', type=str, default=None)
-    add_default_arguments(poison_parser)
-
-    args = parser.parse_args()
-
-    if args.mode == 'cluster':
-        logging.info("[Fed] Starting in cluster mode.")
-        # TODO: Load configuration path
-        config_path: Path = None
-        cluster_manager = ClusterManager()
-        arrival_generator = ExperimentGenerator(config_path)
-
-        pool = ThreadPool(4)
-        pool.apply(cluster_manager.start)
-        pool.apply(arrival_generator.run)
-
-        pool.join()
-    else:
-        with open(args.config) as config_file:
-            cfg = BareConfig()
-            yaml_data = yaml.load(config_file, Loader=yaml.FullLoader)
-            cfg.merge_yaml(yaml_data)
-            if args.mode == 'poison':
-                perform_poison_experiment(args, cfg, parser, yaml_data)
+    cluster_parser = subparsers.add_parser('cluster')
+    cluster_parser.add_argument('config', type=str)
+    cluster_parser.add_argument('--rank', type=int)
+    cluster_parser.add_argument('--nic', type=str, default=None)
+    cluster_parser.add_argument('--host', type=str, default=None)
+    add_default_arguments(cluster_parser)
+
+    arguments = parser.parse_args()
+
+
+    with open(arguments.config) as config_file:
+        try:
+            config = BareConfig.from_json(config_file)
+        except Exception as e:
+            print("Cannot load provided configuration, exiting...")
+            exit(-1)
+
+    if arguments.mode == 'orchestrator':
+        start_clusterized(arguments, config)
+    elif arguments.mode == 'client':
+        run_single()
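
One caveat, assuming standard dataclasses_json semantics: the generated from_json() parses a JSON string, so passing the open file object (as in the try block above) would raise a TypeError that the broad except then turns into an exit. A minimal sketch of the load under that assumption:

    from fltk.util.config.base_config import BareConfig

    # Read the file contents first; from_json() expects a str/bytes payload
    # (assumption based on standard dataclasses_json behaviour).
    with open('configs/example_cloud_experiment.json') as config_file:
        config = BareConfig.from_json(config_file.read())
    print(config.execution_config.cuda)  # -> False
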
""" - if args.rank is None: - print('Missing rank argument when in \'poison\' mode!') - exit(1) - if not yaml_data.get('poison'): - print(f'Missing poison configuration for \'poison\' mode') - exit(1) + logging.info("[Fed] Starting in cluster mode.") + # TODO: Load configuration path + config_path: Path = None + cluster_manager = ClusterManager() + arrival_generator = ExperimentGenerator(config_path) + + pool = ThreadPool(4) + pool.apply(cluster_manager.start) + pool.apply(arrival_generator.run) + + pool.join() + - world_size = args.world_size - master_address = args.host - nic = args.nic - if not world_size: - world_size = yaml_data['system']['clients']['amount'] + 1 - if not master_address: - master_address = yaml_data['system']['federator']['hostname'] - if not nic: - nic = yaml_data['system']['federator']['nic'] print(f'rank={args.rank}, world_size={world_size}, host={master_address}, args=cfg, nic={nic}') - run_single(rank=args.rank, world_size=world_size, host=master_address, args=cfg, nic=nic) + run_single(rank=args.rank, args=config, nic=nic) if __name__ == "__main__": diff --git a/fltk/client.py b/fltk/client.py index 62470422..f4f6434e 100644 --- a/fltk/client.py +++ b/fltk/client.py @@ -15,7 +15,7 @@ from torch.distributed import rpc from fltk.schedulers import MinCapableStepLR -from fltk.util.base_config import BareConfig +from fltk.util.config.base_config import BareConfig from fltk.util.log import DistLearningLogger from fltk.util.results import EpochData diff --git a/fltk/launch.py b/fltk/launch.py index b77c6a38..8058719e 100644 --- a/fltk/launch.py +++ b/fltk/launch.py @@ -16,18 +16,30 @@ def run_ps(rpc_ids_triple, args): fed = Orchestrator(rpc_ids_triple, config=args) fed.run() +def await_assigned_orchestrator(): + # TODO: Implement await function for client + + """ + 1. Setup everything correctly according to provided configuration files. + 2. Register to cleint + 3. Start working on task description provided by orchestrator + 4. Send heartbeats? (Alternatively use Kubernetes for this) + 5. Send completed data + 6. Terminate/complete pod execution. 
+ """ + pass def run_single(rank, world_size, host = None, args = None, nic = None): logging.info(f'Starting with rank={rank} and world size={world_size}') prepare_environment(host, nic) logging.info(f'Starting with host={os.environ["MASTER_ADDR"]} and port={os.environ["MASTER_PORT"]}') options = rpc.TensorPipeRpcBackendOptions( - num_worker_threads=20, # TODO: Retrieve number of cores from system - rpc_timeout=0, # infinite timeout + num_worker_threads=20, + rpc_timeout=0, init_method=f'tcp://{os.environ["MASTER_ADDR"]}:{os.environ["MASTER_PORT"]}' ) - if rank != 0: + logging.info(f'Starting worker {rank}') rpc.init_rpc( f"client{rank}", @@ -35,7 +47,6 @@ def run_single(rank, world_size, host = None, args = None, nic = None): world_size=world_size, rpc_backend_options=options, ) - # trainer passively waiting for ps to kick off training iterations else: logging.info('Starting the ps') rpc.init_rpc( @@ -43,7 +54,6 @@ def run_single(rank, world_size, host = None, args = None, nic = None): rank=rank, world_size=world_size, rpc_backend_options=options - ) run_ps([(f"client{r}", r, world_size) for r in range(1, world_size)], args) diff --git a/fltk/nets/util/utils.py b/fltk/nets/util/utils.py index 65387143..1648ab89 100644 --- a/fltk/nets/util/utils.py +++ b/fltk/nets/util/utils.py @@ -6,7 +6,7 @@ import torch from torch.utils.tensorboard import SummaryWriter -from fltk.util.base_config import BareConfig +from fltk.util.config.base_config import BareConfig from fltk.util.results import EpochData diff --git a/fltk/orchestrator.py b/fltk/orchestrator.py index 4be15f2d..7c5b4369 100644 --- a/fltk/orchestrator.py +++ b/fltk/orchestrator.py @@ -10,9 +10,9 @@ from fltk.client import Client from fltk.nets.util.utils import flatten_params, save_model -from fltk.util.base_config import BareConfig +from fltk.util.config.base_config import BareConfig from fltk.util.cluster.client import ClientRef -from fltk.util.generator.arrival_generator import ArrivalGenerator +from fltk.util.task.generator.arrival_generator import ArrivalGenerator from fltk.util.log import DistLearningLogger from fltk.util.results import EpochData diff --git a/fltk/util/base_config.py b/fltk/util/config/base_config.py similarity index 50% rename from fltk/util/base_config.py rename to fltk/util/config/base_config.py index c8ba83cb..161ed8db 100644 --- a/fltk/util/base_config.py +++ b/fltk/util/config/base_config.py @@ -1,89 +1,83 @@ -from dataclasses import dataclass +from dataclasses import dataclass, field -import torch -from dataclasses_json import dataclass_json +from dataclasses_json import config, dataclass_json -# TODO: Move reproducability settings towards a different part of the codebase. 
diff --git a/fltk/util/base_config.py b/fltk/util/config/base_config.py
similarity index 50%
rename from fltk/util/base_config.py
rename to fltk/util/config/base_config.py
index c8ba83cb..161ed8db 100644
--- a/fltk/util/base_config.py
+++ b/fltk/util/config/base_config.py
@@ -1,89 +1,83 @@
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 
-import torch
-from dataclasses_json import dataclass_json
+from dataclasses_json import config, dataclass_json
 
-# TODO: Move reproducability settings towards a different part of the codebase.
-# SEED = 1
-# torch.manual_seed(SEED)
-
-
-@dataclass
 @dataclass_json
+@dataclass
 class GeneralNetConfig:
     save_model: bool = False
     save_temp_model: bool = False
     save_epoch_interval: int = 1
-    save_model_path: str = "models"
-    epoch_save_start_suffix: str = "start"
-    epoch_save_end_suffix = "end"
+    save_model_path: str = 'models'
+    epoch_save_start_suffix: str = 'cloud_experiment'
+    epoch_save_end_suffix: str = 'cloud_experiment'
 
 
-@dataclass(frozen=True)
 @dataclass_json
-class ReproducabilityConfig:
+@dataclass(frozen=True)
+class ReproducibilityConfig:
     torch_seed: int
     arrival_seed: int
 
 
-@dataclass
 @dataclass_json
+@dataclass
 class ExecutionConfig:
-    general_net: GeneralNetConfig
-    reproducability: ReproducabilityConfig
+    general_net: GeneralNetConfig = field(metadata=config(field_name="net"))
+    reproducibility: ReproducibilityConfig
     experiment_prefix: str = "experiment"
-    tensorboard_active: str = True
+    tensorboard_active: bool = True
     cuda: bool = False
 
 
-@dataclass
 @dataclass_json
+@dataclass
 class OrchestratorConfig:
     service: str
     nic: str
 
 
-@dataclass
 @dataclass_json
+@dataclass
 class ClientConfig:
     prefix: str
+    tensorboard_active: bool
 
 
-@dataclass
 @dataclass_json
+@dataclass
 class ClusterConfig:
     orchestrator: OrchestratorConfig
     client: ClientConfig
    wait_for_clients: bool = True
 
 
-@dataclass
 @dataclass_json
+@dataclass
 class BareConfig(object):
     # Configuration parameters for PyTorch and models that are generated.
     execution_config: ExecutionConfig
-    cluster_config: ClusterConfig
-
-    def __init__(self):
-        # TODO: Move to external class/object
-        self.train_data_loader_pickle_path = {
-            'cifar10': 'data_loaders/cifar10/train_data_loader.pickle',
-            'fashion-mnist': 'data_loaders/fashion-mnist/train_data_loader.pickle',
-            'cifar100': 'data_loaders/cifar100/train_data_loader.pickle',
-        }
-
-        self.test_data_loader_pickle_path = {
-            'cifar10': 'data_loaders/cifar10/test_data_loader.pickle',
-            'fashion-mnist': 'data_loaders/fashion-mnist/test_data_loader.pickle',
-            'cifar100': 'data_loaders/cifar100/test_data_loader.pickle',
-        }
-
-        # TODO: Make part of different configuration
-        self.loss_function = torch.nn.CrossEntropyLoss
-
-        self.default_model_folder_path = "default_models"
-        self.data_path = "data"
+    cluster_config: ClusterConfig = field(metadata=config(field_name="cluster"))
+
+    # # TODO: Move to external class/object
+    # self.train_data_loader_pickle_path = {
+    #     'cifar10': 'data_loaders/cifar10/train_data_loader.pickle',
+    #     'fashion-mnist': 'data_loaders/fashion-mnist/train_data_loader.pickle',
+    #     'cifar100': 'data_loaders/cifar100/train_data_loader.pickle',
+    # }
+    #
+    # self.test_data_loader_pickle_path = {
+    #     'cifar10': 'data_loaders/cifar10/test_data_loader.pickle',
+    #     'fashion-mnist': 'data_loaders/fashion-mnist/test_data_loader.pickle',
+    #     'cifar100': 'data_loaders/cifar100/test_data_loader.pickle',
+    # }
+    #
+    # # TODO: Make part of different configuration
+    # self.loss_function = torch.nn.CrossEntropyLoss
+    #
+    # self.default_model_folder_path = "default_models"
+    # self.data_path = "data"
 
     def get_dataloader_list(self):
         """
@@ -93,14 +87,6 @@ def get_dataloader_list(self):
         """
         return list(self.train_data_loader_pickle_path.keys())
 
-    def get_nets_list(self):
-        """
-        @deprecated
-        @return:
-        @rtype:
-        """
-        return list(self.available_nets.keys())
-
     def set_train_data_loader_pickle_path(self, path, name='cifar10'):
         self.train_data_loader_pickle_path[name] = path
 
     def set_test_data_loader_pickle_path(self, path, name='cifar10'):
@@ -113,8 +99,6 @@ def set_test_data_loader_pickle_path(self, path, name='cifar10'):
     def get_test_data_loader_pickle_path(self):
         return self.test_data_loader_pickle_path[self.dataset_name]
 
-    def get_save_model_folder_path(self):
-        return self.save_model_path
 
     def should_save_model(self, epoch_idx):
         """
@@ -123,4 +107,5 @@ def should_save_model(self, epoch_idx):
         :param epoch_idx: current training epoch index
         :type epoch_idx: int
         """
-        return self.save_model and (epoch_idx == 1 or epoch_idx % self.save_epoch_interval == 0)
+        return self.execution_config.general_net.save_model and (
+            epoch_idx == 1 or epoch_idx % self.execution_config.general_net.save_epoch_interval == 0)
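
The field(metadata=config(field_name=...)) declarations above map the JSON keys "net" and "cluster" from the example configuration onto the general_net and cluster_config attributes. A small sketch of that mapping, assuming standard dataclasses_json from_dict semantics (missing keys fall back to the dataclass defaults):

    from fltk.util.config.base_config import ExecutionConfig

    execution = ExecutionConfig.from_dict({
        'net': {'save_model': True, 'save_epoch_interval': 5},       # key "net" -> attribute general_net
        'reproducibility': {'torch_seed': 42, 'arrival_seed': 123},
    })
    assert execution.general_net.save_epoch_interval == 5
    assert execution.reproducibility.torch_seed == 42
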
diff --git a/fltk/util/env/learner_environment.py b/fltk/util/env/learner_environment.py
index 92c8df08..77ee109b 100644
--- a/fltk/util/env/learner_environment.py
+++ b/fltk/util/env/learner_environment.py
@@ -1,6 +1,15 @@
 import os
 
 def prepare_environment(host: str, nic: str):
+    """
+    Function to prepare the system environment with the correct connection settings for distributed training.
+    @param host: Hostname/address of the master node, exported as MASTER_ADDR.
+    @type host: str
+    @param nic: Network interface card to bind communication to.
+    @type nic: str
+    @return: None
+    @rtype: None
+    """
     if host:
         os.environ['MASTER_ADDR'] = host
     else:
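
For reference, a hypothetical call with the values from configs/example_cloud_experiment.json; prepare_environment() exports MASTER_ADDR for the RPC init_method used in fltk/launch.py (the NIC handling sits in the part of the hunk not shown above):

    from fltk.util.env.learner_environment import prepare_environment

    prepare_environment(host='fl-server.test.svc.cluster.local', nic='eth0')
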
- """ - return X - - def __str__(self): - return f""""Flip attack: {self.flips}""" diff --git a/fltk/util/generator/__init__.py b/fltk/util/task/generator/__init__.py similarity index 100% rename from fltk/util/generator/__init__.py rename to fltk/util/task/generator/__init__.py diff --git a/fltk/util/generator/arrival_generator.py b/fltk/util/task/generator/arrival_generator.py similarity index 100% rename from fltk/util/generator/arrival_generator.py rename to fltk/util/task/generator/arrival_generator.py