From 954cbdd3ee5127d6618db9d144508505e41cffcc Mon Sep 17 00:00:00 2001 From: Sebastiaan Huber Date: Thu, 11 Jul 2024 22:52:44 +0200 Subject: [PATCH] `Scheduler`: Refactor interface to make it more generic (#6043) The original `Scheduler` interface made the assumption that all interfaces would interact with the scheduler through a command line interface that would be invoked through a bash shell. However, this is not always the case. Prime example is the new FirecREST service, being developed by CSCS, that will allow to interact with the scheduler through a REST API. Due to the assumptions of the `Scheduler` interface, it was difficult to implement it for this use case. The `Scheduler` interface is made more generic, by removing the following (abstract) methods: * `_get_joblist_command` * `_parse_joblist_output` * `_get_submit_command` * `_parse_submit_output` * `submit_from_script` * `kill` * `_get_kill_command` * `_parse_kill_output` They are replaced by three abstract methods: * `submit_job` * `get_jobs` * `kill_job` The new interface no longer makes an assumption about how a plugin implements these methods. The first one should simply submit the job, given the location of the submission script on the remote computer. The second should return the status of the list of active jobs. And the final should kill a job and return the result. Unfortunately, this change is backwards incompatible and will break existing scheduler plugins. To simplify the migration pathway, a subclass `BashCliScheduler` is added. This implements the new `Scheduler` interface while maintaining the old interface. This means that this new class is a drop-in replacement of the old `Scheduler` class for existing plugins. The plugins that ship with `aiida-core` are all updated to subclass from `BashCliScheduler`. Any existing plugins that subclassed from these plugins will therefore not be affected whatsoever by these changes. --- src/aiida/engine/daemon/execmanager.py | 10 +- src/aiida/engine/processes/calcjobs/tasks.py | 2 +- src/aiida/schedulers/__init__.py | 2 + src/aiida/schedulers/plugins/__init__.py | 3 + src/aiida/schedulers/plugins/bash.py | 123 ++++++++++++++++ src/aiida/schedulers/plugins/direct.py | 4 +- src/aiida/schedulers/plugins/lsf.py | 7 +- .../schedulers/plugins/pbsbaseclasses.py | 7 +- src/aiida/schedulers/plugins/sge.py | 4 +- src/aiida/schedulers/plugins/slurm.py | 4 +- src/aiida/schedulers/scheduler.py | 138 +++++------------- .../processes/calcjobs/test_calc_job.py | 4 +- tests/transports/test_all_plugins.py | 2 +- 13 files changed, 194 insertions(+), 116 deletions(-) create mode 100644 src/aiida/schedulers/plugins/bash.py diff --git a/src/aiida/engine/daemon/execmanager.py b/src/aiida/engine/daemon/execmanager.py index 6f2a42fa15..045347404c 100644 --- a/src/aiida/engine/daemon/execmanager.py +++ b/src/aiida/engine/daemon/execmanager.py @@ -397,7 +397,7 @@ def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str | :param calculation: the instance of CalcJobNode to submit. :param transport: an already opened transport to use to submit the calculation. - :return: the job id as returned by the scheduler `submit_from_script` call + :return: the job id as returned by the scheduler `submit_job` call """ job_id = calculation.get_job_id() @@ -414,7 +414,7 @@ def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str | submit_script_filename = calculation.get_option('submit_script_filename') workdir = calculation.get_remote_workdir() - result = scheduler.submit_from_script(workdir, submit_script_filename) + result = scheduler.submit_job(workdir, submit_script_filename) if isinstance(result, str): calculation.set_job_id(result) @@ -572,7 +572,7 @@ def kill_calculation(calculation: CalcJobNode, transport: Transport) -> None: scheduler.set_transport(transport) # Call the proper kill method for the job ID of this calculation - result = scheduler.kill(job_id) + result = scheduler.kill_job(job_id) if result is not True: # Failed to kill because the job might have already been completed @@ -581,10 +581,10 @@ def kill_calculation(calculation: CalcJobNode, transport: Transport) -> None: # If the job is returned it is still running and the kill really failed, so we raise if job is not None and job.job_state != JobState.DONE: - raise exceptions.RemoteOperationError(f'scheduler.kill({job_id}) was unsuccessful') + raise exceptions.RemoteOperationError(f'scheduler.kill_job({job_id}) was unsuccessful') else: EXEC_LOGGER.warning( - 'scheduler.kill() failed but job<{%s}> no longer seems to be running regardless', job_id + 'scheduler.kill_job() failed but job<{%s}> no longer seems to be running regardless', job_id ) diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index 085e5c1f50..8b8231634f 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -510,7 +510,7 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override result = await self._launch_task(task_submit_job, node, transport_queue) if isinstance(result, ExitCode): - # The scheduler plugin returned an exit code from ``Scheduler.submit_from_script`` indicating the + # The scheduler plugin returned an exit code from ``Scheduler.submit_job`` indicating the # job submission failed due to a non-transient problem and the job should be terminated. return self.create_state(ProcessState.RUNNING, self.process.terminate, result) diff --git a/src/aiida/schedulers/__init__.py b/src/aiida/schedulers/__init__.py index b81d7f79c4..748e23b5d5 100644 --- a/src/aiida/schedulers/__init__.py +++ b/src/aiida/schedulers/__init__.py @@ -13,9 +13,11 @@ # fmt: off from .datastructures import * +from .plugins import * from .scheduler import * __all__ = ( + 'BashCliScheduler', 'JobInfo', 'JobResource', 'JobState', diff --git a/src/aiida/schedulers/plugins/__init__.py b/src/aiida/schedulers/plugins/__init__.py index c56ff0a1f8..cae7feed47 100644 --- a/src/aiida/schedulers/plugins/__init__.py +++ b/src/aiida/schedulers/plugins/__init__.py @@ -6,3 +6,6 @@ # For further information on the license, see the LICENSE.txt file # # For further information please visit http://www.aiida.net # ########################################################################### +from .bash import BashCliScheduler + +__all__ = ('BashCliScheduler',) diff --git a/src/aiida/schedulers/plugins/bash.py b/src/aiida/schedulers/plugins/bash.py new file mode 100644 index 0000000000..0511a4cb99 --- /dev/null +++ b/src/aiida/schedulers/plugins/bash.py @@ -0,0 +1,123 @@ +########################################################################### +# Copyright (c), The AiiDA team. All rights reserved. # +# This file is part of the AiiDA code. # +# # +# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core # +# For further information on the license, see the LICENSE.txt file # +# For further information please visit http://www.aiida.net # +########################################################################### +"""Job scheduler that is interacted with through a CLI in bash.""" + +from __future__ import annotations + +import abc + +from aiida.common.escaping import escape_for_bash +from aiida.engine.processes.exit_code import ExitCode +from aiida.schedulers.datastructures import JobInfo +from aiida.schedulers.scheduler import Scheduler, SchedulerError + +__all__ = ('BashCliScheduler',) + + +class BashCliScheduler(Scheduler, metaclass=abc.ABCMeta): + """Job scheduler that is interacted with through a CLI in bash.""" + + def submit_job(self, working_directory: str, filename: str) -> str | ExitCode: + """Submit a job. + + :param working_directory: The absolute filepath to the working directory where the job is to be exectued. + :param filename: The filename of the submission script relative to the working directory. + """ + self.transport.chdir(working_directory) + result = self.transport.exec_command_wait(self._get_submit_command(escape_for_bash(filename))) + return self._parse_submit_output(*result) + + def get_jobs( + self, + jobs: list[str] | None = None, + user: str | None = None, + as_dict: bool = False, + ) -> list[JobInfo] | dict[str, JobInfo]: + """Return the list of currently active jobs. + + :param jobs: A list of jobs to check; only these are checked. + :param user: A string with a user: only jobs of this user are checked. + :param as_dict: If ``False`` (default), a list of ``JobInfo`` objects is returned. If ``True``, a dictionary is + returned, where the ``job_id`` is the key and the values are the ``JobInfo`` objects. + :returns: List of active jobs. + """ + with self.transport: + retval, stdout, stderr = self.transport.exec_command_wait(self._get_joblist_command(jobs=jobs, user=user)) + + joblist = self._parse_joblist_output(retval, stdout, stderr) + if as_dict: + jobdict = {job.job_id: job for job in joblist} + if None in jobdict: + raise SchedulerError('Found at least one job without jobid') + return jobdict + + return joblist + + def kill_job(self, jobid: str) -> bool: + """Kill a remote job and parse the return value of the scheduler to check if the command succeeded. + + ..note:: + + On some schedulers, even if the command is accepted, it may take some seconds for the job to actually + disappear from the queue. + + :param jobid: the job ID to be killed + :returns: True if everything seems ok, False otherwise. + """ + retval, stdout, stderr = self.transport.exec_command_wait(self._get_kill_command(jobid)) + return self._parse_kill_output(retval, stdout, stderr) + + @abc.abstractmethod + def _get_submit_command(self, submit_script: str) -> str: + """Return the string to execute to submit a given script. + + .. warning:: the `submit_script` should already have been bash-escaped + + :param submit_script: the path of the submit script relative to the working directory. + :return: the string to execute to submit a given script. + """ + + @abc.abstractmethod + def _parse_submit_output(self, retval: int, stdout: str, stderr: str) -> str | ExitCode: + """Parse the output of the submit command returned by calling the `_get_submit_command` command. + + :return: a string with the job ID or an exit code if the submission failed because the submission script is + invalid and the job should be terminated. + """ + + @abc.abstractmethod + def _get_joblist_command(self, jobs: list[str] | None = None, user: str | None = None) -> str: + """Return the command to get the most complete description possible of currently active jobs. + + .. note:: + + Typically one can pass only either jobs or user, depending on the specific plugin. The choice can be done + according to the value returned by `self.get_feature('can_query_by_user')` + + :param jobs: either None to get a list of all jobs in the machine, or a list of jobs. + :param user: either None, or a string with the username (to show only jobs of the specific user). + """ + + @abc.abstractmethod + def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list[JobInfo]: + """Parse the joblist output as returned by executing the command returned by `_get_joblist_command` method. + + :return: list of `JobInfo` objects, one of each job each with at least its default params implemented. + """ + + @abc.abstractmethod + def _get_kill_command(self, jobid: str) -> str: + """Return the command to kill the job with specified jobid.""" + + @abc.abstractmethod + def _parse_kill_output(self, retval: int, stdout: str, stderr: str) -> bool: + """Parse the output of the kill command. + + :return: True if everything seems ok, False otherwise. + """ diff --git a/src/aiida/schedulers/plugins/direct.py b/src/aiida/schedulers/plugins/direct.py index 21d368a15f..78421acb73 100644 --- a/src/aiida/schedulers/plugins/direct.py +++ b/src/aiida/schedulers/plugins/direct.py @@ -13,6 +13,8 @@ from aiida.schedulers import SchedulerError from aiida.schedulers.datastructures import JobInfo, JobState, NodeNumberJobResource +from .bash import BashCliScheduler + ## From the ps man page on Mac OS X 10.12 # state The state is given by a sequence of characters, for example, # ``RWNA''. The first character indicates the run state of the @@ -74,7 +76,7 @@ def accepts_default_memory_per_machine(cls): return False -class DirectScheduler(aiida.schedulers.Scheduler): +class DirectScheduler(BashCliScheduler): """Support for the direct execution bypassing schedulers.""" _logger = aiida.schedulers.Scheduler._logger.getChild('direct') diff --git a/src/aiida/schedulers/plugins/lsf.py b/src/aiida/schedulers/plugins/lsf.py index aafeb2d167..33512ba944 100644 --- a/src/aiida/schedulers/plugins/lsf.py +++ b/src/aiida/schedulers/plugins/lsf.py @@ -16,6 +16,8 @@ from aiida.schedulers import SchedulerError, SchedulerParsingError from aiida.schedulers.datastructures import JobInfo, JobResource, JobState +from .bash import BashCliScheduler + # This maps LSF status codes to our own state list # # List of states from @@ -167,9 +169,10 @@ def accepts_default_mpiprocs_per_machine(cls): return False -class LsfScheduler(aiida.schedulers.Scheduler): +class LsfScheduler(BashCliScheduler): """Support for the IBM LSF scheduler - 'https://www-01.ibm.com/support/knowledgecenter/SSETD4_9.1.2/lsf_welcome.html' + + https://www-01.ibm.com/support/knowledgecenter/SSETD4_9.1.2/lsf_welcome.html """ _logger = aiida.schedulers.Scheduler._logger.getChild('lsf') diff --git a/src/aiida/schedulers/plugins/pbsbaseclasses.py b/src/aiida/schedulers/plugins/pbsbaseclasses.py index 12331010aa..bcceeae19d 100644 --- a/src/aiida/schedulers/plugins/pbsbaseclasses.py +++ b/src/aiida/schedulers/plugins/pbsbaseclasses.py @@ -11,9 +11,11 @@ import logging from aiida.common.escaping import escape_for_bash -from aiida.schedulers import Scheduler, SchedulerError, SchedulerParsingError +from aiida.schedulers import SchedulerError, SchedulerParsingError from aiida.schedulers.datastructures import JobInfo, JobState, MachineInfo, NodeNumberJobResource +from .bash import BashCliScheduler + _LOGGER = logging.getLogger(__name__) # This maps PbsPro status letters to our own status list @@ -95,8 +97,9 @@ def validate_resources(cls, **kwargs): return resources -class PbsBaseClass(Scheduler): +class PbsBaseClass(BashCliScheduler): """Base class with support for the PBSPro scheduler + (http://www.pbsworks.com/) and for PBS and Torque (http://www.adaptivecomputing.com/products/open-source/torque/). diff --git a/src/aiida/schedulers/plugins/sge.py b/src/aiida/schedulers/plugins/sge.py index fbc6a74233..c01381ba3d 100644 --- a/src/aiida/schedulers/plugins/sge.py +++ b/src/aiida/schedulers/plugins/sge.py @@ -21,6 +21,8 @@ from aiida.schedulers import SchedulerError, SchedulerParsingError from aiida.schedulers.datastructures import JobInfo, JobState, ParEnvJobResource +from .bash import BashCliScheduler + # 'http://www.loni.ucla.edu/twiki/bin/view/Infrastructure/GridComputing?skin=plain': # Jobs Status: # 'qw' - Queued and waiting, @@ -88,7 +90,7 @@ class SgeJobResource(ParEnvJobResource): pass -class SgeScheduler(aiida.schedulers.Scheduler): +class SgeScheduler(BashCliScheduler): """Support for the Sun Grid Engine scheduler and its variants/forks (Son of Grid Engine, Oracle Grid Engine, ...)""" _logger = aiida.schedulers.Scheduler._logger.getChild('sge') diff --git a/src/aiida/schedulers/plugins/slurm.py b/src/aiida/schedulers/plugins/slurm.py index 77ddc2711e..0ef2568c22 100644 --- a/src/aiida/schedulers/plugins/slurm.py +++ b/src/aiida/schedulers/plugins/slurm.py @@ -16,6 +16,8 @@ from aiida.schedulers import Scheduler, SchedulerError from aiida.schedulers.datastructures import JobInfo, JobState, NodeNumberJobResource +from .bash import BashCliScheduler + # This maps SLURM state codes to our own status list ## List of states from the man page of squeue @@ -141,7 +143,7 @@ def validate_resources(cls, **kwargs): return resources -class SlurmScheduler(Scheduler): +class SlurmScheduler(BashCliScheduler): """Support for the SLURM scheduler (http://slurm.schedmd.com/).""" _logger = Scheduler._logger.getChild('slurm') diff --git a/src/aiida/schedulers/scheduler.py b/src/aiida/schedulers/scheduler.py index 571b132672..5168762f80 100644 --- a/src/aiida/schedulers/scheduler.py +++ b/src/aiida/schedulers/scheduler.py @@ -125,6 +125,44 @@ def create_job_resource(cls, **kwargs): assert cls._job_resource_class is not None and issubclass(cls._job_resource_class, JobResource) return cls._job_resource_class(**kwargs) + @abc.abstractmethod + def submit_job(self, working_directory: str, filename: str) -> str | ExitCode: + """Submit a job. + + :param working_directory: The absolute filepath to the working directory where the job is to be exectued. + :param filename: The filename of the submission script relative to the working directory. + :returns: + """ + + @abc.abstractmethod + def get_jobs( + self, + jobs: list[str] | None = None, + user: str | None = None, + as_dict: bool = False, + ) -> list[JobInfo] | dict[str, JobInfo]: + """Return the list of currently active jobs. + + :param jobs: A list of jobs to check; only these are checked. + :param user: A string with a user: only jobs of this user are checked. + :param as_dict: If ``False`` (default), a list of ``JobInfo`` objects is returned. If ``True``, a dictionary is + returned, where the ``job_id`` is the key and the values are the ``JobInfo`` objects. + :returns: List of active jobs. + """ + + @abc.abstractmethod + def kill_job(self, jobid: str) -> bool: + """Kill a remote job and parse the return value of the scheduler to check if the command succeeded. + + ..note:: + + On some schedulers, even if the command is accepted, it may take some seconds for the job to actually + disappear from the queue. + + :param jobid: the job ID to be killed + :returns: True if everything seems ok, False otherwise. + """ + def get_submit_script(self, job_tmpl: JobTemplate) -> str: """Return the submit script as a string. @@ -287,19 +325,6 @@ def _get_run_line(self, codes_info: list[JobTemplateCodeInfo], codes_run_mode: C raise NotImplementedError('Unrecognized code run mode') - @abc.abstractmethod - def _get_joblist_command(self, jobs: list[str] | None = None, user: str | None = None) -> str: - """Return the command to get the most complete description possible of currently active jobs. - - .. note:: - - Typically one can pass only either jobs or user, depending on the specific plugin. The choice can be done - according to the value returned by `self.get_feature('can_query_by_user')` - - :param jobs: either None to get a list of all jobs in the machine, or a list of jobs. - :param user: either None, or a string with the username (to show only jobs of the specific user). - """ - def _get_detailed_job_info_command(self, job_id: str) -> dict[str, t.Any]: """Return the command to run to get detailed information for a given job. @@ -332,41 +357,6 @@ def get_detailed_job_info(self, job_id: str) -> dict[str, str | int]: return detailed_job_info - @abc.abstractmethod - def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list[JobInfo]: - """Parse the joblist output as returned by executing the command returned by `_get_joblist_command` method. - - :return: list of `JobInfo` objects, one of each job each with at least its default params implemented. - """ - - def get_jobs( - self, - jobs: list[str] | None = None, - user: str | None = None, - as_dict: bool = False, - ) -> list[JobInfo] | dict[str, JobInfo]: - """Return the list of currently active jobs. - - .. note:: typically, only either jobs or user can be specified. See also comments in `_get_joblist_command`. - - :param list jobs: a list of jobs to check; only these are checked - :param str user: a string with a user: only jobs of this user are checked - :param list as_dict: if False (default), a list of JobInfo objects is returned. If True, a dictionary is - returned, having as key the job_id and as value the JobInfo object. - :return: list of active jobs - """ - with self.transport: - retval, stdout, stderr = self.transport.exec_command_wait(self._get_joblist_command(jobs=jobs, user=user)) - - joblist = self._parse_joblist_output(retval, stdout, stderr) - if as_dict: - jobdict = {job.job_id: job for job in joblist} - if None in jobdict: - raise SchedulerError('Found at least one job without jobid') - return jobdict - - return joblist - @property def transport(self): """Return the transport set for this scheduler.""" @@ -382,58 +372,6 @@ def set_transport(self, transport: Transport): """ self._transport = transport - @abc.abstractmethod - def _get_submit_command(self, submit_script: str) -> str: - """Return the string to execute to submit a given script. - - .. warning:: the `submit_script` should already have been bash-escaped - - :param submit_script: the path of the submit script relative to the working directory. - :return: the string to execute to submit a given script. - """ - - @abc.abstractmethod - def _parse_submit_output(self, retval: int, stdout: str, stderr: str) -> str | ExitCode: - """Parse the output of the submit command returned by calling the `_get_submit_command` command. - - :return: a string with the job ID or an exit code if the submission failed because the submission script is - invalid and the job should be terminated. - """ - - def submit_from_script(self, working_directory: str, submit_script: str) -> str | ExitCode: - """Submit the submission script to the scheduler. - - :return: return a string with the job ID in a valid format to be used for querying. - """ - self.transport.chdir(working_directory) - result = self.transport.exec_command_wait(self._get_submit_command(escape_for_bash(submit_script))) - return self._parse_submit_output(*result) - - def kill(self, jobid: str) -> bool: - """Kill a remote job and parse the return value of the scheduler to check if the command succeeded. - - ..note:: - - On some schedulers, even if the command is accepted, it may take some seconds for the job to actually - disappear from the queue. - - :param jobid: the job ID to be killed - :return: True if everything seems ok, False otherwise. - """ - retval, stdout, stderr = self.transport.exec_command_wait(self._get_kill_command(jobid)) - return self._parse_kill_output(retval, stdout, stderr) - - @abc.abstractmethod - def _get_kill_command(self, jobid: str) -> str: - """Return the command to kill the job with specified jobid.""" - - @abc.abstractmethod - def _parse_kill_output(self, retval: int, stdout: str, stderr: str) -> bool: - """Parse the output of the kill command. - - :return: True if everything seems ok, False otherwise. - """ - def parse_output( self, detailed_job_info: dict[str, str | int] | None = None, diff --git a/tests/engine/processes/calcjobs/test_calc_job.py b/tests/engine/processes/calcjobs/test_calc_job.py index 4583d9ea18..4e679d5cdf 100644 --- a/tests/engine/processes/calcjobs/test_calc_job.py +++ b/tests/engine/processes/calcjobs/test_calc_job.py @@ -1273,10 +1273,10 @@ def test_monitor_result_action_disable_self(get_calcjob_builder, entry_points, c def test_submit_return_exit_code(get_calcjob_builder, monkeypatch): - """Test that a job is terminated if ``Scheduler.submit_from_script`` returns an exit code. + """Test that a job is terminated if ``Scheduler.submit_job`` returns an exit code. To simulate this situation we monkeypatch ``DirectScheduler._parse_submit_output`` because that is the method that - is called internally by ``Scheduler.submit_from_script`` and it returns its result, and the ``DirectScheduler`` is + is called internally by ``Scheduler.submit_job`` and it returns its result, and the ``DirectScheduler`` is the plugin that is used by the localhost computer used in the inputs for this calcjob. """ from aiida.schedulers.plugins.direct import DirectScheduler diff --git a/tests/transports/test_all_plugins.py b/tests/transports/test_all_plugins.py index 517efe106a..986dd465a9 100644 --- a/tests/transports/test_all_plugins.py +++ b/tests/transports/test_all_plugins.py @@ -1301,7 +1301,7 @@ def test_asynchronous_execution(custom_transport): transport.putfile(tmpf.name, script_fname) timestamp_before = time.time() - job_id_string = scheduler.submit_from_script('/tmp', script_fname) + job_id_string = scheduler.submit_job('/tmp', script_fname) elapsed_time = time.time() - timestamp_before # We want to get back control. If it takes < 5 seconds, it means that it is not blocking