
Commit

Resolve import issues after merge
JMGaljaard committed Apr 4, 2022
1 parent c735aad commit 9691b5e
Showing 21 changed files with 104 additions and 59 deletions.
3 changes: 1 addition & 2 deletions Dockerfile
@@ -1,6 +1,5 @@
FROM ubuntu:20.04


MAINTAINER Jeroen Galjaard <[email protected]>

# Run build without interactive dialogue
@@ -14,7 +13,7 @@ WORKDIR /opt/federation-lab

# Update the Ubuntu software repository and fetch packages
RUN apt-get update \
&& apt-get install -y curl python3 python3-pip net-tools iproute2
&& apt-get install -y curl python3 python3-pip

# Add Pre-downloaded models (otherwise needs be run every-time)
ADD data/ data/
2 changes: 1 addition & 1 deletion experiments/example_cuda/descr.yaml
@@ -12,7 +12,7 @@ profiling_time: 100
warmup_round: false
output_location: 'output/example_cuda'
tensor_board_active: true
clients_per_round: 2
clients_per_round: 1
node_groups:
slow: [1, 1]
medium: [2, 2]
42 changes: 32 additions & 10 deletions fltk/__main__.py
@@ -2,6 +2,7 @@
import json
import logging
from pathlib import Path
from typing import Optional, Any

from fltk.launch import launch_extractor, launch_client, launch_single, \
launch_remote, launch_cluster
@@ -22,6 +23,12 @@
}


def _save_get(args, param) -> Optional[Any]:
if args is not None and hasattr(args, param):
return args.__dict__[param]
return None


def __main__():
parser = argparse.ArgumentParser(prog='fltk',
description='Experiment launcher for the Federated Learning Testbed (fltk)')
@@ -33,17 +40,32 @@ def __main__():
"""

args = parser.parse_args()
config = None
try:
with open(args.config, 'r') as config_file:
config: DistributedConfig = DistributedConfig.from_dict(json.load(config_file))
config.config_path = Path(args.config)
except:
pass
arg_path, conf_path = None, None
try:
arg_path = Path(args.path)
except Exception as e:
print('No argument path is provided.')
try:
conf_path = Path(args.config)
except Exception as e:
print('No configuration path is provided.')

with open(args.config, 'r') as config_file:
config: DistributedConfig = DistributedConfig.from_dict(json.load(config_file))
config.config_path = Path(args.config)

arg_path = Path(args.path)
conf_path = Path(args.config)

# Lookup execution mode and call function to start subroutine
__run_op_dict[args.action](arg_path, conf_path, rank=args.rank, parser=parser, nic=args.nic, host=args.host,
prefix=args.prefix, args=args)
# TODO: move kwargs into function as extractor
__run_op_dict[args.action](arg_path, conf_path,
rank=_save_get(args, 'rank'),
parser=parser,
nic=_save_get(args, 'nic'),
host=_save_get(args, 'host'),
prefix=_save_get(args, 'prefix'),
args=args,
config=config)

exit(0)

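The new _save_get helper guards the dispatch call above against argparse namespaces that lack a given attribute, presumably because not every subcommand defines rank, nic, host or prefix. A minimal sketch of its behaviour; the Namespace values below are made up for illustration:

# Illustrative only: _save_get falls back to None instead of raising AttributeError.
from argparse import Namespace
from typing import Any, Optional

def _save_get(args, param) -> Optional[Any]:
    if args is not None and hasattr(args, param):
        return args.__dict__[param]
    return None

args = Namespace(rank=0, nic='eth0')   # hypothetical parsed arguments
print(_save_get(args, 'rank'))         # -> 0
print(_save_get(args, 'host'))         # -> None (attribute not present)
print(_save_get(None, 'rank'))         # -> None (no namespace at all)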
2 changes: 1 addition & 1 deletion fltk/core/distributed/client.py
@@ -17,7 +17,7 @@
from fltk.util.results import EpochData


class Client(DistNode):
class DistClient(DistNode):
"""
TODO: Combine with Client and differentiate between Federated and Distributed Learning through better inheritance.
"""
7 changes: 5 additions & 2 deletions fltk/datasets/distributed/dataset.py
@@ -1,10 +1,12 @@
from abc import abstractmethod
from typing import Any

from torch.utils.data import DataLoader
from torch.utils.data import TensorDataset
import torch
import numpy

from fltk.util.arguments import Arguments
# from fltk.util.arguments import Arguments
from fltk.util.log import getLogger


@@ -17,7 +19,8 @@ class DistDataset:
train_loader = None
test_loader = None
logger = getLogger(__name__)
def __init__(self, args: Arguments):

def __init__(self, args: Any):
self.args = args
# self.train_dataset = self.load_train_dataset()
# self.test_dataset = self.load_test_dataset()
2 changes: 1 addition & 1 deletion fltk/datasets/distributed/fashion_mnist.py
@@ -1,4 +1,4 @@
from fltk.datasets.distributed import DistDataset
from fltk.datasets.distributed.dataset import DistDataset
from torchvision import datasets
from torchvision import transforms
from torch.utils.data import DataLoader, DistributedSampler
2 changes: 1 addition & 1 deletion fltk/datasets/distributed/mnist.py
@@ -1,5 +1,5 @@
from __future__ import annotations
from fltk.datasets import DistDataset
from fltk.datasets.distributed.dataset import DistDataset
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
# from fltk.strategy import get_sampler, get_augmentations, get_augmentations_tensor, UnifyingSampler
11 changes: 5 additions & 6 deletions fltk/datasets/loader_util.py
@@ -1,9 +1,7 @@
from fltk.datasets.distributed.mnist import DistMNISTDataset
from fltk.datasets.distributed.cifar10 import DistCIFAR10Dataset
from fltk.datasets.distributed.cifar100 import DistCIFAR100Dataset
from fltk.datasets.distributed.fashion_mnist import DistFashionMNISTDataset
from fltk.datasets.distributed import DistMNISTDataset, DistFashionMNISTDataset, DistCIFAR100Dataset, DistCIFAR10Dataset
from fltk.util.definitions import Dataset


def available_datasets():
return {
Dataset.cifar10: DistCIFAR10Dataset,
@@ -12,6 +10,7 @@ def available_datasets():
Dataset.mnist: DistMNISTDataset
}


def get_dataset(name: Dataset):
return available_datasets()[name]

@@ -26,11 +25,11 @@ def get_train_loader_path(name: Dataset) -> str:
return paths[name]


def get_test_loader_path(name: Dataset)-> str:
def get_test_loader_path(name: Dataset) -> str:
paths = {
Dataset.cifar10: 'data_loaders/cifar10/test_data_loader.pickle',
Dataset.fashion_mnist: 'data_loaders/fashion-mnist/test_data_loader.pickle',
Dataset.cifar100: 'data_loaders/cifar100/test_data_loader.pickle',
Dataset.mnist: 'data_loaders/mnist/test_data_loader.pickle',
}
return paths[name]
return paths[name]
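With the individual module imports collapsed into the fltk.datasets.distributed re-export, the lookup tables above remain the single place that maps the Dataset enum to a loader class and to its pickled data-loader paths. A minimal usage sketch, assuming the fltk package is importable as laid out in this diff:

# Illustrative only: resolving a dataset class and a loader path via the enum.
from fltk.util.definitions import Dataset
from fltk.datasets.loader_util import get_dataset, get_test_loader_path

dataset_cls = get_dataset(Dataset.mnist)            # -> DistMNISTDataset
loader_path = get_test_loader_path(Dataset.mnist)   # -> 'data_loaders/mnist/test_data_loader.pickle'
print(dataset_cls.__name__, loader_path)

Instantiating the returned class still requires an args object, per DistDataset.__init__(self, args: Any) above.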
48 changes: 33 additions & 15 deletions fltk/launch.py
@@ -9,7 +9,7 @@
from torch.distributed import rpc

from fltk.core.client import Client
from fltk.core.distributed.client import Client
from fltk.core.distributed.client import DistClient
from fltk.core.distributed.extractor import download_datasets
from fltk.core.distributed.orchestrator import Orchestrator
from fltk.core.federator import Federator
@@ -70,7 +70,7 @@ def launch_distributed_client(task_id: str, config: DistributedConfig = None, le

logging.info(f'Starting Creating client with {rank}')

client = Client(rank, task_id, world_size, config, learning_params)
client = DistClient(rank, task_id, world_size, config, learning_params)
client.prepare_learner(distributed)
epoch_data = client.run_epochs()
print(epoch_data)
@@ -153,14 +153,26 @@ def launch_single(base_path: Path, config_path: Path, prefix: str = None, **kwar
federator_node.run()


def _retrieve_env_params(nic=None, host=None):
def _retrieve_or_init_env(nic=None, host=None):
"""
Function
@param nic:
@type nic:
@param host:
@type host:
@return:
@rtype:
"""
if host:
os.environ['MASTER_ADDR'] = host
os.environ['MASTER_PORT'] = '5000'
if nic:
os.environ['GLOO_SOCKET_IFNAME'] = nic
os.environ['TP_SOCKET_IFNAME'] = nic

def _retrieve_env_config():
rank, world_size, port = os.environ.get('RANK'), os.environ.get('WORLD_SIZE'), os.environ["MASTER_PORT"]
return rank, world_size, port

def _retrieve_network_params_from_config(config: Config, nic=None, host=None):
if hasattr(config, 'system'):
@@ -173,27 +185,33 @@ def _retrieve_network_params_from_config(config: Config, nic=None, host=None):
return nic, host


def launch_remote(base_path: Path, config_path: Path, rank: int, parser, nic=None, host=None, prefix: str = None,
**kwargs):
def launch_remote(base_path: Path, config_path: Path, rank: int, parser, nic=None, host=None, prefix: str = None, **kwargs):
print(config_path, rank)

config = Config.FromYamlFile(config_path)
config.world_size = config.num_clients + 1
config.replication_id = prefix
nic, host = _retrieve_network_params_from_config(config, nic, host)
if not (nic and host):
print('Missing rank, host, world-size, or nic argument when in \'remote\' mode!')
nic, host = _retrieve_network_params_from_config(config, nic, host)
_retrieve_or_init_env(nic, host)
elif not (nic and host):
rank, world_size, master_port = _retrieve_env_config()
assert world_size == config.world_size
else:
print('Missing rank, host, world-size, checking environment!')
parser.print_help()
exit(1)
_retrieve_env_params(nic, host)
print(f'Starting with host={os.environ["MASTER_ADDR"]} and port={os.environ["MASTER_PORT"]} and interface={nic}')

msg = f'Starting with host={host} and port={os.environ["MASTER_PORT"]} and interface={nic}'
logging.log(logging.INFO, msg)
options = rpc.TensorPipeRpcBackendOptions(
num_worker_threads=16,
rpc_timeout=0, # infinite timeout
init_method='env://',
_transports=["uv"]
_transports=["uv"] # Use LibUV backend for async/IO interaction
)
if rank != 0:
print(f'Starting worker {rank} with world size={config.world_size}')
print(f'Starting worker-{rank} with world size={config.world_size}')
rpc.init_rpc(
f"client{rank}",
rank=rank,
@@ -203,7 +221,7 @@ def launch_remote(base_path: Path, config_path: Path, rank: int, parser, nic=Non
client_node = Client(f'client{rank}', rank, config.world_size, config)
client_node.remote_registration()
else:
print(f'Starting the ps with world size={config.world_size}')
print(f'Starting the PS (Fed) with world size={config.world_size}')
rpc.init_rpc(
"federator",
rank=rank,
@@ -217,7 +235,7 @@ def launch_remote(base_path: Path, config_path: Path, rank: int, parser, nic=Non
print('Ending program')


def launch_cluster(arg_path, conf_path, args: Namespace = None, configuration: DistributedConfig = None, **kwargs):
def launch_cluster(arg_path, conf_path, args: Namespace = None, config: DistributedConfig = None, **kwargs):
"""
Function to launch Orchestrator for execution with provided configurations. Currently
this assumes that a single Orchestrator is started that manages all the resources in the cluster.
@@ -227,5 +245,5 @@ def launch_cluster(arg_path, conf_path, args: Namespace = None, configuration: D
datefmt='%m-%d %H:%M')
# Set the seed for arrivals, torch seed is mostly ignored. Set the `arrival_seed` to a different value
# for each repetition that you want to run an experiment with.
configuration.set_seed()
launch_orchestrator(args=args, conf=configuration)
config.set_seed()
launch_orchestrator(args=args, conf=config)
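launch_remote now supports two ways of wiring up the process group: explicit nic/host values (from the CLI or the config's system section) that _retrieve_or_init_env writes into the environment, or an environment that is already populated (for example when a cluster orchestrator injects RANK, WORLD_SIZE and MASTER_PORT), which _retrieve_env_config reads back. A minimal sketch of that contract; the host and interface values are placeholders:

# Illustrative only: the environment variables this launcher writes and reads.
import os

def init_env(host: str, nic: str) -> None:
    # Mirrors _retrieve_or_init_env: point workers at the federator and pin the NIC.
    os.environ['MASTER_ADDR'] = host            # e.g. 'federator-svc' (placeholder)
    os.environ['MASTER_PORT'] = '5000'
    os.environ['GLOO_SOCKET_IFNAME'] = nic      # e.g. 'eth0' (placeholder)
    os.environ['TP_SOCKET_IFNAME'] = nic

def read_env():
    # Mirrors _retrieve_env_config: values arrive as strings, not integers.
    return os.environ.get('RANK'), os.environ.get('WORLD_SIZE'), os.environ['MASTER_PORT']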
5 changes: 2 additions & 3 deletions fltk/nets/util/__init__.py
@@ -1,3 +1,2 @@
from .reproducability import init_reproducibility
from .model import save_model, flatten_params, recover_flattened, load_model_from_file
from .evaluation import calculate_class_recall, calculate_class_precision
from fltk.nets.util.model import save_model, flatten_params, recover_flattened, load_model_from_file
from fltk.nets.util.evaluation import calculate_class_recall, calculate_class_precision
4 changes: 2 additions & 2 deletions fltk/nets/util/model.py
@@ -6,7 +6,7 @@
import torch
from torch.utils.tensorboard import SummaryWriter

from fltk.util.config.base_config import BareConfig
import fltk.util.config as config
from fltk.util.results import EpochData


@@ -46,7 +46,7 @@ def recover_flattened(flat_params, model):
return l


def initialize_default_model(config: BareConfig, model_class) -> torch.nn.Module:
def initialize_default_model(config: config.DistributedConfig, model_class) -> torch.nn.Module:
"""
Load a default model dictionary into a torch model.
@param model:
1 change: 1 addition & 0 deletions fltk/util/__init__.py
@@ -0,0 +1 @@
from fltk.util.reproducability import init_reproducibility
12 changes: 6 additions & 6 deletions fltk/util/cluster/client.py
@@ -31,12 +31,12 @@ class Resource:

@dataclass
class BuildDescription:
resources: OrderedDict[str, V1ResourceRequirements]
typed_containers: OrderedDict[str, V1Container]
typed_templates: OrderedDict[str, V1PodTemplateSpec]
id: UUID
spec: V1PyTorchJobSpec
tolerations: List[V1Toleration]
resources = OrderedDict[str, V1ResourceRequirements]()
typed_containers = OrderedDict[str, V1Container]()
typed_templates = OrderedDict[str, V1PodTemplateSpec]()
id: Optional[UUID] = None
spec: Optional[V1PyTorchJobSpec] = None
tolerations: Optional[List[V1Toleration]] = None


class ResourceWatchDog:
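The BuildDescription fields now carry defaults so the description can be filled in incrementally by the builder. Note that the un-annotated assignments such as resources = OrderedDict[str, V1ResourceRequirements]() are class-level attributes rather than dataclass fields, so the resulting dicts appear to be shared by every instance. A sketch of an alternative that keeps one dict per instance, shown only as an illustration and not as what this commit does:

# Illustrative alternative: per-instance mutable defaults via default_factory.
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Any, List, Optional
from uuid import UUID

@dataclass
class BuildDescriptionSketch:
    resources: OrderedDict = field(default_factory=OrderedDict)
    typed_containers: OrderedDict = field(default_factory=OrderedDict)
    typed_templates: OrderedDict = field(default_factory=OrderedDict)
    id: Optional[UUID] = None
    spec: Optional[Any] = None                # V1PyTorchJobSpec in the real code
    tolerations: Optional[List[Any]] = None   # List[V1Toleration] in the real code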
4 changes: 2 additions & 2 deletions fltk/util/config/__init__.py
@@ -1,2 +1,2 @@
from .distributed_config import *
from .config import Config
from fltk.util.config.distributed_config import DistributedConfig
from fltk.util.config.config import Config
4 changes: 3 additions & 1 deletion fltk/util/config/arguments.py
@@ -172,10 +172,12 @@ def create_util_run_parser(subparsers) -> None:

def create_remote_parser(subparsers) -> None:
remote_parser = subparsers.add_parser('remote')
add_default_arguments(remote_parser)

remote_parser.add_argument('rank', type=int)
remote_parser.add_argument('--nic', type=str, default=None)
remote_parser.add_argument('--host', type=str, default=None)
add_default_arguments(remote_parser)



def create_single_parser(subparsers) -> None:
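Registering the default arguments before the positional rank matters because argparse consumes positionals in the order they are added; if add_default_arguments contributes positionals of its own, they now precede rank on the remote command line. A small sketch of the pattern; add_default_arguments is not shown in this diff, so the 'config' positional and '--prefix' flag below are assumptions:

# Illustrative only: assumed contents of add_default_arguments.
import argparse

def add_default_arguments(parser: argparse.ArgumentParser) -> None:
    parser.add_argument('config', type=str, help='(assumed) path to the experiment configuration')
    parser.add_argument('--prefix', type=str, default=None)

parser = argparse.ArgumentParser(prog='fltk')
subparsers = parser.add_subparsers(dest='action')
remote_parser = subparsers.add_parser('remote')
add_default_arguments(remote_parser)            # defaults first, as in this commit
remote_parser.add_argument('rank', type=int)    # positional rank follows the defaults
remote_parser.add_argument('--nic', type=str, default=None)
remote_parser.add_argument('--host', type=str, default=None)

args = parser.parse_args(['remote', 'configs/example.yaml', '0', '--nic', 'eth0'])
print(args.config, args.rank, args.nic)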
2 changes: 1 addition & 1 deletion fltk/util/config/distributed_config.py
@@ -4,7 +4,7 @@

from dataclasses_json import config, dataclass_json

from fltk.nets.util.reproducability import init_reproducibility
from fltk.util import init_reproducibility


@dataclass_json
@@ -4,7 +4,7 @@
import torch


def cuda_reproducible_backend(cuda: bool) -> None:
def _cuda_reproducible_backend(cuda: bool) -> None:
"""
Function to set the CUDA backend to reproducible (i.e. deterministic) or to default configuration (per PyTorch
1.9.1).
@@ -40,6 +40,6 @@ def init_reproducibility(torch_seed: int = 42, cuda: bool = False, numpy_seed: i
torch.manual_seed(torch_seed)
if cuda:
torch.cuda.manual_seed_all(torch_seed)
cuda_reproducible_backend(True)
_cuda_reproducible_backend(True)
np.random.seed(numpy_seed)
os.environ['PYTHONHASHSEED'] = str(hash_seed)
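init_reproducibility is now re-exported from fltk.util (see fltk/util/__init__.py above) and the CUDA backend helper has become private. A minimal usage sketch; the seed values are arbitrary examples:

# Illustrative only: one call seeds torch, CUDA (optionally), numpy and PYTHONHASHSEED.
from fltk.util import init_reproducibility

init_reproducibility(torch_seed=42, cuda=False, numpy_seed=43)

Passing cuda=True additionally seeds all CUDA devices and switches cuDNN to the deterministic backend via _cuda_reproducible_backend.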
4 changes: 2 additions & 2 deletions fltk/util/show_client_distributions.py
@@ -1,7 +1,7 @@
import pandas as pd
from tqdm import tqdm

from fltk.core.distributed.client import Client
from fltk.core.distributed.client import DistClient
from fltk.datasets.distributed import DistCIFAR10Dataset, DistCIFAR100Dataset, DistFashionMNISTDataset, DistDataset
import logging

@@ -108,7 +108,7 @@ def gen_distribution(name, params):
# for i, (inputs, labels) in enumerate(dataset.get_train_loader(), 0):
# print(labels)
# print('d')
client = Client("test", None, rank, args.world_size, args)
client = DistClient("test", None, rank, args.world_size, args)
client.init_dataloader()
train_loader = client.dataset.get_train_loader()
train_loader2 = dataset.get_train_loader()