Hostlist #414

Merged · 6 commits · Sep 24, 2023
Changes from 5 commits
4 changes: 4 additions & 0 deletions src/psij/executors/batch/batch_scheduler_executor.py
@@ -152,6 +152,10 @@ class BatchSchedulerExecutor(JobExecutor):
2. store the exit code of the launch command in the *exit code file* named
`<native_id>.ec`, also inside `<script_dir>`.

Additionally, where appropriate, the submit script should set the ``PSIJ_NODEFILE`` environment
variable to point to a file listing the nodes allocated to the job, one node per line, with the
total number of lines matching the job's process count.

Once the submit script is generated, the executor renders the submit command using
:func:`~get_submit_command` and executes it. Its output is then parsed using
:func:`~job_id_from_submit_output` to retrieve the `native_id` of the job. Subsequently, the
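For context, here is a minimal sketch of a job payload that consumes the nodefile described in the docstring above; the ``PSIJ_NODEFILE`` name comes from this change, everything else is illustrative and not part of the PR:

import os

# The generated submit script exports PSIJ_NODEFILE, naming a file with one
# allocated node per line (one line per process).
nodefile = os.environ.get('PSIJ_NODEFILE')
if nodefile is None:
    # Some executors may legitimately omit the nodefile (e.g., single-process local jobs).
    nodes = ['localhost']
else:
    with open(nodefile) as f:
        nodes = [line.strip() for line in f if line.strip()]
print('%d process slots across nodes: %s' % (len(nodes), sorted(set(nodes))))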
3 changes: 3 additions & 0 deletions src/psij/executors/batch/cobalt/cobalt.mustache
@@ -37,6 +37,9 @@ only results in empty files that are not cleaned up}}
#COBALT -e /dev/null
#COBALT -o /dev/null

{{!like PBS, this is also cheap and there is no need to check the setting}}
PSIJ_NODEFILE="$COBALT_NODEFILE"
export PSIJ_NODEFILE

{{!redirect output here instead of through #COBALT directive since COBALT_JOB_ID is not available
when the directives are evaluated; the reason for using the job id in the first place being the
3 changes: 3 additions & 0 deletions src/psij/executors/batch/lsf/lsf.mustache
@@ -64,6 +64,9 @@ only results in empty files that are not cleaned up}}
#BSUB -e /dev/null
#BSUB -o /dev/null

PSIJ_NODEFILE="$LSB_HOSTS"
export PSIJ_NODEFILE

{{!redirect output here instead of through #BSUB directive since LSB_JOBID is not available
when the directives are evaluated; the reason for using the job id in the first place being the
same as for the exit code file.}}
3 changes: 3 additions & 0 deletions src/psij/executors/batch/pbspro/pbspro.mustache
@@ -40,6 +40,9 @@ only results in empty files that are not cleaned up}}
#PBS -v {{name}}={{value}}
{{/env}}

PSIJ_NODEFILE="$PBS_NODEFILE"
export PSIJ_NODEFILE


{{#job.spec.directory}}
cd "{{.}}"
4 changes: 4 additions & 0 deletions src/psij/executors/batch/slurm.py
@@ -185,3 +185,7 @@ def _format_duration(self, d: timedelta) -> str:
if d.days > 0:
days = str(d.days) + '-'
return days + "%s:%s:%s" % (d.seconds // 3600, (d.seconds // 60) % 60, d.seconds % 60)

def _clean_submit_script(self, job: Job) -> None:
super()._clean_submit_script(job)
self._delete_aux_file(job, '.nodefile')
5 changes: 5 additions & 0 deletions src/psij/executors/batch/slurm/slurm.mustache
@@ -73,6 +73,11 @@ only results in empty files that are not cleaned up}}
#SBATCH -e /dev/null
#SBATCH -o /dev/null

PSIJ_NODEFILE="{{psij.script_dir}}/$SLURM_JOB_ID.nodefile"
scontrol show hostnames >"$PSIJ_NODEFILE"
export PSIJ_NODEFILE


{{#env}}
#SBATCH --export={{name}}={{value}}
{{/env}}
62 changes: 55 additions & 7 deletions src/psij/executors/local.py
@@ -1,24 +1,35 @@
"""This module contains the local :class:`~psij.JobExecutor`."""
import logging
import os
import shlex
import signal
import subprocess
import threading
import time
from abc import ABC, abstractmethod
from tempfile import mkstemp
from types import FrameType
from typing import Optional, Dict, List, Tuple, Type, cast

import psutil

from psij import InvalidJobException, SubmitException, Launcher
from psij import InvalidJobException, SubmitException, Launcher, ResourceSpecV1
from psij import Job, JobSpec, JobExecutorConfig, JobState, JobStatus
from psij import JobExecutor
from psij.utils import SingletonThread

logger = logging.getLogger(__name__)


def _format_shell_cmd(args: List[str]) -> str:
"""Formats an argument list in a way that allows it to be pasted in a shell."""
cmd = ''
for arg in args:
cmd += shlex.quote(arg)
cmd += ' '
return cmd


def _handle_sigchld(signum: int, frame: Optional[FrameType]) -> None:
_ProcessReaper.get_instance()._handle_sigchld()

@@ -67,6 +78,7 @@ class _ChildProcessEntry(_ProcessEntry):
def __init__(self, job: Job, executor: 'LocalJobExecutor',
launcher: Optional[Launcher]) -> None:
super().__init__(job, executor, launcher)
self.nodefile: Optional[str] = None

def kill(self) -> None:
super().kill()
@@ -75,6 +87,8 @@ def poll(self) -> Tuple[Optional[int], Optional[str]]:
assert self.process is not None
exit_code = self.process.poll()
if exit_code is not None:
if self.nodefile:
os.unlink(self.nodefile)
if self.process.stdout:
return exit_code, self.process.stdout.read().decode('utf-8')
else:
@@ -103,19 +117,30 @@ def poll(self) -> Tuple[Optional[int], Optional[str]]:
return None, None


def _get_env(spec: JobSpec) -> Optional[Dict[str, str]]:
def _get_env(spec: JobSpec, nodefile: Optional[str]) -> Optional[Dict[str, str]]:
env: Optional[Dict[str, str]] = None
if spec.inherit_environment:
if not spec.environment:
if spec.environment is None and nodefile is None:
# if env is none in Popen, it inherits env from parent
return None
else:
# merge current env with spec env
env = os.environ.copy()
env.update(spec.environment)
if spec.environment:
env.update(spec.environment)
if nodefile is not None:
env['PSIJ_NODEFILE'] = nodefile
return env
else:
# only spec env
return spec.environment
if nodefile is None:
env = spec.environment
else:
env = {'PSIJ_NODEFILE': nodefile}
if spec.environment:
env.update(spec.environment)

return env


class _ProcessReaper(SingletonThread):
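As an aside, the merge semantics of the new _get_env can be summarized with a quick check; this is a sketch, assuming JobSpec accepts environment and inherit_environment keyword arguments, and is not part of the diff:

from psij import JobSpec

# Without inheritance, only the spec environment plus the nodefile variable remain.
spec = JobSpec('/bin/date', environment={'FOO': 'bar'}, inherit_environment=False)
env = _get_env(spec, '/tmp/1234.nodefile')
assert env == {'PSIJ_NODEFILE': '/tmp/1234.nodefile', 'FOO': 'bar'}

# With inheritance, no spec environment, and no nodefile, None is returned so that
# Popen inherits the parent environment unchanged.
spec = JobSpec('/bin/date', inherit_environment=True)
assert _get_env(spec, None) is None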
@@ -222,6 +247,26 @@ def __init__(self, url: Optional[str] = None,
super().__init__(url=url, config=config if config else JobExecutorConfig())
self._reaper = _ProcessReaper.get_instance()

def _generate_nodefile(self, job: Job, p: _ChildProcessEntry) -> Optional[str]:
assert job.spec is not None
if job.spec.resources is None:
return None
if job.spec.resources.version == 1:
assert isinstance(job.spec.resources, ResourceSpecV1)
n = job.spec.resources.computed_process_count
if n == 1:
# as a bit of an optimization, we don't generate a nodefile when doing "single
# node" jobs on local.
return None
(file, p.nodefile) = mkstemp(suffix='.nodelist')
for i in range(n):
os.write(file, 'localhost\n'.encode())
os.close(file)
return p.nodefile
else:
raise SubmitException('Cannot handle resource specification with version %s'
% job.spec.resources.version)

def submit(self, job: Job) -> None:
"""
Submits the specified :class:`~psij.Job` to be run locally.
@@ -244,9 +289,12 @@ def submit(self, job: Job) -> None:
with job._status_cv:
if job.status.state == JobState.CANCELED:
raise SubmitException('Job canceled')
logger.debug('Running %s, out=%s, err=%s', args, spec.stdout_path, spec.stderr_path)
if logger.isEnabledFor(logging.DEBUG):
logger.debug('Running %s', _format_shell_cmd(args))
nodefile = self._generate_nodefile(job, p)
env = _get_env(spec, nodefile)
p.process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
close_fds=True, cwd=spec.directory, env=_get_env(spec))
close_fds=True, cwd=spec.directory, env=env)
self._reaper.register(p)
job._native_id = p.process.pid
self._set_job_status(job, JobStatus(JobState.QUEUED, time=time.time(),
5 changes: 3 additions & 2 deletions src/psij/launchers/script_based_launcher.py
@@ -202,8 +202,9 @@ def get_additional_args(self, job: Job) -> List[str]:

def is_launcher_failure(self, output: str) -> bool:
"""See :func:`~psij.job_launcher.Launcher.is_launcher_failure`."""
return output.split('\n')[-1] != '_PSI_J_LAUNCHER_DONE'
# last line should be an empty line
return output.split('\n')[-2] != '_PSI_J_LAUNCHER_DONE'

def get_launcher_failure_message(self, output: str) -> str:
"""See :func:`~psij.job_launcher.Launcher.get_launcher_failure_message`."""
return '\n'.join(output.split('\n')[:-1])
return '\n'.join(output.split('\n')[:-2])
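The index change above follows from how str.split handles the trailing newline in the launcher output; a small illustration, not part of the diff:

# The launcher output ends with a newline, so split('\n') yields a trailing empty
# string and the completion marker sits at index -2 rather than -1.
output = 'job output line\n_PSI_J_LAUNCHER_DONE\n'
parts = output.split('\n')
assert parts == ['job output line', '_PSI_J_LAUNCHER_DONE', '']
assert parts[-2] == '_PSI_J_LAUNCHER_DONE'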
5 changes: 3 additions & 2 deletions tests/_test_tools.py
@@ -34,8 +34,9 @@ def assert_completed(job: Job, status: Optional[JobStatus]) -> None:
assert job.spec is not None
stdout = _read_file(job.spec.stdout_path)
stderr = _read_file(job.spec.stderr_path)
raise AssertionError('Job not completed. Status message: %s, stdout: %s, stderr: %s'
% (status.message, stdout, stderr))
raise AssertionError('Job not completed. Exit code: %s, Status message: %s, '
'stdout: %s, stderr: %s'
% (status.exit_code, status.message, stdout, stderr))


def _get_executor_instance(ep: ExecutorTestParams, job: Optional[Job] = None) -> JobExecutor:
4 changes: 4 additions & 0 deletions tests/plugins1/_batch_test/_batch_test.py
@@ -91,6 +91,10 @@ def _get_state(self, state: str) -> JobState:
assert state in _TestJobExecutor._STATE_MAP
return _TestJobExecutor._STATE_MAP[state]

def _clean_submit_script(self, job: Job):
super()._clean_submit_script(job)
self._delete_aux_file(job, '.nodefile')


class _TestLauncher(MultipleLauncher):
def __init__(self, config: Optional[JobExecutorConfig] = None):
11 changes: 10 additions & 1 deletion tests/plugins1/_batch_test/test/test.mustache
@@ -1,6 +1,6 @@
#!/bin/bash

exec &>> "{{psij.script_dir}}/$PSIJ_BATCH_TEST_JOB_ID.out"
exec &> "{{psij.script_dir}}/$PSIJ_BATCH_TEST_JOB_ID.out"

{{#job.spec.directory}}
cd "{{.}}"
@@ -22,6 +22,15 @@ export {{key}}="{{value}}"
{{/custom_attributes.test}}
{{/job.spec.attributes}}

HOSTNAME=`hostname`
PSIJ_NODEFILE="{{psij.script_dir}}/$PSIJ_BATCH_TEST_JOB_ID.nodefile"
rm -f "$PSIJ_NODEFILE"
for NODE in $(seq 1 1 "$PSIJ_TEST_BATCH_EXEC_COUNT"); do
echo "$HOSTNAME-$NODE" >> "$PSIJ_NODEFILE"
done

export PSIJ_NODEFILE

{{#job.spec.inherit_environment}}env \{{/job.spec.inherit_environment}}{{^job.spec.inherit_environment}}env --ignore-environment \{{/job.spec.inherit_environment}}{{#env}}
{{name}}="{{value}}" \
{{/env}}{{#psij.launch_command}}{{.}} {{/psij.launch_command}}
30 changes: 30 additions & 0 deletions tests/test_nodefile.py
@@ -0,0 +1,30 @@
import os
from pathlib import Path
from tempfile import TemporaryDirectory

import pytest

from _test_tools import assert_completed
from executor_test_params import ExecutorTestParams
from psij import Job, JobSpec, JobExecutor, ResourceSpecV1

NOT_TESTED = set(['rp', 'flux'])


def test_nodefile(execparams: ExecutorTestParams) -> None:
if execparams.executor in NOT_TESTED:
pytest.skip('This test does not work with %s' % execparams.executor)

my_path = os.path.dirname(os.path.realpath(__file__))

N_PROC = 4
with TemporaryDirectory(dir=Path.home() / '.psij' / 'test') as td:
outp = Path(td, 'stdout.txt')
spec = JobSpec('/bin/bash', [os.path.join(my_path, 'test_nodefile.sh'), str(N_PROC)],
stdout_path=outp)
job = Job(spec)
spec.resources = ResourceSpecV1(process_count=N_PROC)
ex = JobExecutor.get_instance(execparams.executor)
ex.submit(job)
status = job.wait()
assert_completed(job, status)
18 changes: 18 additions & 0 deletions tests/test_nodefile.sh
@@ -0,0 +1,18 @@
#!/bin/bash

set -e

EXPECTED_N_NODES="$1"
if [ "$EXPECTED_N_NODES" == "" ]; then
echo "Missing expected node count argument"
exit 3
fi

ACTUAL_N_NODES=`cat "$PSIJ_NODEFILE" | wc -l`

if [ "$EXPECTED_N_NODES" != "$ACTUAL_N_NODES" ]; then
echo "Invalid node file. Expected $EXPECTED_N_NODES nodes, but got $ACTUAL_N_NODES."
echo "Nodefile contents follows"
cat "$PSIJ_NODEFILE"
exit 2
fi