diff --git a/src/psij/executors/batch/batch_scheduler_executor.py b/src/psij/executors/batch/batch_scheduler_executor.py
index e5a8ca06..ab997314 100644
--- a/src/psij/executors/batch/batch_scheduler_executor.py
+++ b/src/psij/executors/batch/batch_scheduler_executor.py
@@ -154,6 +154,10 @@ class BatchSchedulerExecutor(JobExecutor):
     2. store the exit code of the launch command in the *exit code file* named
        `.ec`, also inside ``.
 
+    Additionally, where appropriate, the submit script should set the environment variable named
+    ``PSIJ_NODEFILE`` to point to a file containing a list of nodes that are allocated for the job,
+    one per line, with the total number of lines matching the process count of the job.
+
     Once the submit script is generated, the executor renders the submit command using
     :func:`~get_submit_command` and executes it. Its output is then parsed using
     :func:`~job_id_from_submit_output` to retrieve the `native_id` of the job. Subsequently, the
diff --git a/src/psij/executors/batch/cobalt/cobalt.mustache b/src/psij/executors/batch/cobalt/cobalt.mustache
index a52b50b6..922e9eea 100644
--- a/src/psij/executors/batch/cobalt/cobalt.mustache
+++ b/src/psij/executors/batch/cobalt/cobalt.mustache
@@ -46,6 +46,9 @@ only results in empty files that are not cleaned up}}
 #COBALT -e /dev/null
 #COBALT -o /dev/null
 
+{{!like PBS, this is also cheap and there is no need to check the setting}}
+PSIJ_NODEFILE="$COBALT_NODEFILE"
+export PSIJ_NODEFILE
 {{!redirect output here instead of through #COBALT directive since COBALT_JOB_ID is not
 available when the directives are evaluated; the reason for using the job id in the first
 place being the
diff --git a/src/psij/executors/batch/lsf/lsf.mustache b/src/psij/executors/batch/lsf/lsf.mustache
index e5f166c7..781e9f82 100644
--- a/src/psij/executors/batch/lsf/lsf.mustache
+++ b/src/psij/executors/batch/lsf/lsf.mustache
@@ -71,6 +71,9 @@ only results in empty files that are not cleaned up}}
 #BSUB -e /dev/null
 #BSUB -o /dev/null
 
+PSIJ_NODEFILE="$LSB_HOSTS"
+export PSIJ_NODEFILE
+
 {{!redirect output here instead of through #BSUB directive since LSB_JOBID is not available
 when the directives are evaluated; the reason for using the job id in the first place being
 the same as for the exit code file.}}
diff --git a/src/psij/executors/batch/pbspro/pbspro.mustache b/src/psij/executors/batch/pbspro/pbspro.mustache
index a19cbf5d..f0ed319e 100644
--- a/src/psij/executors/batch/pbspro/pbspro.mustache
+++ b/src/psij/executors/batch/pbspro/pbspro.mustache
@@ -48,6 +48,9 @@ only results in empty files that are not cleaned up}}
 #PBS -v {{name}}={{value}}
 {{/env}}
 
+PSIJ_NODEFILE="$PBS_NODEFILE"
+export PSIJ_NODEFILE
+
 {{#job.spec.directory}}
 cd "{{.}}"
 
diff --git a/src/psij/executors/batch/slurm.py b/src/psij/executors/batch/slurm.py
index fd5117e6..44e5e2e1 100644
--- a/src/psij/executors/batch/slurm.py
+++ b/src/psij/executors/batch/slurm.py
@@ -185,3 +185,7 @@ def _format_duration(self, d: timedelta) -> str:
         if d.days > 0:
             days = str(d.days) + '-'
         return days + "%s:%s:%s" % (d.seconds // 3600, (d.seconds // 60) % 60, d.seconds % 60)
+
+    def _clean_submit_script(self, job: Job) -> None:
+        super()._clean_submit_script(job)
+        self._delete_aux_file(job, '.nodefile')
diff --git a/src/psij/executors/batch/slurm/slurm.mustache b/src/psij/executors/batch/slurm/slurm.mustache
index 7757b933..4b0387c2 100644
--- a/src/psij/executors/batch/slurm/slurm.mustache
+++ b/src/psij/executors/batch/slurm/slurm.mustache
@@ -74,6 +74,11 @@ only results in empty files that are not cleaned up}}
 #SBATCH -e /dev/null
 #SBATCH -o /dev/null
 
+PSIJ_NODEFILE="{{psij.script_dir}}/$SLURM_JOB_ID.nodefile"
+scontrol show hostnames >"$PSIJ_NODEFILE"
+export PSIJ_NODEFILE
+
+
 {{#env}}
 #SBATCH --export={{name}}={{value}}
 {{/env}}
diff --git a/src/psij/executors/local.py b/src/psij/executors/local.py
index 0567a982..cdf91493 100644
--- a/src/psij/executors/local.py
+++ b/src/psij/executors/local.py
@@ -1,17 +1,19 @@
 """This module contains the local :class:`~psij.JobExecutor`."""
 import logging
 import os
+import shlex
 import signal
 import subprocess
 import threading
 import time
 from abc import ABC, abstractmethod
+from tempfile import mkstemp
 from types import FrameType
 from typing import Optional, Dict, List, Tuple, Type, cast
 
 import psutil
 
-from psij import InvalidJobException, SubmitException, Launcher
+from psij import InvalidJobException, SubmitException, Launcher, ResourceSpecV1
 from psij import Job, JobSpec, JobExecutorConfig, JobState, JobStatus
 from psij import JobExecutor
 from psij.utils import SingletonThread
@@ -19,6 +21,15 @@
 
 logger = logging.getLogger(__name__)
 
 
+def _format_shell_cmd(args: List[str]) -> str:
+    """Formats an argument list in a way that allows it to be pasted in a shell."""
+    cmd = ''
+    for arg in args:
+        cmd += shlex.quote(arg)
+        cmd += ' '
+    return cmd
+
+
 def _handle_sigchld(signum: int, frame: Optional[FrameType]) -> None:
     _ProcessReaper.get_instance()._handle_sigchld()
@@ -67,6 +78,7 @@ class _ChildProcessEntry(_ProcessEntry):
     def __init__(self, job: Job, executor: 'LocalJobExecutor',
                  launcher: Optional[Launcher]) -> None:
         super().__init__(job, executor, launcher)
+        self.nodefile: Optional[str] = None
 
     def kill(self) -> None:
         super().kill()
@@ -75,6 +87,8 @@ def poll(self) -> Tuple[Optional[int], Optional[str]]:
         assert self.process is not None
         exit_code = self.process.poll()
         if exit_code is not None:
+            if self.nodefile:
+                os.unlink(self.nodefile)
             if self.process.stdout:
                 return exit_code, self.process.stdout.read().decode('utf-8')
             else:
@@ -103,19 +117,30 @@ def poll(self) -> Tuple[Optional[int], Optional[str]]:
         return None, None
 
 
-def _get_env(spec: JobSpec) -> Optional[Dict[str, str]]:
+def _get_env(spec: JobSpec, nodefile: Optional[str]) -> Optional[Dict[str, str]]:
+    env: Optional[Dict[str, str]] = None
     if spec.inherit_environment:
-        if not spec.environment:
+        if spec.environment is None and nodefile is None:
             # if env is none in Popen, it inherits env from parent
             return None
         else:
             # merge current env with spec env
             env = os.environ.copy()
-            env.update(spec.environment)
+            if spec.environment:
+                env.update(spec.environment)
+            if nodefile is not None:
+                env['PSIJ_NODEFILE'] = nodefile
             return env
     else:
         # only spec env
-        return spec.environment
+        if nodefile is None:
+            env = spec.environment
+        else:
+            env = {'PSIJ_NODEFILE': nodefile}
+            if spec.environment:
+                env.update(spec.environment)
+
+        return env
 
 
 class _ProcessReaper(SingletonThread):
@@ -222,6 +247,26 @@ def __init__(self, url: Optional[str] = None,
         super().__init__(url=url, config=config if config else JobExecutorConfig())
         self._reaper = _ProcessReaper.get_instance()
 
+    def _generate_nodefile(self, job: Job, p: _ChildProcessEntry) -> Optional[str]:
+        assert job.spec is not None
+        if job.spec.resources is None:
+            return None
+        if job.spec.resources.version == 1:
+            assert isinstance(job.spec.resources, ResourceSpecV1)
+            n = job.spec.resources.computed_process_count
+            if n == 1:
+                # as a bit of an optimization, we don't generate a nodefile when doing "single
+                # node" jobs on local.
+                return None
+            (file, p.nodefile) = mkstemp(suffix='.nodelist')
+            for i in range(n):
+                os.write(file, 'localhost\n'.encode())
+            os.close(file)
+            return p.nodefile
+        else:
+            raise SubmitException('Cannot handle resource specification with version %s'
+                                  % job.spec.resources.version)
+
     def submit(self, job: Job) -> None:
         """
         Submits the specified :class:`~psij.Job` to be run locally.
@@ -244,9 +289,12 @@
         with job._status_cv:
             if job.status.state == JobState.CANCELED:
                 raise SubmitException('Job canceled')
-            logger.debug('Running %s, out=%s, err=%s', args, spec.stdout_path, spec.stderr_path)
+            if logger.isEnabledFor(logging.DEBUG):
+                logger.debug('Running %s', _format_shell_cmd(args))
+            nodefile = self._generate_nodefile(job, p)
+            env = _get_env(spec, nodefile)
             p.process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
-                                         close_fds=True, cwd=spec.directory, env=_get_env(spec))
+                                         close_fds=True, cwd=spec.directory, env=env)
             self._reaper.register(p)
             job._native_id = p.process.pid
             self._set_job_status(job, JobStatus(JobState.QUEUED, time=time.time(),
diff --git a/src/psij/launchers/script_based_launcher.py b/src/psij/launchers/script_based_launcher.py
index 07f89612..6c662dea 100644
--- a/src/psij/launchers/script_based_launcher.py
+++ b/src/psij/launchers/script_based_launcher.py
@@ -202,8 +202,9 @@ def get_additional_args(self, job: Job) -> List[str]:
 
     def is_launcher_failure(self, output: str) -> bool:
         """See :func:`~psij.job_launcher.Launcher.is_launcher_failure`."""
-        return output.split('\n')[-1] != '_PSI_J_LAUNCHER_DONE'
+        # the output ends with a newline, so the last element of the split is an empty string
+        return output.split('\n')[-2] != '_PSI_J_LAUNCHER_DONE'
 
     def get_launcher_failure_message(self, output: str) -> str:
         """See :func:`~psij.job_launcher.Launcher.get_launcher_failure_message`."""
-        return '\n'.join(output.split('\n')[:-1])
+        return '\n'.join(output.split('\n')[:-2])
diff --git a/tests/_test_tools.py b/tests/_test_tools.py
index b317c9d7..7aa4b34d 100644
--- a/tests/_test_tools.py
+++ b/tests/_test_tools.py
@@ -34,8 +34,9 @@ def assert_completed(job: Job, status: Optional[JobStatus]) -> None:
         assert job.spec is not None
         stdout = _read_file(job.spec.stdout_path)
         stderr = _read_file(job.spec.stderr_path)
-        raise AssertionError('Job not completed. Status message: %s, stdout: %s, stderr: %s'
-                             % (status.message, stdout, stderr))
+        raise AssertionError('Job not completed. Exit code: %s, Status message: %s, '
+                             'stdout: %s, stderr: %s'
+                             % (status.exit_code, status.message, stdout, stderr))
 
 
 def _get_executor_instance(ep: ExecutorTestParams, job: Optional[Job] = None) -> JobExecutor:
diff --git a/tests/plugins1/_batch_test/_batch_test.py b/tests/plugins1/_batch_test/_batch_test.py
index 51f2e764..c7b3971b 100644
--- a/tests/plugins1/_batch_test/_batch_test.py
+++ b/tests/plugins1/_batch_test/_batch_test.py
@@ -91,6 +91,10 @@ def _get_state(self, state: str) -> JobState:
         assert state in _TestJobExecutor._STATE_MAP
         return _TestJobExecutor._STATE_MAP[state]
 
+    def _clean_submit_script(self, job: Job) -> None:
+        super()._clean_submit_script(job)
+        self._delete_aux_file(job, '.nodefile')
+
 
 class _TestLauncher(MultipleLauncher):
     def __init__(self, config: Optional[JobExecutorConfig] = None):
diff --git a/tests/plugins1/_batch_test/test/test.mustache b/tests/plugins1/_batch_test/test/test.mustache
index 8cd3aae9..a1866803 100644
--- a/tests/plugins1/_batch_test/test/test.mustache
+++ b/tests/plugins1/_batch_test/test/test.mustache
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-exec &>> "{{psij.script_dir}}/$PSIJ_BATCH_TEST_JOB_ID.out"
+exec &> "{{psij.script_dir}}/$PSIJ_BATCH_TEST_JOB_ID.out"
 
 {{#job.spec.directory}}
 cd "{{.}}"
@@ -31,6 +31,15 @@
 export {{key}}="{{value}}"
 {{/custom_attributes}}
 
+HOSTNAME=`hostname`
+PSIJ_NODEFILE="{{psij.script_dir}}/$PSIJ_BATCH_TEST_JOB_ID.nodefile"
+rm -f "$PSIJ_NODEFILE"
+for NODE in $(seq 1 1 "$PSIJ_TEST_BATCH_EXEC_COUNT"); do
+    echo "$HOSTNAME-$NODE" >> "$PSIJ_NODEFILE"
+done
+
+export PSIJ_NODEFILE
+
 {{#job.spec.inherit_environment}}env \{{/job.spec.inherit_environment}}{{^job.spec.inherit_environment}}env --ignore-environment \{{/job.spec.inherit_environment}}{{#env}}
 {{name}}="{{value}}" \
 {{/env}}{{#psij.launch_command}}{{.}} {{/psij.launch_command}}
diff --git a/tests/test_nodefile.py b/tests/test_nodefile.py
new file mode 100644
index 00000000..07af2df7
--- /dev/null
+++ b/tests/test_nodefile.py
@@ -0,0 +1,30 @@
+import os
+from pathlib import Path
+from tempfile import TemporaryDirectory
+
+import pytest
+
+from _test_tools import assert_completed
+from executor_test_params import ExecutorTestParams
+from psij import Job, JobSpec, JobExecutor, ResourceSpecV1
+
+NOT_TESTED = set(['rp', 'flux'])
+
+
+def test_nodefile(execparams: ExecutorTestParams) -> None:
+    if execparams.executor in NOT_TESTED:
+        pytest.skip('This test does not work with %s' % execparams.executor)
+
+    my_path = os.path.dirname(os.path.realpath(__file__))
+
+    N_PROC = 4
+    with TemporaryDirectory(dir=Path.home() / '.psij' / 'test') as td:
+        outp = Path(td, 'stdout.txt')
+        spec = JobSpec('/bin/bash', [os.path.join(my_path, 'test_nodefile.sh'), str(N_PROC)],
+                       stdout_path=outp)
+        job = Job(spec)
+        spec.resources = ResourceSpecV1(process_count=N_PROC)
+        ex = JobExecutor.get_instance(execparams.executor)
+        ex.submit(job)
+        status = job.wait()
+        assert_completed(job, status)
diff --git a/tests/test_nodefile.sh b/tests/test_nodefile.sh
new file mode 100755
index 00000000..f061d5ee
--- /dev/null
+++ b/tests/test_nodefile.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+
+set -e
+
+EXPECTED_N_NODES="$1"
+if [ "$EXPECTED_N_NODES" == "" ]; then
+    echo "Missing expected node count argument"
+    exit 3
+fi
+
+ACTUAL_N_NODES=`cat "$PSIJ_NODEFILE" | wc -l`
+
+if [ "$EXPECTED_N_NODES" != "$ACTUAL_N_NODES" ]; then
+    echo "Invalid node file. Expected $EXPECTED_N_NODES nodes, but got $ACTUAL_N_NODES."
+    echo "Nodefile contents follow:"
+    cat "$PSIJ_NODEFILE"
+    exit 2
+fi
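
Not part of the patch: a minimal sketch of how a client could exercise the new PSIJ_NODEFILE contract through the public API, closely mirroring tests/test_nodefile.py above. The '/bin/bash -c' payload, the output path, and the choice of the 'local' executor are illustrative assumptions.

# Illustrative sketch (not part of the patch): run a 4-process job on the local executor;
# the payload counts the lines in the file that PSIJ_NODEFILE points to.
from pathlib import Path

from psij import Job, JobExecutor, JobSpec, JobState, ResourceSpecV1

spec = JobSpec('/bin/bash', ['-c', 'wc -l < "$PSIJ_NODEFILE"'],
               stdout_path=Path('/tmp/nodefile-demo.out'))  # hypothetical output path
spec.resources = ResourceSpecV1(process_count=4)  # more than one process, so a nodefile is generated

job = Job(spec)
executor = JobExecutor.get_instance('local')
executor.submit(job)
status = job.wait()
assert status is not None and status.state == JobState.COMPLETED
# /tmp/nodefile-demo.out should now contain "4": one 'localhost' line per requested process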
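
Also for illustration only — a sketch, not psij code, of the environment that the reworked _get_env hands to Popen; the variable names and values below are made up.

# Sketch of the resulting Popen environment under the two branches of _get_env.
import os

nodefile = '/tmp/psij-1234.nodelist'  # hypothetical mkstemp() result
spec_env = {'OMP_NUM_THREADS': '2'}   # hypothetical JobSpec.environment

# inherit_environment=True: parent environment overlaid with the spec environment,
# with PSIJ_NODEFILE added on top.
inherited = dict(os.environ)
inherited.update(spec_env)
inherited['PSIJ_NODEFILE'] = nodefile

# inherit_environment=False: start from the nodefile entry, then merge the spec
# environment, so spec-provided variables take precedence.
isolated = {'PSIJ_NODEFILE': nodefile}
isolated.update(spec_env)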
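
Finally, a small illustration of why is_launcher_failure now checks index -2: the captured launcher output ends with a newline, so splitting on '\n' leaves a trailing empty string. The sample output below is made up.

# The sentinel sits at index -2 because the trailing newline yields an empty last element.
output = 'application output\n_PSI_J_LAUNCHER_DONE\n'  # hypothetical captured output
parts = output.split('\n')
assert parts == ['application output', '_PSI_J_LAUNCHER_DONE', '']
assert parts[-2] == '_PSI_J_LAUNCHER_DONE'             # not a launcher failure
assert '\n'.join(parts[:-2]) == 'application output'   # what the failure message would keep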