Skip to content

Commit

Permalink
Merge pull request Snakemake-Profiles#56 from mbhall88/master
Browse files Browse the repository at this point in the history
Change cluster cancel to a script
  • Loading branch information
mbhall88 authored Jul 12, 2022
2 parents 9abd00e + 3198c66 commit d21dedd
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 4 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ This document tracks changes to the `master` branch of the profile.

## [Unreleased]

### Added
- Exposed `max_status_checks` and `wait_between_tries` for status checker [[#48][48]]

### Changed
- Cluster cancel is now a script instead of the `bkill` command in order to handle the log file paths that come with the job ID [[#55][55]]

## [0.2.0] - 28/05/2022

### Added
Expand Down Expand Up @@ -91,6 +97,8 @@ This document tracks changes to the `master` branch of the profile.
[36]: https://github.com/Snakemake-Profiles/lsf/issues/36
[39]: https://github.com/Snakemake-Profiles/lsf/issues/39
[45]: https://github.com/Snakemake-Profiles/lsf/issues/45
[48]: https://github.com/Snakemake-Profiles/lsf/issues/48
[55]: https://github.com/Snakemake-Profiles/lsf/issues/55
[0.1.0]: https://github.com/Snakemake-Profiles/lsf/releases/tag/0.1.0
[0.1.1]: https://github.com/Snakemake-Profiles/lsf/releases/tag/0.1.1
[0.1.2]: https://github.com/Snakemake-Profiles/lsf/releases/tag/0.1.2
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,18 @@ From the `snakemake --help` menu
default is 10, fractions allowed.
```

#### `max_status_checks`

**Default**: `1`

How many times to check the status of a job.

#### `wait_between_tries`

**Default**: `0.001`

How many seconds to wait until checking the status of a job again (if `max_status_checks` is greater than 1).

#### `profile_name`

**Default**: `lsf`
Expand Down
2 changes: 2 additions & 0 deletions cookiecutter.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,7 @@
"default_project": "",
"max_status_checks_per_second": 10,
"max_jobs_per_second": 10,
"max_status_checks": 1,
"wait_between_tries": 0.001,
"profile_name": "lsf"
}
136 changes: 136 additions & 0 deletions tests/test_lsf_cancel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import unittest
from unittest.mock import patch

from tests.src.OSLayer import OSLayer
from tests.src.lsf_cancel import kill_jobs, parse_input, KILL


class TestParseInput(unittest.TestCase):
    """Checks that ``parse_input`` keeps only the all-digit arguments as job IDs."""

    script = "lsf_cancel.py"

    def _parse_with(self, *cli_args):
        """Call ``parse_input`` with ``sys.argv`` patched to the script plus *cli_args*."""
        with patch("sys.argv", [self.script, *cli_args]):
            return parse_input()

    def test_parse_input_no_args(self):
        parsed = self._parse_with()

        assert not parsed

    def test_parse_input_one_job_no_log(self):
        parsed = self._parse_with("1234")

        assert parsed == ["1234"]

    def test_parse_input_one_job_and_log(self):
        parsed = self._parse_with("1234", "log/file.out")

        assert parsed == ["1234"]

    def test_parse_input_two_jobs_and_log(self):
        parsed = self._parse_with("1234", "log/file.out", "9090", "log/other.out")

        assert parsed == ["1234", "9090"]

    def test_parse_input_two_jobs_and_digits_in_log(self):
        # a log path that merely contains digits must not be taken for a job ID
        parsed = self._parse_with("1234", "log/file.out", "9090", "log/123")

        assert parsed == ["1234", "9090"]

    def test_parse_input_multiple_args_but_no_jobs(self):
        parsed = self._parse_with("log/file.out", "log/123")

        assert not parsed


class TestKillJobs(unittest.TestCase):
    """Checks the command line that ``kill_jobs`` passes to ``OSLayer.run_process``."""

    @staticmethod
    def _patched_run_process(stdout=""):
        """Patch ``OSLayer.run_process`` so it returns ``(stdout, "")`` and runs nothing."""
        return patch.object(
            OSLayer,
            OSLayer.run_process.__name__,
            return_value=(stdout, ""),
        )

    @staticmethod
    def _expected_cmd(jobids):
        """Build the kill command expected for *jobids* (``KILL`` then the IDs)."""
        return " ".join([KILL, *jobids])

    def test_kill_jobs_one_job(self):
        jobids = ["123"]

        with self._patched_run_process(
            "Job <123> is being terminated"
        ) as run_process_mock:
            kill_jobs(jobids)

        run_process_mock.assert_called_once_with(
            self._expected_cmd(jobids), check=False
        )

    def test_kill_jobs_two_jobs(self):
        jobids = ["123", "456"]

        with self._patched_run_process(
            "Job <123> is being terminated\nJob <456> is being terminated"
        ) as run_process_mock:
            kill_jobs(jobids)

        run_process_mock.assert_called_once_with(
            self._expected_cmd(jobids), check=False
        )

    def test_kill_jobs_no_jobs(self):
        with self._patched_run_process() as run_process_mock:
            kill_jobs([])

        run_process_mock.assert_not_called()

    def test_kill_jobs_empty_jobs(self):
        with self._patched_run_process() as run_process_mock:
            kill_jobs(["", ""])

        run_process_mock.assert_not_called()

    def test_kill_jobs_empty_job_and_non_empty_job(self):
        # the empty ID is still joined into the command, hence two spaces after KILL
        jobids = ["", "123"]

        with self._patched_run_process() as run_process_mock:
            kill_jobs(jobids)

        run_process_mock.assert_called_once_with(
            self._expected_cmd(jobids), check=False
        )
8 changes: 8 additions & 0 deletions {{cookiecutter.profile_name}}/CookieCutter.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,11 @@ def get_zombi_behaviour() -> str:
@staticmethod
def get_latency_wait() -> float:
    """Return the ``latency_wait`` cookiecutter setting as a float."""
    # the {{...}} placeholder is filled in by cookiecutter when the profile is generated
    configured_value = "{{cookiecutter.latency_wait}}"
    return float(configured_value)

@staticmethod
def get_wait_between_tries() -> float:
    """Return the ``wait_between_tries`` cookiecutter setting as a float."""
    # the {{...}} placeholder is filled in by cookiecutter when the profile is generated
    configured_value = "{{cookiecutter.wait_between_tries}}"
    return float(configured_value)

@staticmethod
def get_max_status_checks() -> int:
    """Return the ``max_status_checks`` cookiecutter setting as an int."""
    # the {{...}} placeholder is filled in by cookiecutter when the profile is generated
    configured_value = "{{cookiecutter.max_status_checks}}"
    return int(configured_value)
8 changes: 6 additions & 2 deletions {{cookiecutter.profile_name}}/OSLayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ def remove_file(file: Path):
file.unlink()

@staticmethod
def run_process(cmd: str) -> Tuple[stdout, stderr]:
def run_process(cmd: str, check: bool = True) -> Tuple[stdout, stderr]:
completed_process = subprocess.run(
cmd, check=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
cmd, check=check, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
return (
completed_process.stdout.decode().strip(),
Expand All @@ -49,6 +49,10 @@ def run_process(cmd: str) -> Tuple[stdout, stderr]:
def print(string: str):
print(string)

@staticmethod
def eprint(string: str):
    """Print ``string`` to standard error instead of standard output."""
    error_stream = sys.stderr
    print(string, file=error_stream)

@staticmethod
def get_uuid4_string() -> str:
    """Return a newly generated version-4 UUID in its string form."""
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)
Expand Down
2 changes: 1 addition & 1 deletion {{cookiecutter.profile_name}}/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ restart-times: "{{cookiecutter.restart_times}}"
jobs: "{{cookiecutter.jobs}}"
cluster: "lsf_submit.py"
cluster-status: "lsf_status.py"
cluster-cancel: "bkill"
cluster-cancel: "lsf_cancel.py"
max-jobs-per-second: "{{cookiecutter.max_jobs_per_second}}"
max-status-checks-per-second: "{{cookiecutter.max_status_checks_per_second}}"
44 changes: 44 additions & 0 deletions {{cookiecutter.profile_name}}/lsf_cancel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env python3
import re
import shlex
import sys
from pathlib import Path
from typing import List

if not __name__.startswith("tests.src."):
sys.path.append(str(Path(__file__).parent.absolute()))
from OSLayer import OSLayer
else:
from .OSLayer import OSLayer

# LSF command used to cancel jobs; kill_jobs appends the job IDs as its arguments.
KILL = "bkill"


def kill_jobs(ids_to_kill: List[str]):
    """Cancel the given jobs with a single ``KILL`` invocation.

    Nothing is run when ``ids_to_kill`` is empty or holds only empty strings.
    """
    # a bare bkill (no job ID) would kill the last job, so never run it without IDs
    if not any(ids_to_kill):
        return

    kill_cmd = " ".join([KILL, *ids_to_kill])
    # check=False is forwarded to OSLayer.run_process; its output is not needed here
    OSLayer.run_process(kill_cmd, check=False)


def parse_input() -> List[str]:
    """Extract the job IDs to cancel from the command-line arguments.

    All arguments after the script name are joined and re-split, so a job ID
    and its log path arriving as one quoted argument (``"1234 log/file.out"``)
    are handled the same way as separate arguments. Only tokens made up
    entirely of digits are kept; everything else (e.g. log paths) is dropped.

    Returns:
        The job IDs in the order they were given; empty if there are none.
    """
    # need to support quoted and unquoted jobid
    # see https://github.com/Snakemake-Profiles/lsf/issues/45
    joined_args = " ".join(sys.argv[1:])
    try:
        split_args = shlex.split(joined_args)
    except ValueError:
        # shlex rejects unbalanced quotes (e.g. an apostrophe in a log path).
        # Fall back to a plain whitespace split so the job IDs are still found
        # rather than aborting without cancelling anything.
        split_args = [token.strip("'\"") for token in joined_args.split()]

    return [arg for arg in map(str.strip, split_args) if re.fullmatch(r"\d+", arg)]


if __name__ == "__main__":
    # script entry point: cancel every job ID found on the command line
    jobids = parse_input()

    if not jobids:
        # nothing usable was passed in; report it on stderr and run no kill command
        OSLayer.eprint(
            "[cluster-cancel error] Did not get any valid jobids to cancel..."
        )
    else:
        kill_jobs(jobids)
7 changes: 6 additions & 1 deletion {{cookiecutter.profile_name}}/lsf_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ def get_status(self) -> str:
)

lsf_status_checker = StatusChecker(
jobid, outlog, kill_unknown=kill_unknown, kill_zombie=kill_zombie
jobid,
outlog,
kill_unknown=kill_unknown,
kill_zombie=kill_zombie,
wait_between_tries=CookieCutter.get_wait_between_tries(),
max_status_checks=CookieCutter.get_max_status_checks(),
)
print(lsf_status_checker.get_status())

0 comments on commit d21dedd

Please sign in to comment.