Skip to content

Commit

Permalink
Don't execute jobs on a server if the maximum job limit has been reac…
Browse files Browse the repository at this point in the history
…hed (#744)

Limit the overall number of simultaneous jobs on a server if the user
defined `max_simultaneous_jobs` in the servers dict in settings.
This feature was not tested yet.
  • Loading branch information
alongd authored Jun 25, 2024
2 parents 30e8601 + 8cafa91 commit 34ef50d
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 11 deletions.
3 changes: 2 additions & 1 deletion arc/job/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,8 @@ def write_submit_script(self) -> None:
}

if queue is None:
logger.warning(f'Queue not defined for server {self.server}. Assuming the queue name is defined in your submit.py script.')
logger.debug(f'Queue not defined for server {self.server}. '
f'Assuming the queue name is defined in your submit.py script.')
del format_params['queue']

try:
Expand Down
41 changes: 32 additions & 9 deletions arc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@
logger = get_logger()

LOWEST_MAJOR_TS_FREQ, HIGHEST_MAJOR_TS_FREQ, default_job_settings, \
default_job_types, rotor_scan_resolution, default_ts_adapters, max_rotor_trsh = \
default_job_types, default_ts_adapters, max_rotor_trsh, rotor_scan_resolution, servers_dict = \
settings['LOWEST_MAJOR_TS_FREQ'], settings['HIGHEST_MAJOR_TS_FREQ'], settings['default_job_settings'], \
settings['default_job_types'], settings['rotor_scan_resolution'], settings['ts_adapters'], settings['max_rotor_trsh']
settings['default_job_types'], settings['ts_adapters'], settings['max_rotor_trsh'], \
settings['rotor_scan_resolution'], settings['servers']


class Scheduler(object):
Expand Down Expand Up @@ -887,6 +888,7 @@ def run_job(self,
self.job_dict[label]['tsg'][tsg] = job # save job object
if job.server is not None and job.server not in self.servers:
self.servers.append(job.server)
self.check_max_simultaneous_jobs_limit(job.server)
job.execute()
self.save_restart_dict()

Expand Down Expand Up @@ -3080,17 +3082,21 @@ def check_all_done(self, label: str):
# Update restart dictionary and save the yaml restart file:
self.save_restart_dict()

def get_server_job_ids(self):
def get_server_job_ids(self, specific_server: Optional[str] = None):
"""
Check job status on all active servers, get a list of relevant running job IDs.
Check job status on a specific server or on all active servers, get a list of relevant running job IDs.
Args:
specific_server (str, optional): The server to check. If ``None``, check all active servers.
"""
self.server_job_ids = list()
for server in self.servers:
if server != 'local':
with SSHClient(server) as ssh:
self.server_job_ids.extend(ssh.check_running_jobs_ids())
else:
self.server_job_ids.extend(check_running_jobs_ids())
if specific_server is None or server == specific_server:
if server != 'local':
with SSHClient(server) as ssh:
self.server_job_ids.extend(ssh.check_running_jobs_ids())
else:
self.server_job_ids.extend(check_running_jobs_ids())

def get_completed_incore_jobs(self):
"""
Expand Down Expand Up @@ -3794,6 +3800,23 @@ def save_e_elect(self, label: str):
content[label] = self.species_dict[label].e_elect
save_yaml_file(path=path, content=content)

def check_max_simultaneous_jobs_limit(self, server: Optional[str]):
"""
Check if the number of running jobs on the server is not above the set server limit.
Args:
server (str): The server name.
"""
if server is not None and 'max_simultaneous_jobs' in servers_dict[server]:
continue_lopping = True
while continue_lopping:
self.get_server_job_ids(specific_server=server)
if len(self.server_job_ids) >= servers_dict[server]['max_simultaneous_jobs']:
time.sleep(90)
else:
continue_lopping = False
self.get_server_job_ids()


def species_has_freq(species_output_dict: dict,
yml_path: Optional[str] = None,
Expand Down
3 changes: 2 additions & 1 deletion arc/settings/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
'address': 'server1.host.edu',
'un': '<username>',
'key': 'path_to_rsa_key',
'max_simultaneous_jobs': 10, # optional, "check_status_command" must be set to only return jobs for your user
},
'server2': {
'cluster_soft': 'Slurm',
Expand All @@ -58,7 +59,7 @@
'cluster_soft': 'HTCondor',
'un': '<username>',
'cpus': 48,
'queues': {'':''}, #{'queue_name':'HH:MM:SS'}
'queues': {'':''}, # {'queue_name':'HH:MM:SS'}
'excluded_queues': ['queue_name1', 'queue_name2'],
},
}
Expand Down

0 comments on commit 34ef50d

Please sign in to comment.