forked from Snakemake-Profiles/lsf
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
185 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
import unittest | ||
from unittest.mock import patch | ||
|
||
from tests.src.OSLayer import OSLayer | ||
from tests.src.lsf_cancel import kill_jobs, parse_input, KILL | ||
|
||
|
||
class TestParseInput(unittest.TestCase): | ||
script = "lsf_cancel.py" | ||
|
||
def test_parse_input_no_args(self): | ||
fake_args = [self.script] | ||
with patch("sys.argv", fake_args): | ||
actual = parse_input() | ||
|
||
assert not actual | ||
|
||
def test_parse_input_one_job_no_log(self): | ||
fake_args = [self.script, "1234"] | ||
with patch("sys.argv", fake_args): | ||
actual = parse_input() | ||
|
||
expected = fake_args[1:] | ||
assert actual == expected | ||
|
||
def test_parse_input_one_job_and_log(self): | ||
fake_args = [self.script, "1234", "log/file.out"] | ||
with patch("sys.argv", fake_args): | ||
actual = parse_input() | ||
|
||
expected = [fake_args[1]] | ||
assert actual == expected | ||
|
||
def test_parse_input_two_jobs_and_log(self): | ||
fake_args = [self.script, "1234", "log/file.out", "9090", "log/other.out"] | ||
with patch("sys.argv", fake_args): | ||
actual = parse_input() | ||
|
||
expected = [fake_args[1], fake_args[3]] | ||
assert actual == expected | ||
|
||
def test_parse_input_two_jobs_and_digits_in_log(self): | ||
fake_args = [self.script, "1234", "log/file.out", "9090", "log/123"] | ||
with patch("sys.argv", fake_args): | ||
actual = parse_input() | ||
|
||
expected = [fake_args[1], fake_args[3]] | ||
assert actual == expected | ||
|
||
def test_parse_input_multiple_args_but_no_jobs(self): | ||
fake_args = [self.script, "log/file.out", "log/123"] | ||
with patch("sys.argv", fake_args): | ||
actual = parse_input() | ||
|
||
assert not actual | ||
|
||
|
||
class TestKillJobs(unittest.TestCase): | ||
@patch.object( | ||
OSLayer, | ||
OSLayer.run_process.__name__, | ||
return_value=( | ||
"Job <123> is being terminated", | ||
"", | ||
), | ||
) | ||
def test_kill_jobs_one_job( | ||
self, | ||
run_process_mock, | ||
): | ||
jobids = ["123"] | ||
expected_kill_cmd = "{} {}".format(KILL, " ".join(jobids)) | ||
|
||
kill_jobs(jobids) | ||
|
||
run_process_mock.assert_called_once_with(expected_kill_cmd) | ||
|
||
@patch.object( | ||
OSLayer, | ||
OSLayer.run_process.__name__, | ||
return_value=( | ||
"Job <123> is being terminated\nJob <456> is being terminated", | ||
"", | ||
), | ||
) | ||
def test_kill_jobs_two_jobs( | ||
self, | ||
run_process_mock, | ||
): | ||
jobids = ["123", "456"] | ||
expected_kill_cmd = "{} {}".format(KILL, " ".join(jobids)) | ||
|
||
kill_jobs(jobids) | ||
|
||
run_process_mock.assert_called_once_with(expected_kill_cmd) | ||
|
||
@patch.object( | ||
OSLayer, | ||
OSLayer.run_process.__name__, | ||
return_value=("", ""), | ||
) | ||
def test_kill_jobs_no_jobs( | ||
self, | ||
run_process_mock, | ||
): | ||
jobids = [] | ||
|
||
kill_jobs(jobids) | ||
|
||
run_process_mock.assert_not_called() | ||
|
||
@patch.object( | ||
OSLayer, | ||
OSLayer.run_process.__name__, | ||
return_value=("", ""), | ||
) | ||
def test_kill_jobs_empty_jobs(self, run_process_mock): | ||
jobids = ["", ""] | ||
|
||
kill_jobs(jobids) | ||
|
||
run_process_mock.assert_not_called() | ||
|
||
@patch.object( | ||
OSLayer, | ||
OSLayer.run_process.__name__, | ||
return_value=("", ""), | ||
) | ||
def test_kill_jobs_empty_job_and_non_empty_job(self, run_process_mock): | ||
jobids = ["", "123"] | ||
|
||
expected_kill_cmd = "{} {}".format(KILL, " ".join(jobids)) | ||
|
||
kill_jobs(jobids) | ||
|
||
run_process_mock.assert_called_once_with(expected_kill_cmd) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
#!/usr/bin/env python3 | ||
import re | ||
import shlex | ||
import sys | ||
from pathlib import Path | ||
from typing import List | ||
|
||
if not __name__.startswith("tests.src."): | ||
sys.path.append(str(Path(__file__).parent.absolute())) | ||
from OSLayer import OSLayer | ||
else: | ||
from .OSLayer import OSLayer | ||
|
||
KILL = "bkill" | ||
|
||
|
||
def kill_jobs(ids_to_kill: List[str]): | ||
# we don't want to run bkill with no argument as this will kill the last job | ||
if any(ids_to_kill): | ||
cmd = "{} {}".format(KILL, " ".join(ids_to_kill)) | ||
_ = OSLayer.run_process(cmd) | ||
|
||
|
||
def parse_input() -> List[str]: | ||
# need to support quoted and unquoted jobid | ||
# see https://github.com/Snakemake-Profiles/lsf/issues/45 | ||
split_args = shlex.split(" ".join(sys.argv[1:])) | ||
valid_ids = [] | ||
for arg in map(str.strip, split_args): | ||
if re.fullmatch(r"\d+", arg): | ||
valid_ids.append(arg) | ||
|
||
return valid_ids | ||
|
||
|
||
if __name__ == "__main__": | ||
jobids = parse_input() | ||
|
||
if jobids: | ||
kill_jobs(jobids) | ||
else: | ||
OSLayer.eprint( | ||
"[cluster-cancel error] Did not get any valid jobids to cancel..." | ||
) |