Skip to content

Commit

Permalink
feat: Adding cubi-tk snappy itransfer_sv_calling (#213)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolai-vKuegelgen authored Jan 26, 2024
1 parent 98e21c1 commit 5a00e40
Show file tree
Hide file tree
Showing 3 changed files with 377 additions and 0 deletions.
10 changes: 10 additions & 0 deletions cubi_tk/snappy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
Transfer results and logs from ``output`` directory of ``ngs_mapping``.
``itransfer-variant-calling``
Transfer results and logs from ``output`` directory of ``variant_calling``.
``itransfer-sv-calling``
Transfer results and logs from ``output`` directory of ``sv_calling`` or ``sv_calling_targeted``.
``itransfer-step``
Transfer results and logs from ``output`` directory of any snappy pipeline step.
``pull-sheet``
Expand Down Expand Up @@ -45,6 +47,7 @@
)
from .itransfer_raw_data import setup_argparse as setup_argparse_itransfer_raw_data
from .itransfer_step import setup_argparse as setup_argparse_itransfer_step
from .itransfer_sv_calling import setup_argparse as setup_argparse_itransfer_sv_calling
from .itransfer_variant_calling import (
setup_argparse as setup_argparse_itransfer_variant_calling,
)
Expand Down Expand Up @@ -90,6 +93,13 @@ def setup_argparse(parser: argparse.ArgumentParser) -> None:
)
)

setup_argparse_itransfer_sv_calling(
subparsers.add_parser(
"itransfer-sv-calling",
help="Transfer sv_calling or sv_calling_targeted results into iRODS landing zone",
)
)

