diff --git a/CHANGELOG.md b/CHANGELOG.md index 50f9450..fb7602d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,12 @@ This document tracks changes to the `master` branch of the profile. ## [Unreleased] +### Added +- Exposed `max_status_checks` and `wait_between_tries` for status checker [[#48][48]] + +### Changed +- Cluster cancel is now a script instead of the `bkill` command in order to handle the log file paths that come with the job ID [[#55][55]] + ## [0.2.0] - 28/05/2022 ### Added @@ -91,6 +97,8 @@ This document tracks changes to the `master` branch of the profile. [36]: https://github.com/Snakemake-Profiles/lsf/issues/36 [39]: https://github.com/Snakemake-Profiles/lsf/issues/39 [45]: https://github.com/Snakemake-Profiles/lsf/issues/45 +[48]: https://github.com/Snakemake-Profiles/lsf/issues/48 +[55]: https://github.com/Snakemake-Profiles/lsf/issues/55 [0.1.0]: https://github.com/Snakemake-Profiles/lsf/releases/tag/0.1.0 [0.1.1]: https://github.com/Snakemake-Profiles/lsf/releases/tag/0.1.1 [0.1.2]: https://github.com/Snakemake-Profiles/lsf/releases/tag/0.1.2 diff --git a/README.md b/README.md index ee7e208..40245df 100644 --- a/README.md +++ b/README.md @@ -256,6 +256,18 @@ From the `snakemake --help` menu default is 10, fractions allowed. ``` +#### `max_status_checks` + +**Default**: `1` + +How many times to check the status of a job. + +#### `wait_between_tries` + +**Default**: `0.001` + +How many seconds to wait before checking the status of a job again (if `max_status_checks` is greater than 1). 
+ #### `profile_name` **Default**: `lsf` diff --git a/cookiecutter.json b/cookiecutter.json index 8c38871..14b4b71 100644 --- a/cookiecutter.json +++ b/cookiecutter.json @@ -28,5 +28,7 @@ "default_project": "", "max_status_checks_per_second": 10, "max_jobs_per_second": 10, + "max_status_checks": 1, + "wait_between_tries": 0.001, "profile_name": "lsf" } \ No newline at end of file diff --git a/tests/test_lsf_cancel.py b/tests/test_lsf_cancel.py new file mode 100644 index 0000000..71919d2 --- /dev/null +++ b/tests/test_lsf_cancel.py @@ -0,0 +1,136 @@ +import unittest +from unittest.mock import patch + +from tests.src.OSLayer import OSLayer +from tests.src.lsf_cancel import kill_jobs, parse_input, KILL + + +class TestParseInput(unittest.TestCase): + script = "lsf_cancel.py" + + def test_parse_input_no_args(self): + fake_args = [self.script] + with patch("sys.argv", fake_args): + actual = parse_input() + + assert not actual + + def test_parse_input_one_job_no_log(self): + fake_args = [self.script, "1234"] + with patch("sys.argv", fake_args): + actual = parse_input() + + expected = fake_args[1:] + assert actual == expected + + def test_parse_input_one_job_and_log(self): + fake_args = [self.script, "1234", "log/file.out"] + with patch("sys.argv", fake_args): + actual = parse_input() + + expected = [fake_args[1]] + assert actual == expected + + def test_parse_input_two_jobs_and_log(self): + fake_args = [self.script, "1234", "log/file.out", "9090", "log/other.out"] + with patch("sys.argv", fake_args): + actual = parse_input() + + expected = [fake_args[1], fake_args[3]] + assert actual == expected + + def test_parse_input_two_jobs_and_digits_in_log(self): + fake_args = [self.script, "1234", "log/file.out", "9090", "log/123"] + with patch("sys.argv", fake_args): + actual = parse_input() + + expected = [fake_args[1], fake_args[3]] + assert actual == expected + + def test_parse_input_multiple_args_but_no_jobs(self): + fake_args = [self.script, "log/file.out", "log/123"] + with 
patch("sys.argv", fake_args): + actual = parse_input() + + assert not actual + + +class TestKillJobs(unittest.TestCase): + @patch.object( + OSLayer, + OSLayer.run_process.__name__, + return_value=( + "Job <123> is being terminated", + "", + ), + ) + def test_kill_jobs_one_job( + self, + run_process_mock, + ): + jobids = ["123"] + expected_kill_cmd = "{} {}".format(KILL, " ".join(jobids)) + + kill_jobs(jobids) + + run_process_mock.assert_called_once_with(expected_kill_cmd, check=False) + + @patch.object( + OSLayer, + OSLayer.run_process.__name__, + return_value=( + "Job <123> is being terminated\nJob <456> is being terminated", + "", + ), + ) + def test_kill_jobs_two_jobs( + self, + run_process_mock, + ): + jobids = ["123", "456"] + expected_kill_cmd = "{} {}".format(KILL, " ".join(jobids)) + + kill_jobs(jobids) + + run_process_mock.assert_called_once_with(expected_kill_cmd, check=False) + + @patch.object( + OSLayer, + OSLayer.run_process.__name__, + return_value=("", ""), + ) + def test_kill_jobs_no_jobs( + self, + run_process_mock, + ): + jobids = [] + + kill_jobs(jobids) + + run_process_mock.assert_not_called() + + @patch.object( + OSLayer, + OSLayer.run_process.__name__, + return_value=("", ""), + ) + def test_kill_jobs_empty_jobs(self, run_process_mock): + jobids = ["", ""] + + kill_jobs(jobids) + + run_process_mock.assert_not_called() + + @patch.object( + OSLayer, + OSLayer.run_process.__name__, + return_value=("", ""), + ) + def test_kill_jobs_empty_job_and_non_empty_job(self, run_process_mock): + jobids = ["", "123"] + + expected_kill_cmd = "{} {}".format(KILL, " ".join(jobids)) + + kill_jobs(jobids) + + run_process_mock.assert_called_once_with(expected_kill_cmd, check=False) diff --git a/{{cookiecutter.profile_name}}/CookieCutter.py b/{{cookiecutter.profile_name}}/CookieCutter.py index 58d2729..5acf8f7 100644 --- a/{{cookiecutter.profile_name}}/CookieCutter.py +++ b/{{cookiecutter.profile_name}}/CookieCutter.py @@ -34,3 +34,11 @@ def get_zombi_behaviour() 
-> str: @staticmethod def get_latency_wait() -> float: return float("{{cookiecutter.latency_wait}}") + + @staticmethod + def get_wait_between_tries() -> float: + return float("{{cookiecutter.wait_between_tries}}") + + @staticmethod + def get_max_status_checks() -> int: + return int("{{cookiecutter.max_status_checks}}") diff --git a/{{cookiecutter.profile_name}}/OSLayer.py b/{{cookiecutter.profile_name}}/OSLayer.py index 187af60..9e5b8c5 100644 --- a/{{cookiecutter.profile_name}}/OSLayer.py +++ b/{{cookiecutter.profile_name}}/OSLayer.py @@ -36,9 +36,9 @@ def remove_file(file: Path): file.unlink() @staticmethod - def run_process(cmd: str) -> Tuple[stdout, stderr]: + def run_process(cmd: str, check: bool = True) -> Tuple[stdout, stderr]: completed_process = subprocess.run( - cmd, check=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE + cmd, check=check, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) return ( completed_process.stdout.decode().strip(), @@ -49,6 +49,10 @@ def run_process(cmd: str) -> Tuple[stdout, stderr]: def print(string: str): print(string) + @staticmethod + def eprint(string: str): + print(string, file=sys.stderr) + @staticmethod def get_uuid4_string() -> str: return str(uuid.uuid4()) diff --git a/{{cookiecutter.profile_name}}/config.yaml b/{{cookiecutter.profile_name}}/config.yaml index 331c17c..0fb590d 100644 --- a/{{cookiecutter.profile_name}}/config.yaml +++ b/{{cookiecutter.profile_name}}/config.yaml @@ -7,6 +7,6 @@ restart-times: "{{cookiecutter.restart_times}}" jobs: "{{cookiecutter.jobs}}" cluster: "lsf_submit.py" cluster-status: "lsf_status.py" -cluster-cancel: "bkill" +cluster-cancel: "lsf_cancel.py" max-jobs-per-second: "{{cookiecutter.max_jobs_per_second}}" max-status-checks-per-second: "{{cookiecutter.max_status_checks_per_second}}" \ No newline at end of file diff --git a/{{cookiecutter.profile_name}}/lsf_cancel.py b/{{cookiecutter.profile_name}}/lsf_cancel.py new file mode 100755 index 0000000..48399fa --- 
/dev/null +++ b/{{cookiecutter.profile_name}}/lsf_cancel.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python3 +import re +import shlex +import sys +from pathlib import Path +from typing import List + +if not __name__.startswith("tests.src."): + sys.path.append(str(Path(__file__).parent.absolute())) + from OSLayer import OSLayer +else: + from .OSLayer import OSLayer + +KILL = "bkill" + + +def kill_jobs(ids_to_kill: List[str]): + # we don't want to run bkill with no argument as this will kill the last job + if any(ids_to_kill): + cmd = "{} {}".format(KILL, " ".join(ids_to_kill)) + _ = OSLayer.run_process(cmd, check=False) + + +def parse_input() -> List[str]: + # need to support quoted and unquoted jobid + # see https://github.com/Snakemake-Profiles/lsf/issues/45 + split_args = shlex.split(" ".join(sys.argv[1:])) + valid_ids = [] + for arg in map(str.strip, split_args): + if re.fullmatch(r"\d+", arg): + valid_ids.append(arg) + + return valid_ids + + +if __name__ == "__main__": + jobids = parse_input() + + if jobids: + kill_jobs(jobids) + else: + OSLayer.eprint( + "[cluster-cancel error] Did not get any valid jobids to cancel..." + ) diff --git a/{{cookiecutter.profile_name}}/lsf_status.py b/{{cookiecutter.profile_name}}/lsf_status.py index a0d04e9..b96699a 100755 --- a/{{cookiecutter.profile_name}}/lsf_status.py +++ b/{{cookiecutter.profile_name}}/lsf_status.py @@ -227,6 +227,11 @@ def get_status(self) -> str: ) lsf_status_checker = StatusChecker( - jobid, outlog, kill_unknown=kill_unknown, kill_zombie=kill_zombie + jobid, + outlog, + kill_unknown=kill_unknown, + kill_zombie=kill_zombie, + wait_between_tries=CookieCutter.get_wait_between_tries(), + max_status_checks=CookieCutter.get_max_status_checks(), ) print(lsf_status_checker.get_status())