Make batch orchestrator run all experiments regardless of duration
JMGaljaard committed Sep 26, 2022
1 parent 824fae1 commit 763bf5e
Showing 1 changed file with 66 additions and 39 deletions.
fltk/core/distributed/orchestrator.py
@@ -3,10 +3,11 @@
import abc
import collections
import logging
import re
import time
import uuid
from queue import PriorityQueue
from typing import OrderedDict, Dict, Type, Set, Union
from typing import OrderedDict, Dict, Type, Set, Union, Optional, List
from typing import TYPE_CHECKING

from jinja2 import Environment, FileSystemLoader
@@ -18,6 +19,7 @@
from fltk.core.distributed.dist_node import DistNode
from fltk.util.cluster.client import construct_job, ClusterManager
from fltk.util.task import get_job_arrival_class, DistributedArrivalTask, FederatedArrivalTask, ArrivalTask
from fltk.util.task.arrival_task import HistoricalArrivalTask, _ArrivalTask
from fltk.util.task.generator import ArrivalGenerator

if TYPE_CHECKING:
@@ -136,8 +138,8 @@ class Orchestrator(DistNode, abc.ABC):
_alive = False
# Priority queue, requires an orderable object, otherwise a Tuple[int, Any] can be used to insert.
pending_tasks: "PriorityQueue[ArrivalTask]" = PriorityQueue()
deployed_tasks: Set[ArrivalTask] = set()
completed_tasks: Set[ArrivalTask] = set()
deployed_tasks: Set[_ArrivalTask] = set()
completed_tasks: Set[_ArrivalTask] = set()
SLEEP_TIME = 5

def __init__(self, cluster_mgr: ClusterManager, arv_gen: ArrivalGenerator, config: DistributedConfig):
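The class-level comment above ("Priority queue, requires an orderable object, otherwise a Tuple[int, Any] can be used to insert.") is worth unpacking: tasks are pushed into a PriorityQueue, so they either have to define an ordering themselves or be wrapped in a (priority, item) tuple. A minimal sketch of both options, using a made-up Task dataclass rather than the real ArrivalTask:

```python
from dataclasses import dataclass, field
from queue import PriorityQueue
from typing import Any, Tuple


@dataclass(order=True)
class Task:
    """Toy stand-in for an arrival task: ordered by priority, payload excluded from comparison."""
    priority: int
    name: str = field(compare=False)


# Option 1: the queued object is itself orderable.
orderable_queue: "PriorityQueue[Task]" = PriorityQueue()
orderable_queue.put(Task(2, "low-priority-experiment"))
orderable_queue.put(Task(1, "high-priority-experiment"))
assert orderable_queue.get().name == "high-priority-experiment"

# Option 2: the fallback mentioned in the comment, wrapping items in (priority, payload) tuples.
tuple_queue: "PriorityQueue[Tuple[int, Any]]" = PriorityQueue()
tuple_queue.put((2, "experiment-b"))
tuple_queue.put((1, "experiment-a"))
assert tuple_queue.get() == (1, "experiment-a")
```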
Expand Down Expand Up @@ -210,12 +212,18 @@ def _create_config_maps(self, config_maps: Dict[str, V1ConfigMap]) -> None:
self._v1.create_namespaced_config_map(self._config.cluster_config.namespace,
config_map)

def wait_for_jobs_to_complete(self, ret=False):
def wait_for_jobs_to_complete(self, ret=False, others: Optional[List[str]] = None):
"""
Function to wait for all tasks to complete. This allows to wait for all the resources to free-up after running
an experiment. Thereby allowing for running multiple experiments on a single cluster, without letting
experiments interfere with each other.
"""
if others:
uuid_regex = re.compile("[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}")

ids = {uuid_regex.search(task).group() for task in others if uuid_regex.search(task) is not None}
historical_tasks = map(HistoricalArrivalTask, ids)
self.deployed_tasks.update(historical_tasks)
while len(self.deployed_tasks) > 0:
task_to_move = set()
for task in self.deployed_tasks:
@@ -230,11 +238,9 @@ def wait_for_jobs_to_complete(self, ret=False):
logging.info(f"{task.id} was completed with status: {job_status}, moving to completed")
task_to_move.add(task)
else:
logging.info(f"Waiting for {task.id} to complete")
logging.info(f"Waiting for {task.id} to complete, {self.pending_tasks.qsize()} pending, {self._arrival_generator.arrivals.qsize()} arrivals")
self.completed_tasks.update(task_to_move)
self.deployed_tasks.difference_update(task_to_move)
if ret:
return
time.sleep(self.SLEEP_TIME)
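The new `others` parameter lets the orchestrator fold jobs that already exist on the cluster into its bookkeeping: a UUID is pulled out of each job name and wrapped in a HistoricalArrivalTask, so the wait loop treats it like any other deployed task. A small sketch of just the UUID-extraction step, with hypothetical job names (the real names come from the cluster's job listing):

```python
import re

# Same UUID pattern as the new `others` handling above.
uuid_regex = re.compile(
    "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}")

# Hypothetical job names as a cluster listing might return them.
job_names = [
    "trainjob-8f14e45f-ceea-467f-a9d2-330f8f7b2a11",
    "cleanup-cronjob",  # no UUID, dropped by the filter below
]

ids = {match.group()
       for match in (uuid_regex.search(name) for name in job_names)
       if match is not None}
print(ids)  # {'8f14e45f-ceea-467f-a9d2-330f8f7b2a11'}
```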


@@ -297,7 +303,9 @@ class BatchOrchestrator(Orchestrator):
def __init__(self, cluster_mgr: ClusterManager, arrival_generator: ArrivalGenerator, config: DistributedConfig):
super().__init__(cluster_mgr, arrival_generator, config)

def run(self, clear: bool = False, experiment_replication: int = 1) -> None:
def run(self, clear: bool = False,
experiment_replication: int = 1,
wait_historical: bool = True) -> None:
"""
Main loop of the Orchestrator for processing a configuration as a batch, i.e. deploy all-at-once (batch)
without any scheduling or simulation applied. This will make use of Kubeflow Training-operators to ensure that
@@ -308,39 +316,58 @@ def run(self, clear: bool = False, experiment_replication: int = 1) -> None:
@return: None
@rtype: None
"""
self._logger.info(f"Starting experiment Orchestrator: {experiment_replication}")
self._alive = True
start_time = time.time()
if clear:
self._clear_jobs()
duration = self._config.get_duration()
while self._alive and ((duration < 0) or not time.time() - start_time < duration):
# 1. Check arrivals
# If new arrivals, store them in arrival PriorityQueue
while not self._arrival_generator.arrivals.empty():
arrival = self._arrival_generator.arrivals.get()
task = _generate_task(arrival)
self._logger.debug(f"Arrival of: {task}")
self.pending_tasks.put(task)
# 2. Schedule all tasks that arrived previously
while not self.pending_tasks.empty():
# Do blocking request to priority queue
curr_task: ArrivalTask = self.pending_tasks.get()
self._logger.info(f"Scheduling arrival of Arrival: {curr_task.id}")
try:
if wait_historical:
curr_jobs = self._client.get(namespace="test")
jobs = [job['metadata']['name'] for job in curr_jobs['items']]
self.wait_for_jobs_to_complete(ret=False, others=jobs)
start_time = time.time()

if clear:
self._clear_jobs()
except Exception as e:
self._logger.warning(f"Failed during house keeping: {e}")

# Create persistent logging information. A these will not be deleted by the Orchestrator, as such
# allow you to retrieve information of experiments even after removing the PytorchJob after completion.
config_dict, configmap_name_dict = _prepare_experiment_maps(curr_task,
config=self._config,
u_id=curr_task.id,
replication=experiment_replication)
self._create_config_maps(config_dict)
duration = self._config.get_duration()
# In case client does not generate experiment in-time

job_to_start = construct_job(self._config, curr_task, configmap_name_dict)
self._logger.info(f"Deploying on cluster: {curr_task.id}")
self._client.create(job_to_start, namespace=self._config.cluster_config.namespace)
self.deployed_tasks.add(curr_task)
if not self._config.cluster_config.orchestrator.parallel_execution:
self.wait_for_jobs_to_complete(ret=True)
# TODO: Add test suite for batch orchestrator
while self._arrival_generator.arrivals.qsize() == 0:
self._logger.info("Waiting for first arrival!")
time.sleep(self.SLEEP_TIME)
# 1. Check arrivals
# If new arrivals, store them in arrival PriorityQueue
while not self._arrival_generator.arrivals.empty():
arrival = self._arrival_generator.arrivals.get()
task = _generate_task(arrival)
self._logger.debug(f"Arrival of: {task}")
self.pending_tasks.put(task)
# 2. Schedule all tasks that arrived previously
while not self.pending_tasks.empty():
# Do blocking request to priority queue
curr_task: ArrivalTask = self.pending_tasks.get()
self._logger.info(f"Scheduling arrival of Arrival: {curr_task.id}")

# Create persistent logging information. A these will not be deleted by the Orchestrator, as such
# allow you to retrieve information of experiments even after removing the PytorchJob after completion.
config_dict, configmap_name_dict = _prepare_experiment_maps(curr_task,
config=self._config,
u_id=curr_task.id,
replication=experiment_replication)
self._create_config_maps(config_dict)

job_to_start = construct_job(self._config, curr_task, configmap_name_dict)
self._logger.info(f"Deploying on cluster: {curr_task.id}")
self._client.create(job_to_start, namespace=self._config.cluster_config.namespace)
self.deployed_tasks.add(curr_task)
# Either wait to complete, or continue. Note that the orchestrator currently does not support scaling
# experiments up or down.
if not self._config.cluster_config.orchestrator.parallel_execution:
self.wait_for_jobs_to_complete(ret=True)
if self._config.cluster_config.orchestrator.parallel_execution:
self.wait_for_jobs_to_complete(ret=False)
self.stop()
logging.info('Experiment completed.')
# Stop experiment
self.stop()
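Taken together, the change means a batch run first drains whatever is already running on the cluster and then deploys every pending experiment, rather than stopping once the configured duration has elapsed. A usage sketch of the updated entry point, assuming the ClusterManager, ArrivalGenerator and DistributedConfig instances are built by the existing fltk launch code (the launch_batch helper here is made up for illustration):

```python
from fltk.core.distributed.orchestrator import BatchOrchestrator


def launch_batch(cluster_mgr, arrival_generator, config) -> None:
    """Sketch: deploy one replication of every experiment in the batch.

    cluster_mgr, arrival_generator and config are assumed to be the
    ClusterManager, ArrivalGenerator and DistributedConfig instances that the
    fltk launch code normally constructs; building them is elided here.
    """
    orchestrator = BatchOrchestrator(cluster_mgr, arrival_generator, config)
    # wait_historical=True makes the orchestrator block on jobs already present
    # on the cluster before deploying the new batch, and the run no longer
    # stops early when the configured duration elapses.
    orchestrator.run(clear=False,
                     experiment_replication=1,
                     wait_historical=True)
```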