setup_argparse_itransfer_step(
subparsers.add_parser(
"itransfer-step", help="Transfer snappy step results into iRODS landing zone"
Expand Down
103 changes: 103 additions & 0 deletions cubi_tk/snappy/itransfer_sv_calling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""``cubi-tk snappy itransfer-sv-calling``: transfer sv_calling or sv_calling_targeted results into iRODS landing zone."""

import argparse
import os
import typing

from logzero import logger
import yaml

from . import common
from .itransfer_common import IndexLibrariesOnlyMixin, SnappyItransferCommandBase

#: Template string for sv_calling / sv_calling_targeted results directories.
TPL_INPUT_DIR = "%(step_name)s/output/%(mapper)s.%(caller)s.%(library_name)s"


class SnappyStepNotFoundException(Exception):
    """Raised when the snappy-pipeline config does not define the expected step(s).

    If the exception is constructed with a message, ``str()`` returns that
    message; otherwise it falls back to ``DEFAULT_MESSAGE``.
    """

    #: Text used when the exception is raised without any message.
    DEFAULT_MESSAGE = (
        "snappy-pipeline config does not define the expected steps this function needs."
    )

    def __str__(self):
        # Prefer the message supplied at the raise site; without this check the
        # specific details given by the caller would never reach the user.
        if self.args:
            return super().__str__()
        return self.DEFAULT_MESSAGE


class SnappyItransferSvCallingCommand(IndexLibrariesOnlyMixin, SnappyItransferCommandBase):
    """Implementation of snappy itransfer command for SV calling results.

    On construction, ``.snappy_pipeline/config.yaml`` below the snappy root
    directory is read to determine which one of ``step_names`` is configured
    and which tools that step lists.
    """

    # Class-level settings; presumably consumed by the base classes, which are
    # not visible in this module -- verify against ``itransfer_common``.
    fix_md5_files = True
    command_name = "itransfer-sv-calling"
    #: Step names looked up in ``step_config``; exactly one must be present.
    step_names = ("sv_calling", "sv_calling_targeted")
    start_batch_in_family = True

    def __init__(self, args):
        """Load the pipeline config and pick the configured sv-calling step.

        :param args: Parsed command line arguments; ``base_path`` is read here.
        :raises SnappyStepNotFoundException: If none, or more than one, of
            ``step_names`` is present in the config's ``step_config``.
        """
        super().__init__(args)

        path = common.find_snappy_root_dir(self.args.base_path or os.getcwd())
        with open(path / ".snappy_pipeline/config.yaml", "rt") as f:
            config = yaml.safe_load(f)

        step_config = config["step_config"]
        configured = [name for name in self.__class__.step_names if name in step_config]
        if not configured:
            raise SnappyStepNotFoundException(
                f"Could not find any sv-calling step name in 'config.yaml'. Was looking for one of: {', '.join(self.__class__.step_names)}"
            )
        if len(configured) > 1:
            raise SnappyStepNotFoundException(
                f"Found multiple sv-calling step names in config.yaml. Only one of {', '.join(self.__class__.step_names)} is allowed."
            )

        #: Name of the single sv-calling step found in the config.
        self.step_name = configured[0]
        #: Tools listed for that step; iterated when ``--caller`` is left at its default.
        self.defined_callers = step_config[self.step_name]["tools"]

    @classmethod
    def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
        """Add ``--mapper`` and ``--caller`` on top of the inherited arguments."""
        super().setup_argparse(parser)
        parser.add_argument(
            "--mapper",
            help="Name of the mapper to transfer for, defaults to bwa_mem2.",
            default="bwa_mem2",
        )
        parser.add_argument(
            "--caller",
            help="Name of the variant caller to transfer for. Defaults to all callers defined in config",
            default="all-defined",
        )

    @classmethod
    def run(
        cls, args, _parser: argparse.ArgumentParser, _subparser: argparse.ArgumentParser
    ) -> typing.Optional[int]:
        """Entry point into the command."""
        return cls(args).execute_multi()

    def execute_multi(self) -> typing.Optional[int]:
        """Execute the transfer once per caller.

        With ``--caller all-defined`` (the default) the inherited ``execute()``
        is run for every tool in ``self.defined_callers``; ``self.args.caller``
        is overwritten with the current tool before each run. Otherwise
        ``execute()`` is run once for the requested caller.

        :return: ``0`` if every run returned a falsy value, otherwise a
            non-zero result of one of the runs.
        """
        if self.args.caller != "all-defined":
            # A falsy result (including ``None``) is treated as success, the same
            # way the multi-caller loop below does; ``int(None)`` would raise.
            return int(self.execute() or 0)

        logger.info("Starting cubi-tk snappy sv-calling for multiple callers")
        ret = 0
        for caller in self.defined_callers:
            self.args.caller = caller
            # Keep going after a failure but remember a non-zero result.
            ret = self.execute() or ret
        return int(ret)

    def build_base_dir_glob_pattern(self, library_name: str) -> typing.Tuple[str, str]:
        """Return ``(base directory, glob pattern)`` for one library.

        The base directory is ``TPL_INPUT_DIR`` filled with the detected step
        name, the selected mapper and caller and ``library_name``, joined onto
        ``self.args.base_path``; the pattern is always ``"**"``.
        """
        return (
            os.path.join(
                self.args.base_path,
                TPL_INPUT_DIR
                % {
                    "step_name": self.step_name,
                    "mapper": self.args.mapper,
                    "caller": self.args.caller,
                    "library_name": library_name,
                },
            ),
            "**",
        )


def setup_argparse(parser: argparse.ArgumentParser) -> None:
    """Setup argument parser for ``cubi-tk snappy itransfer-sv-calling``."""
    command_cls = SnappyItransferSvCallingCommand
    return command_cls.setup_argparse(parser)
264 changes: 264 additions & 0 deletions tests/test_snappy_itransfer_sv_calling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
"""Tests for ``cubi_tk.snappy.itransfer_sv_calling``.
We only run some smoke tests here.
"""

import os
import textwrap
from unittest import mock
from unittest.mock import ANY

from pyfakefs import fake_filesystem
import pytest

from cubi_tk.__main__ import main, setup_argparse
from cubi_tk.snappy.itransfer_sv_calling import (
SnappyItransferSvCallingCommand,
SnappyStepNotFoundException,
)

from .conftest import my_exists, my_get_sodar_info


def fake_config(n_tools=1):
    """Return snappy-pipeline configuration text for the tests.

    :param n_tools: Number of sv-calling steps placed under ``step_config``:
        ``0`` yields an empty ``step_config``, ``1`` only
        ``sv_calling_targeted``, ``2`` both ``sv_calling_targeted`` and
        ``sv_calling``.
    :return: The configuration as YAML text.
    :raises ValueError: If ``n_tools`` is not 0, 1 or 2.
    """
    step_names = ("sv_calling_targeted", "sv_calling")
    if n_tools not in range(len(step_names) + 1):
        raise ValueError(
            "n_tools must be between 0 and %d, got %r" % (len(step_names), n_tools)
        )

    head = "static_data_config: {}\nstep_config:"

    # Each step section is indented so that it nests below ``step_config``.
    # It is spelled out explicitly instead of trimming a placeholder line with
    # ``str.rstrip``, which removes a *set of characters* and would also eat
    # the tail of a tool name made up of those characters.
    step_template = (
        "\n"
        "  {step}:\n"
        "    tools:\n"
        "      - gcnv\n"
        "      - manta"
    )

    tail = textwrap.dedent(
        r"""
        data_sets:
          first_batch:
            sodar_uuid: 466ab946-ce6a-4c78-9981-19b79e7bbe86
            file: sheet.tsv
            search_patterns:
            - {'left': '*/*/*_R1.fastq.gz', 'right': '*/*/*_R2.fastq.gz'}
            search_paths: ['/path']
            type: germline_variants
            naming_scheme: only_secondary_id
        """
    )

    if n_tools == 0:
        # No step at all: make ``step_config`` an explicit empty mapping.
        return head + " {}\n" + tail

    steps = "".join(step_template.format(step=name) for name in step_names[:n_tools])
    return head + "\n" + steps + tail


def test_run_snappy_itransfer_sv_calling_help(capsys):
    """``--help`` exits with code 0 and writes to stdout only."""
    parser, _subparsers = setup_argparse()
    help_argv = ["snappy", "itransfer-sv-calling", "--help"]

    with pytest.raises(SystemExit) as exc_info:
        parser.parse_args(help_argv)
    assert exc_info.value.code == 0

    captured = capsys.readouterr()
    assert captured.out
    assert not captured.err


def test_run_snappy_itransfer_sv_calling_nothing(capsys):
    """Calling the sub command without arguments exits with code 2 and writes to stderr only."""
    parser, _subparsers = setup_argparse()
    bare_argv = ["snappy", "itransfer-sv-calling"]

    with pytest.raises(SystemExit) as exc_info:
        parser.parse_args(bare_argv)
    assert exc_info.value.code == 2

    captured = capsys.readouterr()
    assert not captured.out
    assert captured.err


def test_run_snappy_itransfer_sv_calling_no_sv_step(fs):
    """Constructing the command fails when the config has no sv-calling step."""
    fake_base_path = "/base/path"
    sodar_uuid = "466ab946-ce6a-4c78-9981-19b79e7bbe86"
    argv = [
        "--verbose",
        "snappy",
        "itransfer-sv-calling",
        "--base-path",
        fake_base_path,
        "--sodar-api-token",
        "XXXX",
        sodar_uuid,
    ]

    # Config whose ``step_config`` is empty.
    config_text = fake_config(0)
    print(config_text)
    config_path = os.path.join(fake_base_path, ".snappy_pipeline/config.yaml")
    fs.create_file(config_path, contents=config_text, create_missing_dirs=True)

    parser, _subparsers = setup_argparse()
    parsed_args = parser.parse_args(argv)
    with pytest.raises(SnappyStepNotFoundException):
        SnappyItransferSvCallingCommand(parsed_args)


def test_run_snappy_itransfer_sv_calling_two_sv_steps(fs):
    """Constructing the command fails when the config defines both sv-calling steps."""
    fake_base_path = "/base/path"
    sodar_uuid = "466ab946-ce6a-4c78-9981-19b79e7bbe86"
    argv = [
        "--verbose",
        "snappy",
        "itransfer-sv-calling",
        "--base-path",
        fake_base_path,
        "--sodar-api-token",
        "XXXX",
        sodar_uuid,
    ]

    # Config that lists ``sv_calling_targeted`` as well as ``sv_calling``.
    two_step_config = fake_config(2)
    print(two_step_config)
    config_path = os.path.join(fake_base_path, ".snappy_pipeline/config.yaml")
    fs.create_file(config_path, contents=two_step_config, create_missing_dirs=True)

    parser, _subparsers = setup_argparse()
    parsed_args = parser.parse_args(argv)
    with pytest.raises(SnappyStepNotFoundException):
        SnappyItransferSvCallingCommand(parsed_args)


def test_run_snappy_itransfer_sv_calling_smoke_test(mocker, germline_trio_sheet_tsv):
    """Run ``itransfer-sv-calling`` end to end against a fake file system.

    The config from ``fake_config()`` lists the tools gcnv and manta for
    ``sv_calling_targeted``; no ``--caller`` is given, so the command runs for
    both. The test checks that the one missing ``.md5`` file is re-created via
    ``md5sum`` and that ``imkdir``/``irsync``/``ils`` are invoked for every
    fake file.
    """
    fake_base_path = "/base/path"
    dest_path = "/irods/dest"
    sodar_uuid = "466ab946-ce6a-4c78-9981-19b79e7bbe86"
    argv = [
        "--verbose",
        "snappy",
        "itransfer-sv-calling",
        "--base-path",
        fake_base_path,
        "--sodar-api-token",
        "XXXX",
        sodar_uuid,
    ]

    # Setup fake file system but only patch selected modules. We cannot use the Patcher approach here as this would
    # break both biomedsheets and multiprocessing.
    fs = fake_filesystem.FakeFilesystem()

    # Resulting order: gcnv VCF, manta VCF, gcnv log, then the same three with ".md5" appended.
    fake_file_paths = []
    for member in ("index",):
        for ext in ("", ".md5"):
            fake_file_paths.append(
                "%s/sv_calling_targeted/output/bwa_mem2.gcnv.%s-N1-DNA1-WES1/out/bwa_mem2.gcnv.%s-N1-DNA1-WES1.vcf.gz%s"
                % (fake_base_path, member, member, ext)
            )
            fs.create_file(fake_file_paths[-1])
            fake_file_paths.append(
                "%s/sv_calling_targeted/output/bwa_mem2.manta.%s-N1-DNA1-WES1/out/bwa_mem2.manta.%s-N1-DNA1-WES1.vcf.gz%s"
                % (fake_base_path, member, member, ext)
            )
            fs.create_file(fake_file_paths[-1])
            fake_file_paths.append(
                "%s/sv_calling_targeted/output/bwa_mem2.gcnv.%s-N1-DNA1-WES1/log/bwa_mem2.gcnv.%s-N1-DNA1-WES1.log%s"
                % (fake_base_path, member, member, ext)
            )
            fs.create_file(fake_file_paths[-1])
    # Create sample sheet in fake file system
    sample_sheet_path = fake_base_path + "/.snappy_pipeline/sheet.tsv"
    fs.create_file(sample_sheet_path, contents=germline_trio_sheet_tsv, create_missing_dirs=True)
    # Create config in fake file system
    config_path = fake_base_path + "/.snappy_pipeline/config.yaml"
    fs.create_file(config_path, contents=fake_config(), create_missing_dirs=True)

    # Print path to all created files
    print(fake_config())
    print("\n".join(fake_file_paths + [sample_sheet_path, config_path]))

    # Remove the MD5 file of the gcnv VCF again so it is recreated.
    gcnv_vcf_md5_path = fake_file_paths[3]
    fs.remove(gcnv_vcf_md5_path)

    # Set Mocker
    mocker.patch("pathlib.Path.exists", my_exists)
    mocker.patch(
        "cubi_tk.snappy.itransfer_common.SnappyItransferCommandBase.get_sodar_info",
        my_get_sodar_info,
    )

    fake_os = fake_filesystem.FakeOsModule(fs)
    mocker.patch("glob.os", fake_os)
    mocker.patch("cubi_tk.snappy.itransfer_common.os", fake_os)
    # Patch ``os`` in the module under test (not in ``itransfer_variant_calling``).
    mocker.patch("cubi_tk.snappy.itransfer_sv_calling.os", fake_os)

    mock_check_output = mock.mock_open()
    mocker.patch("cubi_tk.snappy.itransfer_common.check_output", mock_check_output)

    fake_open = fake_filesystem.FakeFileOpen(fs)
    mocker.patch("cubi_tk.snappy.itransfer_sv_calling.open", fake_open)
    mocker.patch("cubi_tk.snappy.itransfer_common.open", fake_open)
    mocker.patch("cubi_tk.snappy.common.open", fake_open)

    mock_check_call = mock.mock_open()
    mocker.patch("cubi_tk.snappy.itransfer_common.check_call", mock_check_call)

    # Actually exercise code and perform test.
    parser, _subparsers = setup_argparse()
    # Parsed separately only to learn ``remote_dir_date`` for the expected remote paths.
    args = parser.parse_args(argv)
    res = main(argv)

    assert not res

    # We do not care about call order but simply test call count and then assert that all files are there which would
    # be equivalent of comparing sets of files.

    assert fs.exists(gcnv_vcf_md5_path)

    assert mock_check_call.call_count == 1
    mock_check_call.assert_called_once_with(
        ["md5sum", "bwa_mem2.gcnv.index-N1-DNA1-WES1.vcf.gz"],
        cwd=os.path.dirname(gcnv_vcf_md5_path),
        stdout=ANY,
    )

    # Three commands (imkdir, irsync, ils) are expected per file.
    assert mock_check_output.call_count == len(fake_file_paths) * 3
    for path in fake_file_paths:
        # First path component below ``output`` is "<mapper>.<caller>.<library>".
        mapper_index, rel_path = os.path.relpath(
            path, os.path.join(fake_base_path, "sv_calling_targeted/output")
        ).split("/", 1)
        _mapper, index = mapper_index.rsplit(".", 1)
        remote_path = os.path.join(
            dest_path, index, "sv_calling_targeted", args.remote_dir_date, rel_path
        )
        expected_mkdir_argv = ["imkdir", "-p", os.path.dirname(remote_path)]
        expected_irsync_argv = ["irsync", "-a", "-K", path, "i:%s" % remote_path]
        expected_ils_argv = ["ils", os.path.dirname(remote_path)]
        mock_check_output.assert_any_call(expected_mkdir_argv)
        mock_check_output.assert_any_call(expected_irsync_argv)
        mock_check_output.assert_any_call(expected_ils_argv, stderr=-2)

0 comments on commit 5a00e40

Please sign in to comment.