
Commit

Introduce experiment_replication to orchestrator and allow for waiting for jobs to complete
JMGaljaard committed Sep 6, 2022
1 parent 945339e commit f894399
Showing 1 changed file with 57 additions and 21 deletions.
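In short, this change threads an experiment_replication index through run() into the generated config maps (so results are logged to replication-specific directories) and adds wait_for_jobs_to_complete() so the orchestrator blocks until every deployed PyTorchJob has left the 'Running' state. A minimal driver sketch of the new signature follows; only the SimulatedOrchestrator constructor and run(clear=..., experiment_replication=...) come from this diff, while the surrounding setup (cluster_mgr, arrival_generator, config) is assumed to be built by the project's usual launch path:

from fltk.core.distributed.orchestrator import SimulatedOrchestrator

def run_replications(cluster_mgr, arrival_generator, config, replications: int = 3) -> None:
    # Hypothetical driver loop; cluster_mgr, arrival_generator and config are assumed
    # to come from the existing fltk launch path, not from this commit.
    for replication in range(replications):
        orchestrator = SimulatedOrchestrator(cluster_mgr, arrival_generator, config)
        # clear=True removes lingering jobs from the previous replication; the
        # replication index is passed into the generated config maps so logs end
        # up in replication-specific directories.
        orchestrator.run(clear=True, experiment_replication=replication)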
78 changes: 57 additions & 21 deletions fltk/core/distributed/orchestrator.py
@@ -4,9 +4,8 @@
import logging
import time
import uuid
from pathlib import Path
from queue import PriorityQueue
from typing import List, OrderedDict, Dict, Type
from typing import OrderedDict, Dict, Type, Set

from jinja2 import Environment, FileSystemLoader
from kubeflow.training import PyTorchJobClient
@@ -137,8 +136,8 @@ class Orchestrator(DistNode, abc.ABC):
_alive = False
# Priority queue, requires an orderable object, otherwise a Tuple[int, Any] can be used to insert.
pending_tasks: "PriorityQueue[ArrivalTask]" = PriorityQueue()
deployed_tasks: List[DistributedArrivalTask] = []
completed_tasks: List[str] = []
deployed_tasks: Set[ArrivalTask] = set()
completed_tasks: Set[ArrivalTask] = set()
SLEEP_TIME = 5

def __init__(self, cluster_mgr: ClusterManager, arv_gen: ArrivalGenerator, config: DistributedConfig):
@@ -161,15 +160,20 @@ def stop(self) -> None:
self._logger.info("Received stop signal for the Orchestrator.")
self._alive = False

self._cluster_mgr.stop()

@abc.abstractmethod
def run(self, clear: bool = False) -> None:
def run(self, clear: bool = False, experiment_replication: int = -1) -> None:
"""
Main loop of the Orchestrator for simulated arrivals. By default, previous deployments (i.e.
PytorchTrainingJobs) on the cluster are not stopped, which may interfere with the utilization statistics of
your cluster. Make sure to check whether you want previous results to be removed.
@param clear: Boolean indicating whether a previous deployment needs to be cleaned up (i.e. lingering jobs that
were deployed by the previous run).
@type clear: bool
@param experiment_replication: Replication index (integer) used to log each experiment to
replication-specific directories.
@type experiment_replication: int
@return: None
@rtype: None
"""
@@ -206,13 +210,35 @@ def _create_config_maps(self, config_maps: Dict[str, V1ConfigMap]) -> None:
self._v1.create_namespaced_config_map(self._config.cluster_config.namespace,
config_map)

def wait_for_jobs_to_complete(self):
"""
Function to wait for all deployed tasks to complete. This allows all resources to free up after running
an experiment, so that still-running jobs do not interfere with subsequent experiments.
"""
while len(self.deployed_tasks) > 0:
task_to_move = set()
for task in self.deployed_tasks:
job_status = self._client.get_job_status(name=f"trainjob-{task.id}", namespace='test')
if job_status != 'Running':
logging.info(f"{task.id} was completed with status: {job_status}, moving to completed")
task_to_move.add(task)
else:
logging.info(f"Waiting for {task.id} to complete")

self.completed_tasks.update(task_to_move)
self.deployed_tasks.difference_update(task_to_move)
time.sleep(self.SLEEP_TIME)


class SimulatedOrchestrator(Orchestrator):
"""
Orchestrator implementation for simulated arrivals. Currently supports only Poisson inter-arrival times.
"""

def __init__(self, cluster_mgr: ClusterManager, arrival_generator: ArrivalGenerator, config: DistributedConfig):
super().__init__(cluster_mgr, arrival_generator, config)

def run(self, clear: bool = False) -> None:
def run(self, clear: bool = False, experiment_replication: int = -1) -> None:
self._alive = True
start_time = time.time()
if clear:
@@ -231,22 +257,29 @@ def run(self, clear: bool = False) -> None:
curr_task: ArrivalTask = self.pending_tasks.get()
self._logger.info(f"Scheduling arrival of Arrival: {curr_task.id}")

# Create persistent logging information. A these will not be deleted by the Orchestrator, as such, they
# allow you to retrieve information of experiments after removing the PytorchJob after completion.
config_dict, configmap_name_dict = _prepare_experiment_maps(curr_task, self._config, curr_task.id, 1)
self._create_config_maps(config_dict)
try:
# Create persistent logging information. As these will not be deleted by the Orchestrator, they
# allow you to retrieve experiment information even after the PytorchJob has been removed upon completion.
config_dict, configmap_name_dict = _prepare_experiment_maps(curr_task,
config=self._config,
u_id=curr_task.id,
replication=experiment_replication)
self._create_config_maps(config_dict)

job_to_start = construct_job(self._config, curr_task, configmap_name_dict)
self._logger.info(f"Deploying on cluster: {curr_task.id}")
self._client.create(job_to_start, namespace=self._config.cluster_config.namespace)
self.deployed_tasks.append(curr_task)
job_to_start = construct_job(self._config, curr_task, configmap_name_dict)
self._logger.info(f"Deploying on cluster: {curr_task.id}")
self._client.create(job_to_start, namespace=self._config.cluster_config.namespace)
self.deployed_tasks.add(curr_task)
except:
# Deployment may fail (e.g. due to transient cluster/API errors); the arrival is dropped and the loop continues.
pass


self._logger.debug("Still alive...")
self._logger.info("Still alive...")
# Prevent high cpu utilization by sleeping between checks.
time.sleep(self.SLEEP_TIME)

logging.info('Experiment completed, currently does not support waiting.')
self.stop()
self.wait_for_jobs_to_complete()
self._logger.info('Experiment completed.')


class BatchOrchestrator(Orchestrator):
@@ -257,7 +290,7 @@ class BatchOrchestrator(Orchestrator):
def __init__(self, cluster_mgr: ClusterManager, arrival_generator: ArrivalGenerator, config: DistributedConfig):
super().__init__(cluster_mgr, arrival_generator, config)

def run(self, clear: bool = False) -> None:
def run(self, clear: bool = False, experiment_replication: int = 1) -> None:
"""
Main loop of the Orchestrator for processing a configuration as a batch, i.e. deploy all-at-once (batch)
without any scheduling or simulation applied. This will make use of Kubeflow Training-operators to ensure that
@@ -288,7 +321,10 @@ def run(self, clear: bool = False) -> None:

# Create persistent logging information. As these will not be deleted by the Orchestrator, they
# allow you to retrieve experiment information even after removing the PytorchJob upon completion.
config_dict, configmap_name_dict = _prepare_experiment_maps(curr_task, self._config, curr_task.id, 1)
config_dict, configmap_name_dict = _prepare_experiment_maps(curr_task,
config=self._config,
u_id=curr_task.id,
replication=experiment_replication)
self._create_config_maps(config_dict)

job_to_start = construct_job(self._config, curr_task, configmap_name_dict)
@@ -298,9 +334,9 @@ def run(self, clear: bool = False) -> None:

# TODO: Extend this logic in your real project, this is only meant for demo purposes
# For now we exit the thread after scheduling a single task.
self.stop()
self.stop()

self._logger.debug("Still alive...")
time.sleep(self.SLEEP_TIME)

self.wait_for_jobs_to_complete()
logging.info('Experiment completed.')
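Note that the polling in wait_for_jobs_to_complete treats every status other than 'Running' as completion, so failed jobs also end up in completed_tasks. If a caller needs to tell successes and failures apart, the same PyTorchJobClient.get_job_status call can be used to record the terminal status per task. A sketch follows, under the assumption that the PyTorchJob operator reports terminal statuses such as 'Succeeded' or 'Failed'; only the get_job_status call and the trainjob-<id> naming are taken from this diff:

import time

def collect_terminal_statuses(client, deployed_tasks, namespace: str = 'test', sleep_time: int = 5) -> dict:
    # Sketch only: poll each deployed task until it leaves 'Running' and record its
    # terminal status (e.g. 'Succeeded' or 'Failed', assuming the operator reports
    # those conditions). The orchestrator above does not make this distinction.
    statuses = {}
    remaining = set(deployed_tasks)
    while remaining:
        for task in set(remaining):
            status = client.get_job_status(name=f"trainjob-{task.id}", namespace=namespace)
            if status != 'Running':
                statuses[task.id] = status
                remaining.discard(task)
        time.sleep(sleep_time)
    return statuses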
