diff --git a/Dockerfile b/Dockerfile index 09cdfe0b..b5d5eb0e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,3 +1,14 @@ +# FROM python:3-alpine + +# RUN mkdir /data +# VOLUME /data + +# EXPOSE 8080 + +# WORKDIR /data + +# CMD ["python", "-m" , "http.server", "8080"] + # Base image to start with FROM ubuntu:20.04 @@ -25,6 +36,8 @@ RUN python3 -m pip install -r requirements.txt ENV GLOO_SOCKET_IFNAME=$NIC ENV TP_SOCKET_IFNAME=$NIC +#ENV GLOO_SOCKET_IFNAME=eth0 +#ENV TP_SOCKET_IFNAME=eth0 #RUN mkdir -p ./data/MNIST #COPY ./data/MNIST ../data/MNIST @@ -46,6 +59,6 @@ COPY fltk ./fltk COPY configs ./configs #CMD python3 ./fltk/__main__.py single configs/experiment.yaml --rank=$RANK # CMD python3 -m fltk single configs/experiment_vanilla.yaml --rank=$RANK -#CMD python3 -m fltk single $EXP_CONFIG --rank=$RANK -CMD python3 -m fltk remote $EXP_CONFIG $RANK --nic=$NIC --host=$MASTER_HOSTNAME +# CMD python3 -m fltk single $EXP_CONFIG --rank=$RANK +CMD python3 -m fltk remote $EXP_CONFIG $RANK --nic=$NIC --host=$MASTER_HOSTNAME $OPTIONAL_PARAMS #CMD python3 setup.py \ No newline at end of file diff --git a/configs/dev_mnist/fedavg_direct.cfg.yaml b/configs/dev_mnist/fedavg_direct.cfg.yaml deleted file mode 100644 index 25a64bda..00000000 --- a/configs/dev_mnist/fedavg_direct.cfg.yaml +++ /dev/null @@ -1,5 +0,0 @@ -# Individual configuration -offload_stategy: vanilla -deadline: 500 -single_machine: true -real_time: false \ No newline at end of file diff --git a/deploy/dev_generate/description.yml b/deploy/dev_generate/description.yml index 4a6f45b6..cdfd507d 100644 --- a/deploy/dev_generate/description.yml +++ b/deploy/dev_generate/description.yml @@ -5,11 +5,11 @@ federator: clients: fast: stub-name: stub_default.yml - amount: 20 + amount: 2 pin-cores: true num-cores: 1 - cpu-speed: 0.5 - cpu-variation: 0.16 + cpu-speed: 1 + cpu-variation: 0 slow: stub-name: stub_default.yml amount: 0 diff --git a/deploy/dev_generate/stub_default.yml b/deploy/dev_generate/stub_default.yml index bc437b1c..4023a178 100644 --- a/deploy/dev_generate/stub_default.yml +++ b/deploy/dev_generate/stub_default.yml @@ -16,6 +16,7 @@ client_name: # name can be anything - EXP_CONFIG=${EXP_CONFIG_FILE} - MASTER_HOSTNAME=10.5.0.11 - NIC=eth0 + - OPTIONAL_PARAMS=${OPTIONAL_PARAMS} ports: - "5002:5000" # {machine-port}:{docker-port} depends_on: diff --git a/deploy/dev_generate/system_stub.yml b/deploy/dev_generate/system_stub.yml index 37404525..01270c8c 100644 --- a/deploy/dev_generate/system_stub.yml +++ b/deploy/dev_generate/system_stub.yml @@ -18,6 +18,7 @@ services: - EXP_CONFIG=${EXP_CONFIG_FILE} - MASTER_HOSTNAME=10.5.0.11 - NIC=eth0 + - OPTIONAL_PARAMS=${OPTIONAL_PARAMS} ports: - "5000:5000" # {machine-port}:{docker-port} networks: diff --git a/fltk/__main__.py b/fltk/__main__.py index 8a6e8990..2a82b64f 100644 --- a/fltk/__main__.py +++ b/fltk/__main__.py @@ -1,120 +1,20 @@ -# import os -# import random -# import sys -# import time -# -# import torch.distributed.rpc as rpc -# import logging -# -# import yaml -# import argparse -# -# import torch.multiprocessing as mp -# from fltk.federator import Federator -# from fltk.launch import run_single, run_spawn -# from fltk.util.base_config import BareConfig -# -# logging.basicConfig(level=logging.DEBUG) -# -# def add_default_arguments(parser): -# parser.add_argument('--world_size', type=str, default=None, -# help='Number of entities in the world. This is the number of clients + 1') -# -# def main(): -# parser = argparse.ArgumentParser(description='Experiment launcher for the Federated Learning Testbed') -# -# subparsers = parser.add_subparsers(dest="mode") -# -# single_parser = subparsers.add_parser('single') -# single_parser.add_argument('config', type=str) -# single_parser.add_argument('--rank', type=int) -# single_parser.add_argument('--nic', type=str, default=None) -# single_parser.add_argument('--host', type=str, default=None) -# add_default_arguments(single_parser) -# -# spawn_parser = subparsers.add_parser('spawn') -# spawn_parser.add_argument('config', type=str) -# add_default_arguments(spawn_parser) -# -# remote_parser = subparsers.add_parser('remote') -# remote_parser.add_argument('--rank', type=int) -# remote_parser.add_argument('--nic', type=str, default=None) -# remote_parser.add_argument('--host', type=str, default=None) -# add_default_arguments(remote_parser) -# args = parser.parse_args() -# if args.mode == 'remote': -# if args.rank is None or args.host is None or args.world_size is None or args.nic is None: -# print('Missing rank, host, world-size, or nic argument when in \'remote\' mode!') -# parser.print_help() -# exit(1) -# world_size = int(args.world_size) -# master_address = args.host -# nic = args.nic -# rank = int(args.rank) -# if rank == 0: -# print('Remote mode only supports ranks > 0!') -# exit(1) -# print(f'rank={args.rank}, world_size={world_size}, host={master_address}, args=None, nic={nic}') -# run_single(rank=args.rank, world_size=world_size, host=master_address, args=None, nic=nic) -# else: -# with open(args.config) as file: -# sleep_time = random.uniform(0, 5.0) -# time.sleep(sleep_time) -# cfg = BareConfig() -# yaml_data = yaml.load(file, Loader=yaml.FullLoader) -# cfg.merge_yaml(yaml_data) -# if args.mode == 'single': -# if args.rank is None: -# print('Missing rank argument when in \'single\' mode!') -# parser.print_help() -# exit(1) -# world_size = args.world_size -# master_address = args.host -# nic = args.nic -# -# if not world_size: -# world_size = yaml_data['system']['clients']['amount'] + 1 -# if not master_address: -# master_address = yaml_data['system']['federator']['hostname'] -# if not nic: -# nic = yaml_data['system']['federator']['nic'] -# print(f'rank={args.rank}, world_size={world_size}, host={master_address}, args=cfg, nic={nic}') -# run_single(rank=args.rank, world_size=world_size, host=master_address, args=cfg, nic=nic) -# else: -# run_spawn(cfg) -# -# if __name__ == "__main__": -# main() import os -import sys -from pathlib import Path - from torch.distributed import rpc - from fltk.core.client import Client - -# print(sys.path) -# from fltk.core.federator import Federator as Fed -# print(list(Path.cwd().iterdir())) import argparse -from enum import Enum from pathlib import Path - from fltk.core.federator import Federator from fltk.util.config import Config -from fltk.util.definitions import Aggregations, Optimizations -from fltk.util.generate_experiments import generate - +from fltk.util.generate_experiments import generate, run -def run_single(config_path: Path): +def run_single(config_path: Path, prefix: str = None): # We can iterate over all the experiments in the directory and execute it, as long as the system remains the same! # System = machines and its configuration - print(config_path) config = Config.FromYamlFile(config_path) config.world_size = config.num_clients + 1 - config.replication_id = 1 + config.replication_id = prefix federator_node = Federator('federator', 0, config.world_size, config) federator_node.run() @@ -137,10 +37,12 @@ def retrieve_network_params_from_config(config: Config, nic=None, host=None): nic = system_attr['federator']['nic'] return nic, host -def run_remote(config_path: Path, rank: int, nic=None, host=None): + +def run_remote(config_path: Path, rank: int, nic=None, host=None, prefix: str=None): print(config_path, rank) config = Config.FromYamlFile(config_path) config.world_size = config.num_clients + 1 + config.replication_id = prefix nic, host = retrieve_network_params_from_config(config, nic, host) if not nic or not host: print('Missing rank, host, world-size, or nic argument when in \'remote\' mode!') @@ -151,7 +53,9 @@ def run_remote(config_path: Path, rank: int, nic=None, host=None): options = rpc.TensorPipeRpcBackendOptions( num_worker_threads=16, rpc_timeout=0, # infinite timeout - init_method=f'tcp://{os.environ["MASTER_ADDR"]}:{os.environ["MASTER_PORT"]}' + # init_method=f'tcp://{os.environ["MASTER_ADDR"]}:{os.environ["MASTER_PORT"]}' + init_method='env://', + _transports=["uv"] ) if rank != 0: print(f'Starting worker {rank} with world size={config.world_size}') @@ -163,8 +67,6 @@ def run_remote(config_path: Path, rank: int, nic=None, host=None): ) client_node = Client(f'client{rank}', rank, config.world_size, config) client_node.remote_registration() - - # trainer passively waiting for ps to kick off training iterations else: print(f'Starting the ps with world size={config.world_size}') rpc.init_rpc( @@ -175,22 +77,15 @@ def run_remote(config_path: Path, rank: int, nic=None, host=None): ) federator_node = Federator('federator', 0, config.world_size, config) - # federator_node.create_clients() federator_node.run() federator_node.stop_all_clients() print('Ending program') - # if rank == 0: - # print('FEDERATOR!') - # else: - # print(f'CLIENT {rank}') - -def main(): - pass def add_default_arguments(parser): - parser.add_argument('config', type=str, - help='') + parser.add_argument('config', type=str, help='') + parser.add_argument('--prefix', type=str, default=None) + if __name__ == '__main__': parser = argparse.ArgumentParser(prog='fltk', description='Experiment launcher for the Federated Learning Testbed (fltk)') @@ -204,12 +99,8 @@ def add_default_arguments(parser): util_run_parser = subparsers.add_parser('util-run') util_run_parser.add_argument('path', type=str) - # launch_parser.add_argument('action', choices=['docker', 'generate', 'run']) - # launch_parser.add_argument('path', help='path or key') - remote_parser = subparsers.add_parser('remote') single_machine_parser = subparsers.add_parser('single') - # add_default_arguments(launch_parser) add_default_arguments(remote_parser) add_default_arguments(single_machine_parser) @@ -217,34 +108,17 @@ def add_default_arguments(parser): remote_parser.add_argument('--nic', type=str, default=None) remote_parser.add_argument('--host', type=str, default=None) - # single_parser = subparsers.add_parser('single', help='single help') - # single_parser.add_argument('config') - # util_parser = subparsers.add_parser('util', help='util help') - # util_parser.add_argument('action') - # print(sys.argv) args = parser.parse_args() if args.action == 'util-docker': - print('docker') + print('Unimplemented!') elif args.action == 'util-generate': path = Path(args.path) print(f'generate for {path}') generate(path) elif args.action == 'util-run': - print('run') # run_single(Path(args.config)) + run(Path(args.path)) elif args.action == 'remote': - run_remote(Path(args.config), args.rank, args.nic, args.host) + run_remote(Path(args.config), args.rank, args.nic, args.host, args.prefix) else: # Run single machine mode - run_single(Path(args.config)) - - # if args.mode == 'single': - # print('Single') - # c = Config(optimizer=Optimizations.fedprox) - # print(isinstance(Config.aggregation, Enum)) - # config = Config.FromYamlFile(args.config) - # - # auto = config.optimizer - # print(config) - # print('Parsed') - - # print(args) \ No newline at end of file + run_single(Path(args.config), args.prefix) diff --git a/fltk/core/federator.py b/fltk/core/federator.py index 8151d8a7..4975e0bc 100644 --- a/fltk/core/federator.py +++ b/fltk/core/federator.py @@ -39,7 +39,10 @@ def __init__(self, id: int, rank: int, world_size: int, config: Config): self.loss_function = self.config.get_loss_function()() self.num_rounds = config.rounds self.config = config - config.output_path = Path(config.output_path) / config.exp_name / f'{config.name}_r{config.replication_id}' + prefix_text = '' + if config.replication_id: + prefix_text = f'_r{config.replication_id}' + config.output_path = Path(config.output_path) / f'{config.experiment_prefix}{prefix_text}' self.exp_data = DataContainer('federator', config.output_path, FederatorRecord, config.save_data_append) self.aggregation_method = get_aggregation(config.aggregation) @@ -112,7 +115,8 @@ def run(self): self.client_load_data() self.get_client_data_sizes() self.clients_ready() - + # self.logger.info('Sleeping before starting communication') + # time.sleep(20) for communication_round in range(self.config.rounds): self.exec_round(communication_round) @@ -216,9 +220,10 @@ def all_futures_done(futures: List[torch.Future])->bool: while not all_futures_done(training_futures): time.sleep(0.1) + self.logger.info('') # self.logger.info(f'Waiting for other clients') - self.logger.info(f'Continue with rest [1]') + self.logger.info(f'Continue with rest [1]') time.sleep(3) # for client in selected_clients: diff --git a/fltk/util/config.py b/fltk/util/config.py index a6811735..9b262606 100644 --- a/fltk/util/config.py +++ b/fltk/util/config.py @@ -13,7 +13,6 @@ @dataclass class Config: # optimizer: Optimizations - name: str = '' batch_size: int = 1 test_batch_size: int = 1000 rounds: int = 2 @@ -56,7 +55,7 @@ class Config: world_size: int = 0 replication_id: int = None - exp_name: str = 'experiment' + experiment_prefix: str = '' real_time : bool = False @@ -72,6 +71,8 @@ def __init__(self, **kwargs) -> None: kwargs[name] = field.type(kwargs[name]) for name, value in kwargs.items(): self.__setattr__(name, value) + if name == 'output_location': + self.output_path = Path(value) def get_default_model_folder_path(self): return self.default_model_folder_path diff --git a/fltk/util/data_container.py b/fltk/util/data_container.py index 6a6f1350..46963eb9 100644 --- a/fltk/util/data_container.py +++ b/fltk/util/data_container.py @@ -60,6 +60,7 @@ def __init__(self, name: str, output_location: Path, record_type: DataRecord, ap self.append_mode = append_mode file_flag = 'a' if append_mode else 'w' self.file_handle = open(self.file_path, file_flag) + print(f'[<=========>] Creating data container at {self.file_path}') self.record_type = record_type if self.append_mode: open(self.file_path, 'w').close() diff --git a/fltk/util/generate_experiments.py b/fltk/util/generate_experiments.py index d2f1eb4e..c424c889 100644 --- a/fltk/util/generate_experiments.py +++ b/fltk/util/generate_experiments.py @@ -1,4 +1,16 @@ from pathlib import Path +import os +import yaml +from fltk.util.generate_docker_compose_2 import generate_compose_file + + +def rm_tree(pth: Path): + for child in pth.iterdir(): + if child.is_file(): + child.unlink() + # else: + # rm_tree(child) + # pth.rmdir() def generate(base_path: Path): @@ -9,6 +21,7 @@ def generate(base_path: Path): with open(descr_path) as descr_f: descr_data = descr_f.read() exps_path = base_path / 'exps' + rm_tree(exps_path) exps_path.mkdir(parents=True, exist_ok=True) for exp_cfg in exp_cfg_list: exp_cfg_data = '' @@ -16,12 +29,108 @@ def generate(base_path: Path): exp_cfg_data = exp_f.read() exp_data = descr_data + exp_cfg_data - exp_data += f'\nexperiment_prefix: \'{Path(__file__).parent.name}_{exp_cfg.name.split(".")[0]}\'\n' + exp_data += f'\nexperiment_prefix: \'{base_path.name}_{exp_cfg.name.split(".")[0]}\'\n' filename = '.'.join([exp_cfg.name.split('.')[0], exp_cfg.name.split('.')[2]]) with open(exps_path / filename, mode='w') as f: f.write(exp_data) print('Done') + +# def run(): +# base_path = Path(__file__).parent +# descr_path = base_path / 'descr.yaml' +# +# exp_cfg_list = [x for x in base_path.iterdir() if '.cfg' in x.suffixes] +# descr_data = '' +# with open(descr_path) as descr_f: +# descr_data = descr_f.read() +# +# exps_path = base_path / 'exps' +# exps_path.mkdir(parents=True, exist_ok=True) +# for exp_cfg in exp_cfg_list: +# exp_cfg_data = '' +# replications = 1 +# with open(exp_cfg) as exp_f: +# exp_cfg_data = exp_f.read() +# for replication_id in range(replications): +# exp_data = descr_data + exp_cfg_data +# exp_data += f'\nexperiment_prefix: \'{Path(__file__).parent.name}_{exp_cfg.name.split(".")[0]}\'\n' +# filename = '.'.join([exp_cfg.name.split('.')[0], exp_cfg.name.split('.')[2]]) +# with open(exps_path / filename, mode='w') as f: +# f.write(exp_data) +# print('Done') + + +def run(base_path: Path): + print(f'Run {base_path}') + print(list(base_path.iterdir())) + descr_path = base_path / 'descr.yaml' + exp_cfg_list = [x for x in base_path.iterdir() if '.cfg' in x.suffixes] + descr_data = '' + with open(descr_path) as descr_f: + descr_data = yaml.safe_load(descr_f.read()) + + replications = 1 + if 'replications' in descr_data: + replications = descr_data['replications'] + run_docker = False + if 'docker_system' in descr_data: + # Run in docker + # Generate Docker + print(descr_data) + docker_deploy_path = Path(descr_data['docker_system']) + + print(docker_deploy_path) + run_docker = True + generate_compose_file(docker_deploy_path) + + exp_files = [x for x in (base_path / 'exps').iterdir() if x.suffix in ['.yaml', '.yml']] + + cmd_list = [] + print(exp_files) + if run_docker: + first_prefix = '--build' + for exp_cfg_file in exp_files: + for replication_id in range(replications): + cmd = f'export OPTIONAL_PARAMS="--prefix={replication_id}";export EXP_CONFIG_FILE="{exp_cfg_file}"; docker-compose --compatibility up {first_prefix};' + cmd_list.append(cmd) + # print(f'Running cmd: "{cmd}"') + # os.system(cmd) + first_prefix = '' + pass + else: + print('Switching to direct mode') + for exp_cfg_file in exp_files: + for replication_id in range(replications): + # cmd = f'export OPTIONAL_PARAMS="--prefix={replication_id}";export EXP_CONFIG_FILE="{exp_cfg_file}"; docker-compose --compatibility up {first_prefix};' + cmd = f'python3 -m fltk single {exp_cfg_file} --prefix={replication_id}' + cmd_list.append(cmd) + pass + + [print(x) for x in cmd_list] + for cmd in cmd_list: + print(f'Running cmd: "{cmd}"') + os.system(cmd) + print('Done') + # docker_system + + + # name = 'dev' + # generate_docker(name) + # base_path = f'{Path(__file__).parent}' + # exp_list = [ + # 'fedavg.yaml', + # ] + # exp_list = [f'{base_path}/exps/{x}' for x in exp_list] + # first_prefix = '--build' + # for exp_cfg_file in exp_list: + # cmd = f'export EXP_CONFIG_FILE="{exp_cfg_file}"; docker-compose --compatibility up {first_prefix};' + # print(f'Running cmd: "{cmd}"') + # os.system(cmd) + # first_prefix = '' + + # print('Done') + # if __name__ == '__main__': # base_path = Path(__file__).parent # descr_path = base_path / 'descr.yaml' diff --git a/requirements.txt b/requirements.txt index 84ecb26b..a68a74d1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,4 +15,4 @@ seaborn matplotlib google-api-python-client google-auth-httplib2 -google-auth-oauthlib \ No newline at end of file +google-auth-oauthlib >= 0.4.1 \ No newline at end of file