Skip to content

Commit

Permalink
Fix docker high cpu usage
Browse files Browse the repository at this point in the history
  • Loading branch information
bacox committed Mar 15, 2022
1 parent 849021f commit 8506f1d
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 159 deletions.
17 changes: 15 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
# FROM python:3-alpine

# RUN mkdir /data
# VOLUME /data

# EXPOSE 8080

# WORKDIR /data

# CMD ["python", "-m" , "http.server", "8080"]

# Base image to start with
FROM ubuntu:20.04

Expand Down Expand Up @@ -25,6 +36,8 @@ RUN python3 -m pip install -r requirements.txt

ENV GLOO_SOCKET_IFNAME=$NIC
ENV TP_SOCKET_IFNAME=$NIC
#ENV GLOO_SOCKET_IFNAME=eth0
#ENV TP_SOCKET_IFNAME=eth0

#RUN mkdir -p ./data/MNIST
#COPY ./data/MNIST ../data/MNIST
Expand All @@ -46,6 +59,6 @@ COPY fltk ./fltk
COPY configs ./configs
#CMD python3 ./fltk/__main__.py single configs/experiment.yaml --rank=$RANK
# CMD python3 -m fltk single configs/experiment_vanilla.yaml --rank=$RANK
#CMD python3 -m fltk single $EXP_CONFIG --rank=$RANK
CMD python3 -m fltk remote $EXP_CONFIG $RANK --nic=$NIC --host=$MASTER_HOSTNAME
# CMD python3 -m fltk single $EXP_CONFIG --rank=$RANK
CMD python3 -m fltk remote $EXP_CONFIG $RANK --nic=$NIC --host=$MASTER_HOSTNAME $OPTIONAL_PARAMS
#CMD python3 setup.py
5 changes: 0 additions & 5 deletions configs/dev_mnist/fedavg_direct.cfg.yaml

This file was deleted.

6 changes: 3 additions & 3 deletions deploy/dev_generate/description.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ federator:
clients:
fast:
stub-name: stub_default.yml
amount: 20
amount: 2
pin-cores: true
num-cores: 1
cpu-speed: 0.5
cpu-variation: 0.16
cpu-speed: 1
cpu-variation: 0
slow:
stub-name: stub_default.yml
amount: 0
Expand Down
1 change: 1 addition & 0 deletions deploy/dev_generate/stub_default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ client_name: # name can be anything
- EXP_CONFIG=${EXP_CONFIG_FILE}
- MASTER_HOSTNAME=10.5.0.11
- NIC=eth0
- OPTIONAL_PARAMS=${OPTIONAL_PARAMS}
ports:
- "5002:5000" # {machine-port}:{docker-port}
depends_on:
Expand Down
1 change: 1 addition & 0 deletions deploy/dev_generate/system_stub.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ services:
- EXP_CONFIG=${EXP_CONFIG_FILE}
- MASTER_HOSTNAME=10.5.0.11
- NIC=eth0
- OPTIONAL_PARAMS=${OPTIONAL_PARAMS}
ports:
- "5000:5000" # {machine-port}:{docker-port}
networks:
Expand Down
158 changes: 16 additions & 142 deletions fltk/__main__.py
Original file line number Diff line number Diff line change
@@ -1,120 +1,20 @@
# import os
# import random
# import sys
# import time
#
# import torch.distributed.rpc as rpc
# import logging
#
# import yaml
# import argparse
#
# import torch.multiprocessing as mp
# from fltk.federator import Federator
# from fltk.launch import run_single, run_spawn
# from fltk.util.base_config import BareConfig
#
# logging.basicConfig(level=logging.DEBUG)
#
# def add_default_arguments(parser):
# parser.add_argument('--world_size', type=str, default=None,
# help='Number of entities in the world. This is the number of clients + 1')
#
# def main():
# parser = argparse.ArgumentParser(description='Experiment launcher for the Federated Learning Testbed')
#
# subparsers = parser.add_subparsers(dest="mode")
#
# single_parser = subparsers.add_parser('single')
# single_parser.add_argument('config', type=str)
# single_parser.add_argument('--rank', type=int)
# single_parser.add_argument('--nic', type=str, default=None)
# single_parser.add_argument('--host', type=str, default=None)
# add_default_arguments(single_parser)
#
# spawn_parser = subparsers.add_parser('spawn')
# spawn_parser.add_argument('config', type=str)
# add_default_arguments(spawn_parser)
#
# remote_parser = subparsers.add_parser('remote')
# remote_parser.add_argument('--rank', type=int)
# remote_parser.add_argument('--nic', type=str, default=None)
# remote_parser.add_argument('--host', type=str, default=None)
# add_default_arguments(remote_parser)
# args = parser.parse_args()
# if args.mode == 'remote':
# if args.rank is None or args.host is None or args.world_size is None or args.nic is None:
# print('Missing rank, host, world-size, or nic argument when in \'remote\' mode!')
# parser.print_help()
# exit(1)
# world_size = int(args.world_size)
# master_address = args.host
# nic = args.nic
# rank = int(args.rank)
# if rank == 0:
# print('Remote mode only supports ranks > 0!')
# exit(1)
# print(f'rank={args.rank}, world_size={world_size}, host={master_address}, args=None, nic={nic}')
# run_single(rank=args.rank, world_size=world_size, host=master_address, args=None, nic=nic)
# else:
# with open(args.config) as file:
# sleep_time = random.uniform(0, 5.0)
# time.sleep(sleep_time)
# cfg = BareConfig()
# yaml_data = yaml.load(file, Loader=yaml.FullLoader)
# cfg.merge_yaml(yaml_data)
# if args.mode == 'single':
# if args.rank is None:
# print('Missing rank argument when in \'single\' mode!')
# parser.print_help()
# exit(1)
# world_size = args.world_size
# master_address = args.host
# nic = args.nic
#
# if not world_size:
# world_size = yaml_data['system']['clients']['amount'] + 1
# if not master_address:
# master_address = yaml_data['system']['federator']['hostname']
# if not nic:
# nic = yaml_data['system']['federator']['nic']
# print(f'rank={args.rank}, world_size={world_size}, host={master_address}, args=cfg, nic={nic}')
# run_single(rank=args.rank, world_size=world_size, host=master_address, args=cfg, nic=nic)
# else:
# run_spawn(cfg)
#
# if __name__ == "__main__":
# main()
import os
import sys
from pathlib import Path

