diff --git a/src/aiida/engine/daemon/execmanager.py b/src/aiida/engine/daemon/execmanager.py index 6f2a42fa15..045347404c 100644 --- a/src/aiida/engine/daemon/execmanager.py +++ b/src/aiida/engine/daemon/execmanager.py @@ -397,7 +397,7 @@ def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str | :param calculation: the instance of CalcJobNode to submit. :param transport: an already opened transport to use to submit the calculation. - :return: the job id as returned by the scheduler `submit_from_script` call + :return: the job id as returned by the scheduler `submit_job` call """ job_id = calculation.get_job_id() @@ -414,7 +414,7 @@ def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str | submit_script_filename = calculation.get_option('submit_script_filename') workdir = calculation.get_remote_workdir() - result = scheduler.submit_from_script(workdir, submit_script_filename) + result = scheduler.submit_job(workdir, submit_script_filename) if isinstance(result, str): calculation.set_job_id(result) @@ -572,7 +572,7 @@ def kill_calculation(calculation: CalcJobNode, transport: Transport) -> None: scheduler.set_transport(transport) # Call the proper kill method for the job ID of this calculation - result = scheduler.kill(job_id) + result = scheduler.kill_job(job_id) if result is not True: # Failed to kill because the job might have already been completed @@ -581,10 +581,10 @@ def kill_calculation(calculation: CalcJobNode, transport: Transport) -> None: # If the job is returned it is still running and the kill really failed, so we raise if job is not None and job.job_state != JobState.DONE: - raise exceptions.RemoteOperationError(f'scheduler.kill({job_id}) was unsuccessful') + raise exceptions.RemoteOperationError(f'scheduler.kill_job({job_id}) was unsuccessful') else: EXEC_LOGGER.warning( - 'scheduler.kill() failed but job<{%s}> no longer seems to be running regardless', job_id + 'scheduler.kill_job() failed but job<{%s}> no longer seems to be running regardless', job_id ) diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index 085e5c1f50..8b8231634f 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -510,7 +510,7 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override result = await self._launch_task(task_submit_job, node, transport_queue) if isinstance(result, ExitCode): - # The scheduler plugin returned an exit code from ``Scheduler.submit_from_script`` indicating the + # The scheduler plugin returned an exit code from ``Scheduler.submit_job`` indicating the # job submission failed due to a non-transient problem and the job should be terminated. return self.create_state(ProcessState.RUNNING, self.process.terminate, result) diff --git a/src/aiida/schedulers/__init__.py b/src/aiida/schedulers/__init__.py index b81d7f79c4..748e23b5d5 100644 --- a/src/aiida/schedulers/__init__.py +++ b/src/aiida/schedulers/__init__.py @@ -13,9 +13,11 @@ # fmt: off from .datastructures import * +from .plugins import * from .scheduler import * __all__ = ( + 'BashCliScheduler', 'JobInfo', 'JobResource', 'JobState', diff --git a/src/aiida/schedulers/plugins/__init__.py b/src/aiida/schedulers/plugins/__init__.py index c56ff0a1f8..cae7feed47 100644 --- a/src/aiida/schedulers/plugins/__init__.py +++ b/src/aiida/schedulers/plugins/__init__.py @@ -6,3 +6,6 @@ # For further information on the license, see the LICENSE.txt file # # For further information please visit http://www.aiida.net # ########################################################################### +from .bash import BashCliScheduler + +__all__ = ('BashCliScheduler',) diff --git a/src/aiida/schedulers/plugins/bash.py b/src/aiida/schedulers/plugins/bash.py new file mode 100644 index 0000000000..0511a4cb99 --- /dev/null +++ b/src/aiida/schedulers/plugins/bash.py @@ -0,0 +1,123 @@ +########################################################################### +# Copyright (c), The AiiDA team. All rights reserved. # +# This file is part of the AiiDA code. # +# # +# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core # +# For further information on the license, see the LICENSE.txt file # +# For further information please visit http://www.aiida.net # +########################################################################### +"""Job scheduler that is interacted with through a CLI in bash.""" + +from __future__ import annotations + +import abc + +from aiida.common.escaping import escape_for_bash +from aiida.engine.processes.exit_code import ExitCode +from aiida.schedulers.datastructures import JobInfo +from aiida.schedulers.scheduler import Scheduler, SchedulerError + +__all__ = ('BashCliScheduler',) + + +class BashCliScheduler(Scheduler, metaclass=abc.ABCMeta): + """Job scheduler that is interacted with through a CLI in bash.""" + + def submit_job(self, working_directory: str, filename: str) -> str | ExitCode: + """Submit a job. + + :param working_directory: The absolute filepath to the working directory where the job is to be exectued. + :param filename: The filename of the submission script relative to the working directory. + """ + self.transport.chdir(working_directory) + result = self.transport.exec_command_wait(self._get_submit_command(escape_for_bash(filename))) + return self._parse_submit_output(*result) + + def get_jobs( + self, + jobs: list[str] | None = None, + user: str | None = None, + as_dict: bool = False, + ) -> list[JobInfo] | dict[str, JobInfo]: + """Return the list of currently active jobs. + + :param jobs: A list of jobs to check; only these are checked. + :param user: A string with a user: only jobs of this user are checked. + :param as_dict: If ``False`` (default), a list of ``JobInfo`` objects is returned. If ``True``, a dictionary is + returned, where the ``job_id`` is the key and the values are the ``JobInfo`` objects. + :returns: List of active jobs. + """ + with self.transport: + retval, stdout, stderr = self.transport.exec_command_wait(self._get_joblist_command(jobs=jobs, user=user)) + + joblist = self._parse_joblist_output(retval, stdout, stderr) + if as_dict: + jobdict = {job.job_id: job for job in joblist} + if None in jobdict: + raise SchedulerError('Found at least one job without jobid') + return jobdict + + return joblist + + def kill_job(self, jobid: str) -> bool: + """Kill a remote job and parse the return value of the scheduler to check if the command succeeded. + + ..note:: + + On some schedulers, even if the command is accepted, it may take some seconds for the job to actually + disappear from the queue. + + :param jobid: the job ID to be killed + :returns: True if everything seems ok, False otherwise. + """ + retval, stdout, stderr = self.transport.exec_command_wait(self._get_kill_command(jobid)) + return self._parse_kill_output(retval, stdout, stderr) + + @abc.abstractmethod + def _get_submit_command(self, submit_script: str) -> str: + """Return the string to execute to submit a given script. + + .. warning:: the `submit_script` should already have been bash-escaped + + :param submit_script: the path of the submit script relative to the working directory. + :return: the string to execute to submit a given script. + """ + + @abc.abstractmethod + def _parse_submit_output(self, retval: int, stdout: str, stderr: str) -> str | ExitCode: + """Parse the output of the submit command returned by calling the `_get_submit_command` command. + + :return: a string with the job ID or an exit code if the submission failed because the submission script is + invalid and the job should be terminated. + """ + + @abc.abstractmethod + def _get_joblist_command(self, jobs: list[str] | None = None, user: str | None = None) -> str: + """Return the command to get the most complete description possible of currently active jobs. + + .. note:: + + Typically one can pass only either jobs or user, depending on the specific plugin. The choice can be done + according to the value returned by `self.get_feature('can_query_by_user')` + + :param jobs: either None to get a list of all jobs in the machine, or a list of jobs. + :param user: either None, or a string with the username (to show only jobs of the specific user). + """ + + @abc.abstractmethod + def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list[JobInfo]: + """Parse the joblist output as returned by executing the command returned by `_get_joblist_command` method. + + :return: list of `JobInfo` objects, one of each job each with at least its default params implemented. + """ + + @abc.abstractmethod + def _get_kill_command(self, jobid: str) -> str: + """Return the command to kill the job with specified jobid.""" + + @abc.abstractmethod + def _parse_kill_output(self, retval: int, stdout: str, stderr: str) -> bool: + """Parse the output of the kill command. + + :return: True if everything seems ok, False otherwise. + """ diff --git a/src/aiida/schedulers/plugins/direct.py b/src/aiida/schedulers/plugins/direct.py index 21d368a15f..78421acb73 100644 --- a/src/aiida/schedulers/plugins/direct.py +++ b/src/aiida/schedulers/plugins/direct.py @@ -13,6 +13,8 @@ from aiida.schedulers import SchedulerError from aiida.schedulers.datastructures import JobInfo, JobState, NodeNumberJobResource +from .bash import BashCliScheduler + ## From the ps man page on Mac OS X 10.12 # state The state is given by a sequence of characters, for example, # ``RWNA''. The first character indicates the run state of the @@ -74,7 +76,7 @@ def accepts_default_memory_per_machine(cls): return False -class DirectScheduler(aiida.schedulers.Scheduler): +class DirectScheduler(BashCliScheduler): """Support for the direct execution bypassing schedulers.""" _logger = aiida.schedulers.Scheduler._logger.getChild('direct') diff --git a/src/aiida/schedulers/plugins/lsf.py b/src/aiida/schedulers/plugins/lsf.py index aafeb2d167..33512ba944 100644 --- a/src/aiida/schedulers/plugins/lsf.py +++ b/src/aiida/schedulers/plugins/lsf.py @@ -16,6 +16,8 @@ from aiida.schedulers import SchedulerError, SchedulerParsingError from aiida.schedulers.datastructures import JobInfo, JobResource, JobState +from .bash import BashCliScheduler + # This maps LSF status codes to our own state list # # List of states from @@ -167,9 +169,10 @@ def accepts_default_mpiprocs_per_machine(cls): return False -class LsfScheduler(aiida.schedulers.Scheduler): +class LsfScheduler(BashCliScheduler): """Support for the IBM LSF scheduler - 'https://www-01.ibm.com/support/knowledgecenter/SSETD4_9.1.2/lsf_welcome.html' + + https://www-01.ibm.com/support/knowledgecenter/SSETD4_9.1.2/lsf_welcome.html """ _logger = aiida.schedulers.Scheduler._logger.getChild('lsf') diff --git a/src/aiida/schedulers/plugins/pbsbaseclasses.py b/src/aiida/schedulers/plugins/pbsbaseclasses.py index 12331010aa..bcceeae19d 100644 --- a/src/aiida/schedulers/plugins/pbsbaseclasses.py +++ b/src/aiida/schedulers/plugins/pbsbaseclasses.py @@ -11,9 +11,11 @@ import logging from aiida.common.escaping import escape_for_bash -from aiida.schedulers import Scheduler, SchedulerError, SchedulerParsingError +from aiida.schedulers import SchedulerError, SchedulerParsingError from aiida.schedulers.datastructures import JobInfo, JobState, MachineInfo, NodeNumberJobResource +from .bash import BashCliScheduler + _LOGGER = logging.getLogger(__name__) # This maps PbsPro status letters to our own status list @@ -95,8 +97,9 @@ def validate_resources(cls, **kwargs): return resources -class PbsBaseClass(Scheduler): +class PbsBaseClass(BashCliScheduler): """Base class with support for the PBSPro scheduler + (http://www.pbsworks.com/) and for PBS and Torque (http://www.adaptivecomputing.com/products/open-source/torque/). diff --git a/src/aiida/schedulers/plugins/sge.py b/src/aiida/schedulers/plugins/sge.py index fbc6a74233..c01381ba3d 100644 --- a/src/aiida/schedulers/plugins/sge.py +++ b/src/aiida/schedulers/plugins/sge.py @@ -21,6 +21,8 @@ from aiida.schedulers import SchedulerError, SchedulerParsingError from aiida.schedulers.datastructures import JobInfo, JobState, ParEnvJobResource +from .bash import BashCliScheduler + # 'http://www.loni.ucla.edu/twiki/bin/view/Infrastructure/GridComputing?skin=plain': # Jobs Status: # 'qw' - Queued and waiting, @@ -88,7 +90,7 @@ class SgeJobResource(ParEnvJobResource): pass -class SgeScheduler(aiida.schedulers.Scheduler): +class SgeScheduler(BashCliScheduler): """Support for the Sun Grid Engine scheduler and its variants/forks (Son of Grid Engine, Oracle Grid Engine, ...)""" _logger = aiida.schedulers.Scheduler._logger.getChild('sge') diff --git a/src/aiida/schedulers/plugins/slurm.py b/src/aiida/schedulers/plugins/slurm.py index 77ddc2711e..0ef2568c22 100644 --- a/src/aiida/schedulers/plugins/slurm.py +++ b/src/aiida/schedulers/plugins/slurm.py @@ -16,6 +16,8 @@ from aiida.schedulers import Scheduler, SchedulerError from aiida.schedulers.datastructures import JobInfo, JobState, NodeNumberJobResource +from .bash import BashCliScheduler + # This maps SLURM state codes to our own status list ## List of states from the man page of squeue @@ -141,7 +143,7 @@ def validate_resources(cls, **kwargs): return resources -class SlurmScheduler(Scheduler): +class SlurmScheduler(BashCliScheduler): """Support for the SLURM scheduler (http://slurm.schedmd.com/).""" _logger = Scheduler._logger.getChild('slurm') diff --git a/src/aiida/schedulers/scheduler.py b/src/aiida/schedulers/scheduler.py index 571b132672..5168762f80 100644 --- a/src/aiida/schedulers/scheduler.py +++ b/src/aiida/schedulers/scheduler.py @@ -125,6 +125,44 @@ def create_job_resource(cls, **kwargs): assert cls._job_resource_class is not None and issubclass(cls._job_resource_class, JobResource) return cls._job_resource_class(**kwargs) + @abc.abstractmethod + def submit_job(self, working_directory: str, filename: str) -> str | ExitCode: + """Submit a job. + + :param working_directory: The absolute filepath to the working directory where the job is to be exectued. + :param filename: The filename of the submission script relative to the working directory. + :returns: + """ + + @abc.abstractmethod + def get_jobs( + self, + jobs: list[str] | None = None, + user: str | None = None, + as_dict: bool = False, + ) -> list[JobInfo] | dict[str, JobInfo]: + """Return the list of currently active jobs. + + :param jobs: A list of jobs to check; only these are checked. + :param user: A string with a user: only jobs of this user are checked. + :param as_dict: If ``False`` (default), a list of ``JobInfo`` objects is returned. If ``True``, a dictionary is + returned, where the ``job_id`` is the key and the values are the ``JobInfo`` objects. + :returns: List of active jobs. + """ + + @abc.abstractmethod + def kill_job(self, jobid: str) -> bool: + """Kill a remote job and parse the return value of the scheduler to check if the command succeeded. + + ..note:: + + On some schedulers, even if the command is accepted, it may take some seconds for the job to actually + disappear from the queue. + + :param jobid: the job ID to be killed + :returns: True if everything seems ok, False otherwise. + """ + def get_submit_script(self, job_tmpl: JobTemplate) -> str: """Return the submit script as a string. @@ -287,19 +325,6 @@ def _get_run_line(self, codes_info: list[JobTemplateCodeInfo], codes_run_mode: C raise NotImplementedError('Unrecognized code run mode') - @abc.abstractmethod - def _get_joblist_command(self, jobs: list[str] | None = None, user: str | None = None) -> str: - """Return the command to get the most complete description possible of currently active jobs. - - .. note:: - - Typically one can pass only either jobs or user, depending on the specific plugin. The choice can be done - according to the value returned by `self.get_feature('can_query_by_user')` - - :param jobs: either None to get a list of all jobs in the machine, or a list of jobs. - :param user: either None, or a string with the username (to show only jobs of the specific user). - """ - def _get_detailed_job_info_command(self, job_id: str) -> dict[str, t.Any]: """Return the command to run to get detailed information for a given job. @@ -332,41 +357,6 @@ def get_detailed_job_info(self, job_id: str) -> dict[str, str | int]: return detailed_job_info - @abc.abstractmethod - def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list[JobInfo]: - """Parse the joblist output as returned by executing the command returned by `_get_joblist_command` method. - - :return: list of `JobInfo` objects, one of each job each with at least its default params implemented. - """ - - def get_jobs( - self, - jobs: list[str] | None = None, - user: str | None = None, - as_dict: bool = False, - ) -> list[JobInfo] | dict[str, JobInfo]: - """Return the list of currently active jobs. - - .. note:: typically, only either jobs or user can be specified. See also comments in `_get_joblist_command`. - - :param list jobs: a list of jobs to check; only these are checked - :param str user: a string with a user: only jobs of this user are checked - :param list as_dict: if False (default), a list of JobInfo objects is returned. If True, a dictionary is - returned, having as key the job_id and as value the JobInfo object. - :return: list of active jobs - """ - with self.transport: - retval, stdout, stderr = self.transport.exec_command_wait(self._get_joblist_command(jobs=jobs, user=user)) - - joblist = self._parse_joblist_output(retval, stdout, stderr) - if as_dict: - jobdict = {job.job_id: job for job in joblist} - if None in jobdict: - raise SchedulerError('Found at least one job without jobid') - return jobdict - - return joblist - @property def transport(self): """Return the transport set for this scheduler.""" @@ -382,58 +372,6 @@ def set_transport(self, transport: Transport): """ self._transport = transport - @abc.abstractmethod - def _get_submit_command(self, submit_script: str) -> str: - """Return the string to execute to submit a given script. - - .. warning:: the `submit_script` should already have been bash-escaped - - :param submit_script: the path of the submit script relative to the working directory. - :return: the string to execute to submit a given script. - """ - - @abc.abstractmethod - def _parse_submit_output(self, retval: int, stdout: str, stderr: str) -> str | ExitCode: - """Parse the output of the submit command returned by calling the `_get_submit_command` command. - - :return: a string with the job ID or an exit code if the submission failed because the submission script is - invalid and the job should be terminated. - """ - - def submit_from_script(self, working_directory: str, submit_script: str) -> str | ExitCode: - """Submit the submission script to the scheduler. - - :return: return a string with the job ID in a valid format to be used for querying. - """ - self.transport.chdir(working_directory) - result = self.transport.exec_command_wait(self._get_submit_command(escape_for_bash(submit_script))) - return self._parse_submit_output(*result) - - def kill(self, jobid: str) -> bool: - """Kill a remote job and parse the return value of the scheduler to check if the command succeeded. - - ..note:: - - On some schedulers, even if the command is accepted, it may take some seconds for the job to actually - disappear from the queue. - - :param jobid: the job ID to be killed - :return: True if everything seems ok, False otherwise. - """ - retval, stdout, stderr = self.transport.exec_command_wait(self._get_kill_command(jobid)) - return self._parse_kill_output(retval, stdout, stderr) - - @abc.abstractmethod - def _get_kill_command(self, jobid: str) -> str: - """Return the command to kill the job with specified jobid.""" - - @abc.abstractmethod - def _parse_kill_output(self, retval: int, stdout: str, stderr: str) -> bool: - """Parse the output of the kill command. - - :return: True if everything seems ok, False otherwise. - """ - def parse_output( self, detailed_job_info: dict[str, str | int] | None = None, diff --git a/tests/engine/processes/calcjobs/test_calc_job.py b/tests/engine/processes/calcjobs/test_calc_job.py index 4583d9ea18..4e679d5cdf 100644 --- a/tests/engine/processes/calcjobs/test_calc_job.py +++ b/tests/engine/processes/calcjobs/test_calc_job.py @@ -1273,10 +1273,10 @@ def test_monitor_result_action_disable_self(get_calcjob_builder, entry_points, c def test_submit_return_exit_code(get_calcjob_builder, monkeypatch): - """Test that a job is terminated if ``Scheduler.submit_from_script`` returns an exit code. + """Test that a job is terminated if ``Scheduler.submit_job`` returns an exit code. To simulate this situation we monkeypatch ``DirectScheduler._parse_submit_output`` because that is the method that - is called internally by ``Scheduler.submit_from_script`` and it returns its result, and the ``DirectScheduler`` is + is called internally by ``Scheduler.submit_job`` and it returns its result, and the ``DirectScheduler`` is the plugin that is used by the localhost computer used in the inputs for this calcjob. """ from aiida.schedulers.plugins.direct import DirectScheduler diff --git a/tests/transports/test_all_plugins.py b/tests/transports/test_all_plugins.py index 517efe106a..986dd465a9 100644 --- a/tests/transports/test_all_plugins.py +++ b/tests/transports/test_all_plugins.py @@ -1301,7 +1301,7 @@ def test_asynchronous_execution(custom_transport): transport.putfile(tmpf.name, script_fname) timestamp_before = time.time() - job_id_string = scheduler.submit_from_script('/tmp', script_fname) + job_id_string = scheduler.submit_job('/tmp', script_fname) elapsed_time = time.time() - timestamp_before # We want to get back control. If it takes < 5 seconds, it means that it is not blocking