Skip to content

Commit

Permalink
Implemented check_max_simultaneous_jobs_limit() in Scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
alongd committed Jun 25, 2024
1 parent 88572f4 commit 358cc27
Showing 1 changed file with 32 additions and 9 deletions.
41 changes: 32 additions & 9 deletions arc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@
logger = get_logger()

# Unpack frequently-used configuration values from the global ``settings``
# mapping into module-level names used throughout this module.
# ``servers_dict`` maps server names to their configuration dicts
# (which may include an optional 'max_simultaneous_jobs' limit).
LOWEST_MAJOR_TS_FREQ, HIGHEST_MAJOR_TS_FREQ, default_job_settings, \
    default_job_types, default_ts_adapters, max_rotor_trsh, rotor_scan_resolution, servers_dict = \
    settings['LOWEST_MAJOR_TS_FREQ'], settings['HIGHEST_MAJOR_TS_FREQ'], settings['default_job_settings'], \
    settings['default_job_types'], settings['ts_adapters'], settings['max_rotor_trsh'], \
    settings['rotor_scan_resolution'], settings['servers']


class Scheduler(object):
Expand Down Expand Up @@ -887,6 +888,7 @@ def run_job(self,
self.job_dict[label]['tsg'][tsg] = job # save job object
if job.server is not None and job.server not in self.servers:
self.servers.append(job.server)
self.check_max_simultaneous_jobs_limit(job.server)
job.execute()
self.save_restart_dict()

Expand Down Expand Up @@ -3080,17 +3082,21 @@ def check_all_done(self, label: str):
# Update restart dictionary and save the yaml restart file:
self.save_restart_dict()

def get_server_job_ids(self):
def get_server_job_ids(self, specific_server: Optional[str] = None):
"""
Check job status on all active servers, get a list of relevant running job IDs.
Check job status on a specific server or on all active servers, get a list of relevant running job IDs.
Args:
specific_server (str, optional): The server to check. If ``None``, check all active servers.
"""
self.server_job_ids = list()
for server in self.servers:
if server != 'local':
with SSHClient(server) as ssh:
self.server_job_ids.extend(ssh.check_running_jobs_ids())
else:
self.server_job_ids.extend(check_running_jobs_ids())
if specific_server is None or server == specific_server:
if server != 'local':
with SSHClient(server) as ssh:
self.server_job_ids.extend(ssh.check_running_jobs_ids())
else:
self.server_job_ids.extend(check_running_jobs_ids())

def get_completed_incore_jobs(self):
"""
Expand Down Expand Up @@ -3794,6 +3800,23 @@ def save_e_elect(self, label: str):
content[label] = self.species_dict[label].e_elect
save_yaml_file(path=path, content=content)

def check_max_simultaneous_jobs_limit(self, server: Optional[str]):
"""
Check if the number of running jobs on the server is not above the set server limit.
Args:
server (str): The server name.
"""
if server is not None and 'max_simultaneous_jobs' in servers_dict[server]:
continue_lopping = True
while continue_lopping:
self.get_server_job_ids(specific_server=server)
if len(self.server_job_ids) >= servers_dict[server]['max_simultaneous_jobs']:
time.sleep(90)
else:
continue_lopping = False
self.get_server_job_ids()


def species_has_freq(species_output_dict: dict,
yml_path: Optional[str] = None,
Expand Down

0 comments on commit 358cc27

Please sign in to comment.