diff --git a/fltk/core/distributed/orchestrator.py b/fltk/core/distributed/orchestrator.py index 699591a9..154bf560 100644 --- a/fltk/core/distributed/orchestrator.py +++ b/fltk/core/distributed/orchestrator.py @@ -212,13 +212,20 @@ def _create_config_maps(self, config_maps: Dict[str, V1ConfigMap]) -> None: def wait_for_jobs_to_complete(self): """ Function to wait for all tasks to complete. This allows to wait for all the resources to free-up after running - an experiment. Thereby not letting running e + an experiment. Thereby allowing for running multiple experiments on a single cluster, without letting + experiments interfere with each other. """ while len(self.deployed_tasks) > 0: task_to_move = set() for task in self.deployed_tasks: - job_status = self._client.get_job_status(name=f"trainjob-{task.id}", namespace='test') - if job_status != 'Running': + try: + job_status = self._client.get_job_status(name=f"trainjob-{task.id}", + namespace='test') + except Exception as e: + logging.debug(msg=f"Could not retrieve job_status for {task.id}") + job_status = None + + if job_status and job_status in {'Completed', 'Failed'}: logging.info(f"{task.id} was completed with status: {job_status}, moving to completed") task_to_move.add(task) else: @@ -231,7 +238,7 @@ def wait_for_jobs_to_complete(self): class SimulatedOrchestrator(Orchestrator): """ - Orchestrator implementation for Simulated arrivals. Currently, supports only Poisson inter-arrival times. + Orchestrator implementation for Simulated arrivals. Currently, only supports Poisson inter-arrival times. 
""" def __init__(self, cluster_mgr: ClusterManager, arrival_generator: ArrivalGenerator, config: DistributedConfig): @@ -302,7 +309,9 @@ def run(self, clear: bool = False, experiment_replication: int = 1) -> None: start_time = time.time() if clear: self._clear_jobs() - while self._alive and time.time() - start_time < self._config.get_duration(): + duration = self._config.get_duration() + # A negative duration disables the time limit; otherwise keep looping only while the elapsed time is below it. + while self._alive and (duration < 0 or time.time() - start_time < duration): # 1. Check arrivals # If new arrivals, store them in arrival PriorityQueue while not self._arrival_generator.arrivals.empty():