Resolve issues with example deployment
JMGaljaard committed Sep 6, 2022
1 parent 33b8429 commit 96b927e
Showing 26 changed files with 231 additions and 198 deletions.
4 changes: 2 additions & 2 deletions configs/distributed_tasks/example_arrival_config.json
@@ -23,10 +23,10 @@
"dataset": "mnist"
},
"systemParameters": {
"dataParallelism": 4,
"dataParallelism": 2,
"configurations": {
"default": {
"cores": "1000m",
"cores": "500m",
"memory": "1Gi"
}
}
2 changes: 1 addition & 1 deletion configs/example_cloud_experiment.json
@@ -11,7 +11,7 @@
"namespace": "test"
},
"execution_config": {
"duration": 3600,
"duration": 60,
"experiment_prefix": "cloud_experiment",
"cuda": false,
"tensorboard": {
2 changes: 2 additions & 0 deletions fltk/core/distributed/__init__.py
@@ -1,3 +1,5 @@
from __future__ import annotations

from fltk.core.distributed.client import DistClient
from fltk.core.distributed.dist_node import DistNode
from fltk.core.distributed.extractor import download_datasets
3 changes: 2 additions & 1 deletion fltk/core/distributed/client.py
@@ -10,7 +10,7 @@
from torch.utils.tensorboard import SummaryWriter

from fltk.core.distributed.dist_node import DistNode
from fltk.datasets import get_dist_dataset
from fltk.util.config.definitions.dataset import get_dist_dataset
from fltk.nets import get_net
from fltk.nets.util import calculate_class_precision, calculate_class_recall, save_model, load_model_from_file
from fltk.schedulers import MinCapableStepLR, LearningScheduler
@@ -20,6 +20,7 @@
if TYPE_CHECKING:
from fltk.util.config import DistributedConfig, DistLearningConfig


class DistClient(DistNode):

def __init__(self, rank: int, task_id: str, world_size: int, config: DistributedConfig = None,
21 changes: 1 addition & 20 deletions fltk/datasets/__init__.py
@@ -1,18 +1,9 @@
from fltk.datasets.cifar10 import CIFAR10Dataset
from fltk.datasets.cifar100 import CIFAR100Dataset
from fltk.datasets.fashion_mnist import FashionMNISTDataset
from fltk.datasets.fashion_mnist import FashionMNISTDataset
from fltk.datasets.mnist import MNIST
from fltk.util.config.definitions import Dataset

def available_dataparallel_datasets():
return {
Dataset.cifar10: CIFAR10Dataset,
Dataset.cifar100: CIFAR100Dataset,
Dataset.fashion_mnist: FashionMNISTDataset,
Dataset.mnist: MNIST
}


def get_train_loader_path(name: Dataset) -> str:
paths = {
Dataset.cifar10: 'data_loaders/cifar10/train_data_loader.pickle',
@@ -32,13 +23,3 @@ def get_test_loader_path(name: Dataset) -> str:
}
return paths[name]


def get_dist_dataset(name: Dataset):
"""
Function to retrieve distributed dataset (Distributed Learning Experiment).
@param name: Definition name of the datset.
@type name: Dataset
@return:
@rtype:
"""
return available_dataparallel_datasets()[name]
5 changes: 3 additions & 2 deletions fltk/launch.py
@@ -11,14 +11,15 @@
from kubernetes import config
from torch.distributed import rpc
from fltk.core.distributed import DistClient, download_datasets
from fltk.util.config.definitions.orchestrator import get_orchestrator
from fltk.core import Client, Federator
from fltk.nets.util.reproducability import init_reproducibility, init_learning_reproducibility
from fltk.util.cluster.client import ClusterManager

from fltk.util.cluster.worker import should_distribute
from fltk.util.config import DistributedConfig, FedLearningConfig, retrieve_config_network_params, get_learning_param_config, \
DistLearningConfig
from fltk.util.config.definitions import get_orchestrator, OrchestratorType
from fltk.util.config.definitions import OrchestratorType

from fltk.util.environment import retrieve_or_init_env, retrieve_env_config

@@ -121,8 +122,8 @@ def exec_orchestrator(args: Namespace = None, conf: DistributedConfig = None):
logging.info("Starting orchestrator")
pool.apply(orchestrator.run)

pool.close()
pool.join()
pool.close()

logging.info("Stopped execution of Orchestrator...")

4 changes: 2 additions & 2 deletions fltk/util/cluster/client.py
@@ -226,8 +226,8 @@ def _generate_command(config: DistributedConfig, task: ArrivalTask) -> List[str]
command = 'python3 -m fltk remote experiments/node.config.yaml'
else:
# TODO: Set correct backend depending on CUDA.
command = (f'python3 -m fltk client {config.config_path} {task.id} '
f'experiments/node.config.json --backend gloo')
command = (f'python3 -m fltk client experiments/node.config.yaml {task.id} '
f'{config.config_path} --backend gloo')
return command.split(' ')
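
For reference, a minimal sketch of the command this now produces; the task id and cluster config path below are illustrative placeholders. The positional order (experiment config, task id, cluster config) matches the reordered client parser in fltk/util/config/arguments.py further down.

task_id = "abc123"  # hypothetical task identifier
config_path = "configs/example_cloud_experiment.json"  # illustrative cluster config path
command = (f'python3 -m fltk client experiments/node.config.yaml {task_id} '
           f'{config_path} --backend gloo')
print(command.split(' '))
# ['python3', '-m', 'fltk', 'client', 'experiments/node.config.yaml',
#  'abc123', 'configs/example_cloud_experiment.json', '--backend', 'gloo']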


9 changes: 8 additions & 1 deletion fltk/util/config/__init__.py
@@ -1,10 +1,17 @@
from __future__ import annotations

import json
from pathlib import Path
from typing import Optional
from typing import Optional, Union, Type, Dict

import torch
import yaml
import logging

from torch.nn.modules.loss import _Loss

from fltk.util.config.definitions import Loss

from fltk.util.config.distributed_config import DistributedConfig
from fltk.util.config.learning_config import FedLearningConfig, get_safe_loader, DistLearningConfig

6 changes: 3 additions & 3 deletions fltk/util/config/arguments.py
@@ -24,9 +24,9 @@ def _create_client_parser(subparsers) -> None:
@rtype: None
"""
client_parser = subparsers.add_parser('client')
client_parser.add_argument('config', type=str)
client_parser.add_argument('task_id', type=str)
client_parser.add_argument('experiment_config', type=str)
client_parser.add_argument('experiment_config', type=str, help="Experiment specific config (yaml).")
client_parser.add_argument('task_id', type=str, help="Unique identifier for task.")
client_parser.add_argument('config', type=str, help="General cluster/orchestrator config (json).")
# Add parameter parser for backend
client_parser.add_argument('--backend', type=str, help='Distributed backend',
choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
70 changes: 1 addition & 69 deletions fltk/util/config/definitions/__init__.py
@@ -2,14 +2,6 @@
Module for declaring types and definitions, including helper functions that allow to retrieve
object (types) from a definition.
"""
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Union, Type, Dict

import torch
from torch.nn.modules.loss import _Loss

from .data_sampler import DataSampler
from .optim import Optimizations
from .aggregate import Aggregations
@@ -19,64 +11,4 @@
from .optim import Optimizations
from .experiment_type import ExperimentType
from .loss import Loss
from fltk.util.config.definitions.orchestrator import OrchestratorType


if TYPE_CHECKING:
from fltk.core.distributed import Orchestrator, BatchOrchestrator, SimulatedOrchestrator
from fltk.util.config import DistributedConfig
from fltk.util.cluster import ClusterManager
from fltk.util.task.generator import ArrivalGenerator


def get_orchestrator(config: DistributedConfig, cluster_manager: ClusterManager, arrival_generator: ArrivalGenerator) -> Orchestrator:
"""
Retrieve Orchestrator type given a Distributed (experiment) configuration. This allows for defining the
type of experiment (Batch or Simulated arrivals) once, and letting the Orchestrator implementation
make sure that the tasks are scheduled correctly.
@param config: Distributed (cluster) configuration object for experiments.
@type config: DistributedConfig
@return: Type of Orchestrator as requested by configuration object.
@rtype: Type[Orchestrator]
"""
__lookup = {
OrchestratorType.BATCH: BatchOrchestrator,
OrchestratorType.SIMULATED: SimulatedOrchestrator
}

orchestrator_type = __lookup.get(config.cluster_config.orchestrator.orchestrator_type, None)
return orchestrator_type(cluster_manager, arrival_generator, config)


def get_loss_function(request: Union[str, Loss]) -> Type[_Loss]:
"""
Mapper function to map a request to a loss function. As fallback behavior the request is evaluated
using the Python interpreter to try to load an existing implementation dynamically.
"""
__lookup_dict: Dict[Loss, Type[_Loss]] = {
Loss.l1_loss: torch.nn.L1Loss,
Loss.mse_loss: torch.nn.MSELoss,
Loss.cross_entropy_loss: torch.nn.CrossEntropyLoss,
Loss.ctc_loss: torch.nn.CTCLoss,
Loss.nll_loss: torch.nn.NLLLoss,
Loss.poisson_nll_loss: torch.nn.PoissonNLLLoss,
Loss.gaussian_nll_loss: torch.nn.GaussianNLLLoss,
Loss.kldiv_loss: torch.nn.KLDivLoss,
Loss.bce_loss: torch.nn.BCELoss,
Loss.bce_with_logits_loss: torch.nn.BCEWithLogitsLoss,
Loss.margin_ranking_loss: torch.nn.MarginRankingLoss,
Loss.multi_label_margin_loss: torch.nn.MultiLabelMarginLoss,
Loss.huber_loss: torch.nn.HuberLoss,
Loss.smooth_l1_loss: torch.nn.SmoothL1Loss,
Loss.soft_margin_loss: torch.nn.SoftMarginLoss,
Loss.multi_label_soft_margin_loss: torch.nn.MultiLabelSoftMarginLoss,
Loss.cosine_embedding_loss: torch.nn.CosineEmbeddingLoss,
Loss.multi_margin_loss: torch.nn.MultiMarginLoss,
Loss.triplet_margin_loss: torch.nn.TripletMarginLoss,
Loss.triplet_margin_with_distance_loss: torch.nn.TripletMarginWithDistanceLoss}

if isinstance(request, Loss):
return __lookup_dict.get(request)
else:
logging.info(f"Loading non-predefined loss function {request}")
return eval(request)
from .orchestrator import OrchestratorType
19 changes: 19 additions & 0 deletions fltk/util/config/definitions/dataset.py
@@ -1,6 +1,8 @@
from aenum import unique, Enum
from re import T

from fltk.datasets import CIFAR10Dataset, CIFAR100Dataset, FashionMNISTDataset, MNIST


@unique
class Dataset(Enum):
@@ -14,3 +16,20 @@ def _missing_name_(cls, name: str) -> T:
for member in cls:
if member.name.lower() == name.lower():
return member


def get_dist_dataset(name: Dataset):
"""
Function to retrieve a distributed dataset class (Distributed Learning Experiment).
@param name: Definition name of the dataset.
@type name: Dataset
@return: Dataset class corresponding to the provided definition.
@rtype: type
"""
__lookup = {
Dataset.cifar10: CIFAR10Dataset,
Dataset.cifar100: CIFAR100Dataset,
Dataset.fashion_mnist: FashionMNISTDataset,
Dataset.mnist: MNIST
}
return __lookup[name]
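
A brief usage sketch of the relocated helper; the dataset choice is illustrative, and constructor arguments (which depend on the experiment configuration) are omitted:

from fltk.util.config.definitions.dataset import Dataset, get_dist_dataset

dataset_cls = get_dist_dataset(Dataset.mnist)  # resolves the definition to the MNIST dataset class
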
37 changes: 35 additions & 2 deletions fltk/util/config/definitions/loss.py
@@ -2,9 +2,9 @@
Module providing an Enum based definition for different loss functions that are compatible with Pytorch.
In addition, it provides functionality to map a definition to an implementation.
"""

import logging
from enum import Enum, unique
from typing import Dict, Type
from typing import Dict, Type, Union

import torch
from torch.nn.modules.loss import _Loss
@@ -33,3 +33,36 @@ class Loss(Enum):
triplet_margin_loss = 'TripletMarginLoss'
triplet_margin_with_distance_loss = 'TripletMarginWithDistanceLoss'


def get_loss_function(request: Union[str, Loss]) -> Type[_Loss]:
"""
Mapper function to map a request to a loss function. As fallback behavior the request is evaluated
using the Python interpreter to try to load an existing implementation dynamically.
"""
__lookup_dict: Dict[Loss, Type[_Loss]] = {
Loss.l1_loss: torch.nn.L1Loss,
Loss.mse_loss: torch.nn.MSELoss,
Loss.cross_entropy_loss: torch.nn.CrossEntropyLoss,
Loss.ctc_loss: torch.nn.CTCLoss,
Loss.nll_loss: torch.nn.NLLLoss,
Loss.poisson_nll_loss: torch.nn.PoissonNLLLoss,
Loss.gaussian_nll_loss: torch.nn.GaussianNLLLoss,
Loss.kldiv_loss: torch.nn.KLDivLoss,
Loss.bce_loss: torch.nn.BCELoss,
Loss.bce_with_logits_loss: torch.nn.BCEWithLogitsLoss,
Loss.margin_ranking_loss: torch.nn.MarginRankingLoss,
Loss.multi_label_margin_loss: torch.nn.MultiLabelMarginLoss,
Loss.huber_loss: torch.nn.HuberLoss,
Loss.smooth_l1_loss: torch.nn.SmoothL1Loss,
Loss.soft_margin_loss: torch.nn.SoftMarginLoss,
Loss.multi_label_soft_margin_loss: torch.nn.MultiLabelSoftMarginLoss,
Loss.cosine_embedding_loss: torch.nn.CosineEmbeddingLoss,
Loss.multi_margin_loss: torch.nn.MultiMarginLoss,
Loss.triplet_margin_loss: torch.nn.TripletMarginLoss,
Loss.triplet_margin_with_distance_loss: torch.nn.TripletMarginWithDistanceLoss}

if isinstance(request, Loss):
return __lookup_dict.get(request)
else:
logging.info(f"Loading non-predefined loss function {request}")
return eval(request)
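
A brief usage sketch, assuming the predefined cross-entropy definition; instantiation arguments are left at their defaults:

from fltk.util.config.definitions.loss import Loss, get_loss_function

loss_cls = get_loss_function(Loss.cross_entropy_loss)  # -> torch.nn.CrossEntropyLoss
criterion = loss_cls()
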
29 changes: 29 additions & 0 deletions fltk/util/config/definitions/orchestrator.py
@@ -1,7 +1,36 @@
from __future__ import annotations

from enum import unique, Enum

from fltk.core.distributed import Orchestrator, BatchOrchestrator, SimulatedOrchestrator


from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fltk.util.config import DistributedConfig
from fltk.util.cluster import ClusterManager
from fltk.util.task.generator import ArrivalGenerator

@unique
class OrchestratorType(Enum):
BATCH = 'batch'
SIMULATED = 'simulated'


def get_orchestrator(config: DistributedConfig, cluster_manager: ClusterManager, arrival_generator: ArrivalGenerator) -> Orchestrator:
"""
Retrieve Orchestrator type given a Distributed (experiment) configuration. This allows for defining the
type of experiment (Batch or Simulated arrivals) once, and letting the Orchestrator implementation
make sure that the tasks are scheduled correctly.
@param config: Distributed (cluster) configuration object for experiments.
@type config: DistributedConfig
@return: Orchestrator instance of the type requested by the configuration object.
@rtype: Orchestrator
"""
__lookup = {
OrchestratorType.BATCH: BatchOrchestrator,
OrchestratorType.SIMULATED: SimulatedOrchestrator
}

orchestrator_type = __lookup.get(config.cluster_config.orchestrator.orchestrator_type, None)
return orchestrator_type(cluster_manager, arrival_generator, config)
3 changes: 2 additions & 1 deletion fltk/util/config/learning_config.py
@@ -14,8 +14,9 @@
# noinspection PyProtectedMember
from torch.nn.modules.loss import _Loss

from fltk.util.config.definitions import (DataSampler, Loss, get_loss_function, Aggregations, Dataset, LogLevel, Nets,
from fltk.util.config.definitions import (DataSampler, Loss, Aggregations, Dataset, LogLevel, Nets,
Optimizations)
from fltk.util.config.definitions.loss import get_loss_function


def _eval_decoder(obj: Union[str, T]) -> Union[Any, T]:
2 changes: 1 addition & 1 deletion fltk/util/task/task.py
@@ -213,7 +213,7 @@ def build(arrival: Arrival, u_id: uuid.UUID, replication: int) -> T:
priority=arrival.get_priority(),
dataset=arrival.get_dataset(),
loss_function=arrival.task.network_configuration.loss_function,
seed=random.randint(0, sys.maxsize),
seed=random.randint(0, 2**32 - 2),
replication=replication,
type_map=collections.OrderedDict({
'Master': MASTER_REPLICATION,