from torch.distributed import rpc

from fltk.core.client import Client

# print(sys.path)
# from fltk.core.federator import Federator as Fed
# print(list(Path.cwd().iterdir()))
import argparse
from enum import Enum
from pathlib import Path

from fltk.core.federator import Federator
from fltk.util.config import Config
from fltk.util.definitions import Aggregations, Optimizations
from fltk.util.generate_experiments import generate

from fltk.util.generate_experiments import generate, run

def run_single(config_path: Path):

def run_single(config_path: Path, prefix: str = None):
# We can iterate over all the experiments in the directory and execute it, as long as the system remains the same!
# System = machines and its configuration

print(config_path)
config = Config.FromYamlFile(config_path)
config.world_size = config.num_clients + 1
config.replication_id = 1
config.replication_id = prefix
federator_node = Federator('federator', 0, config.world_size, config)
federator_node.run()

Expand All @@ -137,10 +37,12 @@ def retrieve_network_params_from_config(config: Config, nic=None, host=None):
nic = system_attr['federator']['nic']
return nic, host

def run_remote(config_path: Path, rank: int, nic=None, host=None):

def run_remote(config_path: Path, rank: int, nic=None, host=None, prefix: str=None):
print(config_path, rank)
config = Config.FromYamlFile(config_path)
config.world_size = config.num_clients + 1
config.replication_id = prefix
nic, host = retrieve_network_params_from_config(config, nic, host)
if not nic or not host:
print('Missing rank, host, world-size, or nic argument when in \'remote\' mode!')
Expand All @@ -151,7 +53,9 @@ def run_remote(config_path: Path, rank: int, nic=None, host=None):
options = rpc.TensorPipeRpcBackendOptions(
num_worker_threads=16,
rpc_timeout=0, # infinite timeout
init_method=f'tcp://{os.environ["MASTER_ADDR"]}:{os.environ["MASTER_PORT"]}'
# init_method=f'tcp://{os.environ["MASTER_ADDR"]}:{os.environ["MASTER_PORT"]}'
init_method='env://',
_transports=["uv"]
)
if rank != 0:
print(f'Starting worker {rank} with world size={config.world_size}')
Expand All @@ -163,8 +67,6 @@ def run_remote(config_path: Path, rank: int, nic=None, host=None):
)
client_node = Client(f'client{rank}', rank, config.world_size, config)
client_node.remote_registration()

# trainer passively waiting for ps to kick off training iterations
else:
print(f'Starting the ps with world size={config.world_size}')
rpc.init_rpc(
Expand All @@ -175,22 +77,15 @@ def run_remote(config_path: Path, rank: int, nic=None, host=None):

)
federator_node = Federator('federator', 0, config.world_size, config)
# federator_node.create_clients()
federator_node.run()
federator_node.stop_all_clients()
print('Ending program')
# if rank == 0:
# print('FEDERATOR!')
# else:
# print(f'CLIENT {rank}')

def main():
pass


def add_default_arguments(parser):
parser.add_argument('config', type=str,
help='')
parser.add_argument('config', type=str, help='')
parser.add_argument('--prefix', type=str, default=None)


