Make message calls opaque
bacox committed Mar 10, 2022
1 parent 13602ba commit dbcf49b
Showing 12 changed files with 396 additions and 99 deletions.
9 changes: 5 additions & 4 deletions Dockerfile
@@ -7,9 +7,6 @@ MAINTAINER Bart Cox <[email protected]>
# Run build without interactive dialogue
ARG DEBIAN_FRONTEND=noninteractive

-ENV GLOO_SOCKET_IFNAME=eth0
-ENV TP_SOCKET_IFNAME=eth0
-
# Define the working directory of the current Docker container
WORKDIR /opt/federation-lab

@@ -26,6 +23,9 @@ COPY requirements.txt ./
# Install all required packages for the generator
RUN python3 -m pip install -r requirements.txt

+ENV GLOO_SOCKET_IFNAME=$NIC
+ENV TP_SOCKET_IFNAME=$NIC
+
#RUN mkdir -p ./data/MNIST
#COPY ./data/MNIST ../data/MNIST
#ADD fltk ./fedsim
@@ -46,5 +46,6 @@ COPY fltk ./fltk
COPY configs ./configs
#CMD python3 ./fltk/__main__.py single configs/experiment.yaml --rank=$RANK
# CMD python3 -m fltk single configs/experiment_vanilla.yaml --rank=$RANK
-CMD python3 -m fltk single $EXP_CONFIG --rank=$RANK
+#CMD python3 -m fltk single $EXP_CONFIG --rank=$RANK
+CMD python3 -m fltk remote $EXP_CONFIG $RANK --nic=$NIC --host=$MASTER_HOSTNAME
#CMD python3 setup.py
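Note: with this change the image no longer hard-codes eth0 — GLOO_SOCKET_IFNAME and TP_SOCKET_IFNAME come from a NIC variable, and the default command runs the new remote entry point, so the interface, federator hostname, experiment config, and rank are all supplied from outside the image, e.g. docker run -e NIC=eth0 -e MASTER_HOSTNAME=federator -e EXP_CONFIG=configs/example.yaml -e RANK=1 <image> (illustrative values; the orchestration files that actually set these variables are among the changed files not shown on this page).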
282 changes: 214 additions & 68 deletions fltk/__main__.py
@@ -1,87 +1,233 @@
+# import os
+# import random
+# import sys
+# import time
+#
+# import torch.distributed.rpc as rpc
+# import logging
+#
+# import yaml
+# import argparse
+#
+# import torch.multiprocessing as mp
+# from fltk.federator import Federator
+# from fltk.launch import run_single, run_spawn
+# from fltk.util.base_config import BareConfig
+#
+# logging.basicConfig(level=logging.DEBUG)
+#
+# def add_default_arguments(parser):
+#     parser.add_argument('--world_size', type=str, default=None,
+#                         help='Number of entities in the world. This is the number of clients + 1')
+#
+# def main():
+#     parser = argparse.ArgumentParser(description='Experiment launcher for the Federated Learning Testbed')
+#
+#     subparsers = parser.add_subparsers(dest="mode")
+#
+#     single_parser = subparsers.add_parser('single')
+#     single_parser.add_argument('config', type=str)
+#     single_parser.add_argument('--rank', type=int)
+#     single_parser.add_argument('--nic', type=str, default=None)
+#     single_parser.add_argument('--host', type=str, default=None)
+#     add_default_arguments(single_parser)
+#
+#     spawn_parser = subparsers.add_parser('spawn')
+#     spawn_parser.add_argument('config', type=str)
+#     add_default_arguments(spawn_parser)
+#
+#     remote_parser = subparsers.add_parser('remote')
+#     remote_parser.add_argument('--rank', type=int)
+#     remote_parser.add_argument('--nic', type=str, default=None)
+#     remote_parser.add_argument('--host', type=str, default=None)
+#     add_default_arguments(remote_parser)
+#     args = parser.parse_args()
+#     if args.mode == 'remote':
+#         if args.rank is None or args.host is None or args.world_size is None or args.nic is None:
+#             print('Missing rank, host, world-size, or nic argument when in \'remote\' mode!')
+#             parser.print_help()
+#             exit(1)
+#         world_size = int(args.world_size)
+#         master_address = args.host
+#         nic = args.nic
+#         rank = int(args.rank)
+#         if rank == 0:
+#             print('Remote mode only supports ranks > 0!')
+#             exit(1)
+#         print(f'rank={args.rank}, world_size={world_size}, host={master_address}, args=None, nic={nic}')
+#         run_single(rank=args.rank, world_size=world_size, host=master_address, args=None, nic=nic)
+#     else:
+#         with open(args.config) as file:
+#             sleep_time = random.uniform(0, 5.0)
+#             time.sleep(sleep_time)
+#             cfg = BareConfig()
+#             yaml_data = yaml.load(file, Loader=yaml.FullLoader)
+#             cfg.merge_yaml(yaml_data)
+#             if args.mode == 'single':
+#                 if args.rank is None:
+#                     print('Missing rank argument when in \'single\' mode!')
+#                     parser.print_help()
+#                     exit(1)
+#                 world_size = args.world_size
+#                 master_address = args.host
+#                 nic = args.nic
+#
+#                 if not world_size:
+#                     world_size = yaml_data['system']['clients']['amount'] + 1
+#                 if not master_address:
+#                     master_address = yaml_data['system']['federator']['hostname']
+#                 if not nic:
+#                     nic = yaml_data['system']['federator']['nic']
+#                 print(f'rank={args.rank}, world_size={world_size}, host={master_address}, args=cfg, nic={nic}')
+#                 run_single(rank=args.rank, world_size=world_size, host=master_address, args=cfg, nic=nic)
+#             else:
+#                 run_spawn(cfg)
+#
+# if __name__ == "__main__":
+#     main()
import os
import random
import sys
import time
+from pathlib import Path

import torch.distributed.rpc as rpc
import logging
+from torch.distributed import rpc

import yaml
+from fltk.core.client import Client

+print(sys.path)
+# from fltk.core.federator import Federator as Fed
+print(list(Path.cwd().iterdir()))
import argparse
+from enum import Enum
+from pathlib import Path

import torch.multiprocessing as mp
-from fltk.federator import Federator
-from fltk.launch import run_single, run_spawn
-from fltk.util.base_config import BareConfig
+from fltk.core.federator import Federator
+from fltk.util.config import Config
+from fltk.util.definitions import Aggregations, Optimizations

logging.basicConfig(level=logging.DEBUG)
+def run_single(config_path: Path):

-def add_default_arguments(parser):
-    parser.add_argument('--world_size', type=str, default=None,
-                        help='Number of entities in the world. This is the number of clients + 1')
+    # We can iterate over all the experiments in the directory and execute it, as long as the system remains the same!
+    # System = machines and its configuration

+    print(config_path)
+    config = Config.FromYamlFile(config_path)
+    config.world_size = config.num_clients + 1
+    config.replication_id = 1
+    federator_node = Federator('federator', 0, config.world_size, config)
+    federator_node.run()


+def retrieve_env_params(nic=None, host=None):
+    if host:
+        os.environ['MASTER_ADDR'] = host
+        os.environ['MASTER_PORT'] = '5000'
+    if nic:
+        os.environ['GLOO_SOCKET_IFNAME'] = nic
+        os.environ['TP_SOCKET_IFNAME'] = nic
+
+def retrieve_network_params_from_config(config: Config, nic=None, host=None):
+    if hasattr(config, 'system'):
+        system_attr = getattr(config, 'system')
+        if 'federator' in system_attr:
+            if 'hostname' in system_attr['federator'] and not host:
+                host = system_attr['federator']['hostname']
+            if 'nic' in system_attr['federator'] and not nic:
+                nic = system_attr['federator']['nic']
+    return nic, host
+
+def run_remote(config_path: Path, rank: int, nic=None, host=None):
+    print(config_path, rank)
+    config = Config.FromYamlFile(config_path)
+    config.world_size = config.num_clients + 1
+    nic, host = retrieve_network_params_from_config(config, nic, host)
+    if not nic or not host:
+        print('Missing rank, host, world-size, or nic argument when in \'remote\' mode!')
+        parser.print_help()
+        exit(1)
+    retrieve_env_params(nic, host)
+    print(f'Starting with host={os.environ["MASTER_ADDR"]} and port={os.environ["MASTER_PORT"]} and interface={nic}')
+    options = rpc.TensorPipeRpcBackendOptions(
+        num_worker_threads=16,
+        rpc_timeout=0,  # infinite timeout
+        init_method=f'tcp://{os.environ["MASTER_ADDR"]}:{os.environ["MASTER_PORT"]}'
+    )
+    if rank != 0:
+        print(f'Starting worker {rank}')
+        rpc.init_rpc(
+            f"client{rank}",
+            rank=rank,
+            world_size=config.world_size,
+            rpc_backend_options=options,
+        )
+        client_node = Client(f'client{rank}', rank, config.world_size, config)
+        client_node.remote_registration()
+
+        # trainer passively waiting for ps to kick off training iterations
+    else:
+        print(f'Starting the ps with world size={config.world_size}')
+        rpc.init_rpc(
+            "federator",
+            rank=rank,
+            world_size=config.world_size,
+            rpc_backend_options=options
+
+        )
+        federator_node = Federator('federator', 0, config.world_size, config)
+        # federator_node.create_clients()
+        federator_node.run()
+        federator_node.stop_all_clients()
+    print('Ending program')
+    # if rank == 0:
+    #     print('FEDERATOR!')
+    # else:
+    #     print(f'CLIENT {rank}')

def main():
-    parser = argparse.ArgumentParser(description='Experiment launcher for the Federated Learning Testbed')
+    pass

-    subparsers = parser.add_subparsers(dest="mode")

-    single_parser = subparsers.add_parser('single')
-    single_parser.add_argument('config', type=str)
-    single_parser.add_argument('--rank', type=int)
-    single_parser.add_argument('--nic', type=str, default=None)
-    single_parser.add_argument('--host', type=str, default=None)
-    add_default_arguments(single_parser)
+def add_default_arguments(parser):
+    parser.add_argument('config', type=str,
+                        help='')

-    spawn_parser = subparsers.add_parser('spawn')
-    spawn_parser.add_argument('config', type=str)
-    add_default_arguments(spawn_parser)
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(prog='fltk', description='Experiment launcher for the Federated Learning Testbed (fltk)')
+    subparsers = parser.add_subparsers(dest="action", required=True)

+    launch_parser = subparsers.add_parser('launch-util')
    remote_parser = subparsers.add_parser('remote')
-    remote_parser.add_argument('--rank', type=int)
+    single_machine_parser = subparsers.add_parser('single')
+    add_default_arguments(launch_parser)
+    add_default_arguments(remote_parser)
+    add_default_arguments(single_machine_parser)

+    remote_parser.add_argument('rank', type=int)
    remote_parser.add_argument('--nic', type=str, default=None)
    remote_parser.add_argument('--host', type=str, default=None)
-    add_default_arguments(remote_parser)

+    # single_parser = subparsers.add_parser('single', help='single help')
+    # single_parser.add_argument('config')
+    # util_parser = subparsers.add_parser('util', help='util help')
+    # util_parser.add_argument('action')
+    # print(sys.argv)
    args = parser.parse_args()
-    if args.mode == 'remote':
-        if args.rank is None or args.host is None or args.world_size is None or args.nic is None:
-            print('Missing rank, host, world-size, or nic argument when in \'remote\' mode!')
-            parser.print_help()
-            exit(1)
-        world_size = int(args.world_size)
-        master_address = args.host
-        nic = args.nic
-        rank = int(args.rank)
-        if rank == 0:
-            print('Remote mode only supports ranks > 0!')
-            exit(1)
-        print(f'rank={args.rank}, world_size={world_size}, host={master_address}, args=None, nic={nic}')
-        run_single(rank=args.rank, world_size=world_size, host=master_address, args=None, nic=nic)
+    if args.action == 'launch-util':
+        pass
+        # run_single(Path(args.config))
+    if args.action == 'remote':
+        run_remote(Path(args.config), args.rank, args.nic, args.host)
    else:
-        with open(args.config) as file:
-            sleep_time = random.uniform(0, 5.0)
-            time.sleep(sleep_time)
-            cfg = BareConfig()
-            yaml_data = yaml.load(file, Loader=yaml.FullLoader)
-            cfg.merge_yaml(yaml_data)
-            if args.mode == 'single':
-                if args.rank is None:
-                    print('Missing rank argument when in \'single\' mode!')
-                    parser.print_help()
-                    exit(1)
-                world_size = args.world_size
-                master_address = args.host
-                nic = args.nic
-
-                if not world_size:
-                    world_size = yaml_data['system']['clients']['amount'] + 1
-                if not master_address:
-                    master_address = yaml_data['system']['federator']['hostname']
-                if not nic:
-                    nic = yaml_data['system']['federator']['nic']
-                print(f'rank={args.rank}, world_size={world_size}, host={master_address}, args=cfg, nic={nic}')
-                run_single(rank=args.rank, world_size=world_size, host=master_address, args=cfg, nic=nic)
-            else:
-                run_spawn(cfg)
-
-if __name__ == "__main__":
-    main()
+        # Run single machine mode
+        run_single(Path(args.config))

+    # if args.mode == 'single':
+    #     print('Single')
+    #     c = Config(optimizer=Optimizations.fedprox)
+    #     print(isinstance(Config.aggregation, Enum))
+    #     config = Config.FromYamlFile(args.config)
+    #
+    #     auto = config.optimizer
+    #     print(config)
+    #     print('Parsed')
+
+    # print(args)
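Taken together, the rewritten __main__.py replaces the old mode-based launcher with an action-based one: every subcommand now takes the config file as a positional argument, and remote additionally takes a positional rank. Assuming a config at configs/example.yaml (a placeholder path), the federator and a first client would be started with python3 -m fltk remote configs/example.yaml 0 --nic=eth0 --host=federator and python3 -m fltk remote configs/example.yaml 1 --nic=eth0 --host=federator, while python3 -m fltk single configs/example.yaml runs the single-machine mode. The --nic/--host flags can be omitted when the system.federator section of the config supplies hostname and nic.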
24 changes: 21 additions & 3 deletions fltk/core/client.py
@@ -10,7 +10,7 @@


class Client(Node):
-
+    running = False
    def __init__(self, id: int, rank: int, world_size: int, config: Config):
        super().__init__(id, rank, world_size, config)

@@ -22,6 +22,23 @@ def __init__(self, id: int, rank: int, world_size: int, config: Config):
            self.config.scheduler_gamma,
            self.config.min_lr)

+    def remote_registration(self):
+        self.logger.info('Sending registration')
+        self.message('federator', 'ping', 'new_sender', be_weird=True)
+        self.message('federator', 'register_client', self.id, self.rank)
+        self.running = True
+        self._event_loop()
+
+    def stop_client(self):
+        self.logger.info('Got call to stop event loop')
+        self.running = False
+
+    def _event_loop(self):
+        self.logger.info('Starting event loop')
+        while self.running:
+            time.sleep(0.1)
+        self.logger.info('Exiting node')

    def train(self, num_epochs: int):
        start_time = time.time()

@@ -47,10 +64,11 @@ def train(self, num_epochs: int):
            running_loss += loss.item()
            # Mark logging update step
            if i % self.config.log_interval == 0:
-                # self.logger.info(
-                #     '[%d, %5d] loss: %.3f' % (num_epochs, i, running_loss / self.config.log_interval))
+                self.logger.info(
+                    '[%s] [%d, %5d] loss: %.3f' % (self.id, num_epochs, i, running_loss / self.config.log_interval))
                final_running_loss = running_loss / self.config.log_interval
                running_loss = 0.0
+                # break

        end_time = time.time()
        duration = end_time - start_time
(Diffs for the remaining 9 changed files were not loaded on this page.)
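The commit title refers to calls like self.message('federator', 'register_client', ...) above: instead of invoking torch RPC primitives directly, a node now names a peer and a method, and the transport details stay inside the Node base class. That class lives in one of the files not loaded here, so the following is only a minimal sketch of how such an opaque message call can be built on torch.distributed.rpc; the _registry and _dispatch names are assumptions, not the commit's actual code, and the sketch assumes each RPC worker name equals its node id (as in __main__.py, where the federator worker is named 'federator').

import torch.distributed.rpc as rpc

_registry = {}  # node id -> local node instance on this worker (assumed helper)

def _dispatch(node_id: str, method_name: str, *args, **kwargs):
    # Runs on the callee worker: resolve the named method on the local
    # node object and invoke it with the forwarded arguments.
    return getattr(_registry[node_id], method_name)(*args, **kwargs)

class Node:
    def __init__(self, id: str):
        self.id = id
        _registry[id] = self  # register so peers can reach this node by name

    def message(self, other_node: str, method_name: str, *args, **kwargs):
        # Opaque call: callers pass a node name and a method name; the RPC
        # plumbing (worker resolution, synchronous call) stays in one place.
        return rpc.rpc_sync(other_node, _dispatch,
                            args=(other_node, method_name) + args,
                            kwargs=kwargs)

Under this sketch, client_node.remote_registration() reduces to two rpc_sync calls to the worker named federator, and the federator's stop_all_clients() can flip each client's running flag through the same channel, which is what lets the _event_loop above exit.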
