Make test-ready version of deployment with Fed and Dis tasks
JMGaljaard committed Sep 4, 2022
1 parent c2e791f commit cc00faa
Showing 9 changed files with 98 additions and 78 deletions.
4 changes: 2 additions & 2 deletions configs/distributed_tasks/example_arrival_config.json
@@ -11,7 +11,7 @@
"trainTasks": [
{
"type": "distributed",
"lambda": 0.004,
"lambda": 1.5,
"preemptJobs": false,
"jobClassParameters": [
{
@@ -32,7 +32,7 @@
"dataset": "mnist"
},
"systemParameters": {
"dataParallelism": 2,
"dataParallelism": 4,
"configurations": {
"default": {
"cores": "1000m",
19 changes: 3 additions & 16 deletions configs/federated_tasks/example_arrival_config.json
@@ -1,14 +1,14 @@
[
{
"type": "federated",
"jobClassParameters": {
"jobClassParameters": [{
"networkConfiguration": {
"network": "FashionMNISTCNN",
"lossFunction": "CrossEntropyLoss",
"dataset": "mnist"
},
"systemParameters": {
"dataParallelism": null,
"dataParallelism": 4,
"configurations": {
"Master": {
"cores": "1000m",
@@ -62,20 +62,7 @@
"shuffle": true
},
"aggregation": "FedAvg"
},
"experimentConfiguration": {
"randomSeed": [
1,
41,
42,
43,
430
],
"workerReplication": {
"Master": 1,
"Worker": 2
}
}
}
}]
}
]
2 changes: 1 addition & 1 deletion experiments/dist_node.jinja.yaml
@@ -8,7 +8,7 @@ optimizer: {{ task.get_optimizer_param(tpe, 'type').value }}
optimizer_args: {{ task.get_optimizer_args(tpe) }}
model: {{ task.get_net_param('network').value }}
dataset: {{ task.get_net_param('dataset').value }}
max_epoch: {{ task.get_learn_param('total_epochs') }}
max_epoch: {{ task.get_hyper_param(tpe, 'total_epochs') }}
learning_rate: {{ task.get_optimizer_param(tpe, 'lr') }}
learning_rate_decay: {{ task.get_hyper_param(tpe, 'lr_decay') }}
seed: {{ task.get_net_param('seed') }}
13 changes: 4 additions & 9 deletions fltk/core/distributed/orchestrator.py
@@ -231,21 +231,16 @@ def run(self, clear: bool = False) -> None:
curr_task: ArrivalTask = self.pending_tasks.get()
self._logger.info(f"Scheduling arrival of Arrival: {curr_task.id}")

# Create persistent logging information. A these will not be deleted by the Orchestrator, as such
# allow you to retrieve information of experiments even after removing the PytorchJob after completion.
# Create persistent logging information. These will not be deleted by the Orchestrator; as such, they
# allow you to retrieve information of experiments even after the PytorchJob has been removed upon completion.
config_dict, configmap_name_dict = _prepare_experiment_maps(curr_task, self._config, curr_task.id, 1)
# self._create_config_maps(config_dict)
self._create_config_maps(config_dict)

job_to_start = construct_job(self._config, curr_task, configmap_name_dict)
self._logger.info(f"Deploying on cluster: {curr_task.id}")
# self._client.create(job_to_start, namespace=self._config.cluster_config.namespace)
self._client.create(job_to_start, namespace=self._config.cluster_config.namespace)
self.deployed_tasks.append(curr_task)

# TODO: Extend this logic in your real project, this is only meant for demo purposes
# For now we exit the thread after scheduling a single task.

# self.stop()
# return

self._logger.debug("Still alive...")
# Prevent high cpu utilization by sleeping between checks.
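The now-enabled _create_config_maps call persists each task's configuration as a Kubernetes ConfigMap, which is what lets experiment settings remain retrievable after the PytorchJob is cleaned up. A minimal sketch, assuming the standard kubernetes Python client, of reading such a ConfigMap back later; the ConfigMap name and namespace below are hypothetical placeholders, not names the Orchestrator actually generates:

from kubernetes import client, config

# Load credentials; inside the cluster this would be config.load_incluster_config().
config.load_kube_config()

core_api = client.CoreV1Api()
# Hypothetical name/namespace; the Orchestrator derives real ConfigMap names per ArrivalTask.
persisted = core_api.read_namespaced_config_map(name="train-job-config", namespace="test")
print(persisted.data)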
45 changes: 31 additions & 14 deletions fltk/launch.py
@@ -8,24 +8,23 @@
from pathlib import Path

import torch.distributed as dist
import yaml
from kubernetes import config
from torch.distributed import rpc

from fltk.core.client import Client
from fltk.core.distributed import DistClient
from fltk.core.distributed import Orchestrator
from fltk.core.distributed.extractor import download_datasets
from fltk.core.federator import Federator
from fltk.core.distributed import DistClient, download_datasets
from fltk.core import Client, Federator
from fltk.nets.util.reproducability import init_reproducibility, init_learning_reproducibility
from fltk.util.cluster.client import ClusterManager

from fltk.util.cluster.worker import should_distribute
from fltk.util.config import DistributedConfig, FedLearningConfig, retrieve_config_network_params, get_learning_param_config, \
DistLearningConfig
from fltk.util.config.definitions import get_orchestrator, OrchestratorType

from fltk.util.environment import retrieve_or_init_env, retrieve_env_config
from fltk.util.task.generator.arrival_generator import SimulatedArrivalGenerator, SequentialArrivalGenerator

# Define types for clarity in execution
from fltk.util.task.generator.arrival_generator import SimulatedArrivalGenerator, SequentialArrivalGenerator

Rank = NewType('Rank', int)
NIC = NewType('NIC', str)
Host = NewType('Host', int)
@@ -66,7 +65,25 @@ def exec_distributed_client(task_id: str, conf: DistributedConfig = None,
print(epoch_data)


def exec_orchestrator(args: Namespace = None, conf: DistributedConfig = None, simulate_arrivals: bool = False):
def get_arrival_generator(config: DistributedConfig, experiment: str):
"""
Helper function to retrieve and instantiate the arrival generator matching the configured orchestrator type.
@param config: Distributed configuration containing the cluster and orchestrator settings.
@type config: DistributedConfig
@param experiment: Path to the experiment (arrival) configuration file.
@type experiment: str
@return: Instantiated arrival generator corresponding to the orchestrator type (batch or simulated).
@rtype: ArrivalGenerator
"""
__lookup = {
OrchestratorType.BATCH: SequentialArrivalGenerator,
OrchestratorType.SIMULATED: SimulatedArrivalGenerator
}

return __lookup.get(config.cluster_config.orchestrator.orchestrator_type, None)(Path(experiment))


def exec_orchestrator(args: Namespace = None, conf: DistributedConfig = None):
"""
Default runner for the Orchestrator that is based on KubeFlow
@param args: Commandline arguments passed to the execution. Might be removed in a future commit.
@@ -88,11 +105,11 @@ def exec_orchestrator(args: Namespace = None, conf: DistributedConfig = None, si
logging.info("Pointing configuration to in cluster configuration.")
conf.cluster_config.load_incluster_namespace()
conf.cluster_config.load_incluster_image()
arrival_generator = (SimulatedArrivalGenerator if simulate_arrivals else SequentialArrivalGenerator)(
args.experiment)
cluster_manager = ClusterManager()

orchestrator = Orchestrator(cluster_manager, arrival_generator, conf)

cluster_manager = ClusterManager()
arrival_generator = get_arrival_generator(conf, args.experiment)
orchestrator = get_orchestrator(conf, cluster_manager, arrival_generator)

pool = ThreadPool(3)

@@ -102,7 +119,7 @@
logging.info("Starting arrival generator")
pool.apply_async(arrival_generator.start, args=[conf.get_duration()])
logging.info("Starting orchestrator")
pool.apply(orchestrator.run if simulate_arrivals else orchestrator.run_batch)
pool.apply(orchestrator.run)

pool.close()
pool.join()
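The rewritten exec_orchestrator delegates component selection to lookups (get_arrival_generator, get_orchestrator) and runs everything on a small thread pool. A self-contained toy sketch of that threading model; the stub functions merely stand in for the fltk ClusterManager, arrival generator, and Orchestrator, and the exact worker set is assumed from ThreadPool(3) and the visible apply calls:

from multiprocessing.pool import ThreadPool
import time


def cluster_manager_start() -> None:
    print("cluster manager: watching cluster resources")


def arrival_generator_start(duration: float) -> None:
    print(f"arrival generator: producing arrivals for {duration} seconds")


def orchestrator_run() -> None:
    print("orchestrator: polling pending tasks and deploying jobs")
    time.sleep(0.1)


pool = ThreadPool(3)
pool.apply_async(cluster_manager_start)                 # non-blocking background worker
pool.apply_async(arrival_generator_start, args=[1.0])   # non-blocking background worker
pool.apply(orchestrator_run)                            # blocks until the orchestrator returns
pool.close()
pool.join()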
7 changes: 3 additions & 4 deletions fltk/nets/util/model.py
@@ -7,7 +7,6 @@
import torch
from torch.utils.tensorboard import SummaryWriter


from fltk.util.results import EpochData
from typing import TYPE_CHECKING

@@ -26,8 +25,8 @@ def flatten_params(model_description: Union[torch.nn.Module, OrderedDict]):
parameters = model_description.parameters()
else:
parameters = model_description.values()
parameter_list = [torch.flatten(p) for p in parameters] # pylint: disable=no-member
flat_params = torch.cat(parameter_list).view(-1, 1) # pylint: disable=no-member
parameter_list = [torch.flatten(p) for p in parameters] # pylint: disable=no-member
flat_params = torch.cat(parameter_list).view(-1, 1) # pylint: disable=no-member
return flat_params


@@ -84,7 +83,7 @@ def load_model_from_file(model: torch.nn.Module, model_file_path: Path) -> None:
if model_file_path.is_file():
try:
model.load_state_dict(torch.load(model_file_path))
except Exception: # pylint: disable=broad-except
except Exception: # pylint: disable=broad-except
logging.warning("Couldn't load model. Attempting to map CUDA tensors to CPU to solve error.")
else:
logging.warning(f'Could not find model: {model_file_path}')
8 changes: 4 additions & 4 deletions fltk/util/task/config/parameter.py
@@ -182,7 +182,7 @@ class SystemParameters:
executor_memory: Amount of RAM allocated to each executor.
action: Indicating whether it regards 'inference' or 'train'ing time.
"""
data_parallelism: Optional[int]
data_parallelism: int
configurations: OrderedDict[str, SystemResources]

def get(self, tpe: str):
@@ -290,16 +290,16 @@ class TrainTask:
Training description used by the orchestrator to generate tasks. Contains 'transposed' information of the
configuration file.
Dataclass is ordered, to allow for ordering of arrived tasks in a PriorityQueue (for scheduling).
Dataclass is ordered, to allow for ordering of arrived tasks in a PriorityQueue (can be used for scheduling).
"""
network_configuration: NetworkConfiguration = field(compare=False)
system_parameters: SystemParameters = field(compare=False)
hyper_parameters: HyperParameters = field(compare=False)
learning_parameters: Optional[LearningParameters] = field(compare=False)
seed: int = field(compare=False)
identifier: str = field(compare=False)
replication: Optional[int] = None
priority: Optional[int] = None
replication: Optional[int] = field(compare=False, default=None) # Utilized for batch arrivals.
priority: Optional[int] = None # Allow for sorting/priority.
experiment_type: ExperimentType = field(compare=False, metadata=config(field_name="type"), default=None)

def __init__(self, identity: str,
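For reference, a minimal self-contained sketch (a toy class, not fltk's TrainTask) of why the dataclass is ordered while most fields are excluded from comparison: only the priority field participates in ordering, so a PriorityQueue returns arrived tasks by priority alone:

from dataclasses import dataclass, field
from queue import PriorityQueue
from typing import Optional


@dataclass(order=True)
class ToyTask:
    identifier: str = field(compare=False)  # excluded from ordering, like most TrainTask fields
    priority: Optional[int] = None          # the only field used for comparison


queue: "PriorityQueue[ToyTask]" = PriorityQueue()
queue.put(ToyTask("background-job", priority=10))
queue.put(ToyTask("urgent-job", priority=1))
print(queue.get().identifier)  # -> "urgent-job"; the lowest priority value arrives first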
43 changes: 28 additions & 15 deletions fltk/util/task/generator/arrival_generator.py
@@ -4,6 +4,7 @@
import time
from abc import abstractmethod
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
from queue import Queue
from random import choices
@@ -42,7 +43,7 @@ def load_config(self):
parser = ExperimentParser(config_path=self.configuration_path)
experiment_descriptions = parser.parse()
self.job_dict = collections.OrderedDict(
{f'train_job_{indx}': item for indx, item in enumerate(experiment_descriptions)})
{f'train_job_{indx}': item for indx, item in enumerate(experiment_descriptions.train_tasks)})

def start(self, duration: Union[float, int]):
"""
@@ -89,8 +90,8 @@ def set_logger(self, name: str = None):
@dataclass
class Arrival:
"""
Dataclass describing the information needed to keep track of Arrivals and let them `Arrive'. Uses a single timer
to allow for easy generation of tasks.
Dataclass containing the information needed to keep track of Arrivals so that their arrival can be scheduled.
Uses a single timer to allow for generation of tasks with lower overhead.
"""
ticks: Optional[int]
task: TrainTask
@@ -111,9 +112,6 @@ def get_system_config(self) -> SystemParameters: # pylint: disable=missing-funct
def get_parameter_config(self) -> HyperParameters: # pylint: disable=missing-function-docstring
return self.task.hyper_parameters

def get_experiment_config(self) -> ExperimentConfiguration: # pylint: disable=missing-function-docstring
return self.task.experiment_configuration

def get_learning_config(self) -> LearningParameters: # pylint: disable=missing-function-docstring
return self.task.learning_parameters

@@ -123,6 +121,10 @@
Experiments (on K8s) generator that simulates the arrival of training tasks according to a pre-defined distribution.
As such, a set of clients can be simulated that submit various types of training jobs. See also
SequentialArrivalGenerator for an implementation that will directly schedule all arrivals on the cluster.
N.B. its intended purpose is to easily simulate different users/components requesting training jobs.
This allows scheduling different experiment configurations to see how a scheduling algorithm behaves, for example
by simulating users/systems that deploy training pipelines at regular intervals.
"""
job_dict: Dict[str, JobDescription] = None

@@ -145,7 +147,7 @@ def set_logger(self, name: str = None):
logging_name = name or self.__class__.__name__
self.logger = logging.getLogger(logging_name)

def generate_arrival(self, task_id: str) -> Arrival:
def generate_arrival(self, task_id: str, inter_arrival_unit: timedelta = timedelta(minutes=1)) -> Arrival:
"""
Generate a training task for a JobDescription once the inter-arrival time has been depleted.
@param task_id: identifier for a training task corresponding to the JobDescription.
@param inter_arrival_unit: time unit by which the sampled Poisson inter-arrival ticks are scaled (defaults to one minute).
@@ -156,12 +158,18 @@ def generate_arrival(self, task_id: str) -> Arrival:
msg = f"Creating task for {task_id}"
self.logger.info(msg)
job: JobDescription = self.job_dict[task_id]
parameters: JobClassParameter = \
choices(job.job_class_parameters, [param.class_probability for param in job.job_class_parameters])[0]
priority = choices(parameters.priorities, [prio.probability for prio in parameters.priorities], k=1)[0]

inter_arrival_ticks = np.random.poisson(lam=job.arrival_statistic)
train_task = TrainTask(task_id, parameters, priority)
# Select job configuration according to the weight of the `classProbability` (limit 1)
parameters, *_ = choices(job.job_class_parameters,
[job_param.class_probability for job_param in job.job_class_parameters], k=1)
# Select job configuration according to the weight of the selected `classParameter`'s priorities (limit 1)
priority, *_ = choices(parameters.priorities, [prio.probability for prio in parameters.priorities], k=1)

inter_arrival_ticks = np.random.poisson(lam=job.arrival_statistic) * inter_arrival_unit.seconds
train_task = TrainTask(identity=task_id,
job_parameters=parameters,
priority=priority,
experiment_type=job.experiment_type)

return Arrival(inter_arrival_ticks, train_task, task_id)

@@ -182,15 +190,14 @@ def run(self, duration: float):
event = multiprocessing.Event()
while self.alive and time.time() - self.start_time < duration:
save_time = time.time()

new_scheduled = []
for entry in self._tick_list:
entry.ticks -= self._decrement
if entry.ticks <= 0:
self.arrivals.put(entry)
new_arrival = self.generate_arrival(entry.task_id)
new_scheduled.append(new_arrival)
msg = f"Arrival {new_arrival} arrives at {new_arrival.ticks} seconds"
msg = f"Arrival {new_arrival.task_id} arrives in {new_arrival.ticks} seconds"
self.logger.info(msg)
else:
new_scheduled.append(entry)
@@ -207,9 +214,14 @@
"""
Experiments (on K8s) generator that directly generates all arrivals to be executed. This relies on the scheduling
policy of Kubeflow's PyTorch TrainOperator.
This allows for running batches of train jobs, e.g. to run a certain experiment configuration with a number of
replications in a fire-and-forget fashion. See SimulatedArrivalGenerator for an implementation that simulates
arrivals following a pre-defined distribution.
N.B. its intended purpose is to easily execute a range of experiments, with possibly different configurations,
where reproducibility is important. For example, running a batch of experiments of a training algorithm to see
the effect of hyperparameters on test/validation performance.
"""

def __init__(self, custom_config: Path):
@@ -233,12 +245,13 @@ def run(self, duration: float):

description: JobDescription
for job_name, description in self.job_dict.items():
# TODO: Ensure seeds are set properly
for repl, seed in enumerate(description.job_class_parameters.experiment_configuration.random_seed):
replication_name = f"{job_name}_{repl}_{seed}"
train_task = TrainTask(identity=replication_name,
job_parameters=description.job_class_parameters,
priority=description.priority,
experiment_config=description.get_experiment_configuration(),
# experiment_config=description.get_experiment_configuration(),
replication=repl,
experiment_type=description.experiment_type)

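A minimal sketch (toy dictionaries instead of fltk's JobClassParameter and Priority objects) of the sampling the updated generate_arrival performs: a weighted pick of the job class and of its priority, followed by a Poisson draw scaled by the inter-arrival unit, which defaults to one minute. The lambda of 1.5 matches the updated configs/distributed_tasks/example_arrival_config.json:

from datetime import timedelta
from random import choices

import numpy as np

job_class_parameters = [
    {"name": "small", "class_probability": 0.7,
     "priorities": [{"priority": 1, "probability": 0.2}, {"priority": 5, "probability": 0.8}]},
    {"name": "large", "class_probability": 0.3,
     "priorities": [{"priority": 1, "probability": 1.0}]},
]

# choices() returns a list, hence the `value, *_ = ...` unpacking idiom used in the diff.
parameters, *_ = choices(job_class_parameters,
                         [p["class_probability"] for p in job_class_parameters], k=1)
priority, *_ = choices(parameters["priorities"],
                       [prio["probability"] for prio in parameters["priorities"]], k=1)

# With lambda = 1.5 and a one-minute unit, arrivals are spaced roughly 90 seconds apart on average.
inter_arrival_unit = timedelta(minutes=1)
inter_arrival_ticks = np.random.poisson(lam=1.5) * inter_arrival_unit.seconds
print(parameters["name"], priority["priority"], inter_arrival_ticks)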