Skip to content

Commit

Permalink
Update orchestrator with bugfixes found during testing
Browse files Browse the repository at this point in the history
  • Loading branch information
JMGaljaard committed Sep 18, 2022
1 parent 6ecc5cc commit 177f31d
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions fltk/core/distributed/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,20 @@ def _create_config_maps(self, config_maps: Dict[str, V1ConfigMap]) -> None:
def wait_for_jobs_to_complete(self):
"""
Function to wait for all tasks to complete. This allows to wait for all the resources to free-up after running
an experiment. Thereby not letting running e
an experiment. Thereby allowing for running multiple experiments on a single cluster, without letting
experiments interfere with each other.
"""
while len(self.deployed_tasks) > 0:
task_to_move = set()
for task in self.deployed_tasks:
job_status = self._client.get_job_status(name=f"trainjob-{task.id}", namespace='test')
if job_status != 'Running':
try:
job_status = self._client.get_job_status(name=f"trainjob-{task.id}",
namespace='test')
except Exception as e:
logging.debug(msg=f"Could not retrieve job_status for {task.id}")
job_status = None

if job_status and job_status in {'Completed', 'Failed'}:
logging.info(f"{task.id} was completed with status: {job_status}, moving to completed")
task_to_move.add(task)
else:
Expand All @@ -231,7 +238,7 @@ def wait_for_jobs_to_complete(self):

class SimulatedOrchestrator(Orchestrator):
"""
Orchestrator implementation for Simulated arrivals. Currently, supports only Poisson inter-arrival times.
Orchestrator implementation for Simulated arrivals. Currently, currently only Poisson inter-arrival times.
"""

def __init__(self, cluster_mgr: ClusterManager, arrival_generator: ArrivalGenerator, config: DistributedConfig):
Expand Down Expand Up @@ -302,7 +309,8 @@ def run(self, clear: bool = False, experiment_replication: int = 1) -> None:
start_time = time.time()
if clear:
self._clear_jobs()
while self._alive and time.time() - start_time < self._config.get_duration():
duration = self._config.get_duration()
while self._alive and (duration < 0 or not time.time() - start_time < duration):
# 1. Check arrivals
# If new arrivals, store them in arrival PriorityQueue
while not self._arrival_generator.arrivals.empty():
Expand Down

0 comments on commit 177f31d

Please sign in to comment.