if __name__ == '__main__':
parser = argparse.ArgumentParser(prog='fltk', description='Experiment launcher for the Federated Learning Testbed (fltk)')
Expand All @@ -204,47 +99,26 @@ def add_default_arguments(parser):
util_run_parser = subparsers.add_parser('util-run')
util_run_parser.add_argument('path', type=str)

# launch_parser.add_argument('action', choices=['docker', 'generate', 'run'])
# launch_parser.add_argument('path', help='path or key')

remote_parser = subparsers.add_parser('remote')
single_machine_parser = subparsers.add_parser('single')
# add_default_arguments(launch_parser)
add_default_arguments(remote_parser)
add_default_arguments(single_machine_parser)

remote_parser.add_argument('rank', type=int)
remote_parser.add_argument('--nic', type=str, default=None)
remote_parser.add_argument('--host', type=str, default=None)

# single_parser = subparsers.add_parser('single', help='single help')
# single_parser.add_argument('config')
# util_parser = subparsers.add_parser('util', help='util help')
# util_parser.add_argument('action')
# print(sys.argv)
args = parser.parse_args()
if args.action == 'util-docker':
print('docker')
print('Unimplemented!')
elif args.action == 'util-generate':
path = Path(args.path)
print(f'generate for {path}')
generate(path)
elif args.action == 'util-run':
print('run') # run_single(Path(args.config))
run(Path(args.path))
elif args.action == 'remote':
run_remote(Path(args.config), args.rank, args.nic, args.host)
run_remote(Path(args.config), args.rank, args.nic, args.host, args.prefix)
else:
# Run single machine mode
run_single(Path(args.config))

# if args.mode == 'single':
# print('Single')
# c = Config(optimizer=Optimizations.fedprox)
# print(isinstance(Config.aggregation, Enum))
# config = Config.FromYamlFile(args.config)
#
# auto = config.optimizer
# print(config)
# print('Parsed')

# print(args)
run_single(Path(args.config), args.prefix)
11 changes: 8 additions & 3 deletions fltk/core/federator.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ def __init__(self, id: int, rank: int, world_size: int, config: Config):
self.loss_function = self.config.get_loss_function()()
self.num_rounds = config.rounds
self.config = config
config.output_path = Path(config.output_path) / config.exp_name / f'{config.name}_r{config.replication_id}'
prefix_text = ''
if config.replication_id:
prefix_text = f'_r{config.replication_id}'
config.output_path = Path(config.output_path) / f'{config.experiment_prefix}{prefix_text}'
self.exp_data = DataContainer('federator', config.output_path, FederatorRecord, config.save_data_append)
self.aggregation_method = get_aggregation(config.aggregation)

Expand Down Expand Up @@ -112,7 +115,8 @@ def run(self):
self.client_load_data()
self.get_client_data_sizes()
self.clients_ready()

# self.logger.info('Sleeping before starting communication')
# time.sleep(20)
for communication_round in range(self.config.rounds):
self.exec_round(communication_round)

Expand Down Expand Up @@ -216,9 +220,10 @@ def all_futures_done(futures: List[torch.Future])->bool:

while not all_futures_done(training_futures):
time.sleep(0.1)
self.logger.info('')
# self.logger.info(f'Waiting for other clients')

self.logger.info(f'Continue with rest [1]')
self.logger.info(f'Continue with rest [1]')
time.sleep(3)

# for client in selected_clients:
Expand Down
5 changes: 3 additions & 2 deletions fltk/util/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
@dataclass
class Config:
# optimizer: Optimizations
name: str = ''
batch_size: int = 1
test_batch_size: int = 1000
rounds: int = 2
Expand Down Expand Up @@ -56,7 +55,7 @@ class Config:
world_size: int = 0

replication_id: int = None
exp_name: str = 'experiment'
experiment_prefix: str = ''

real_time : bool = False

Expand All @@ -72,6 +71,8 @@ def __init__(self, **kwargs) -> None:
kwargs[name] = field.type(kwargs[name])
for name, value in kwargs.items():
self.__setattr__(name, value)
if name == 'output_location':
self.output_path = Path(value)

def get_default_model_folder_path(self):
return self.default_model_folder_path
Expand Down
1 change: 1 addition & 0 deletions fltk/util/data_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def __init__(self, name: str, output_location: Path, record_type: DataRecord, ap
self.append_mode = append_mode
file_flag = 'a' if append_mode else 'w'
self.file_handle = open(self.file_path, file_flag)
print(f'[<=========>] Creating data container at {self.file_path}')
self.record_type = record_type
if self.append_mode:
open(self.file_path, 'w').close()
Expand Down
Loading

0 comments on commit 8506f1d

Please sign in to comment.