Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: presets for cubi-tk sodar ingest-fastq (#232 ) #235

Merged
104 changes: 76 additions & 28 deletions cubi_tk/sodar/ingest_fastq.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,40 @@
formatter = logzero.LogFormatter(fmt="%(message)s")
output_logger = logzero.setup_logger(formatter=formatter)

#: Default value for --src-regex. Only the 'sample' group is mandatory; the
#: 'lane', 'mate' and 'batch' groups are optional and empty when absent.
DEFAULT_SRC_REGEX = "".join(
    (
        # Optional directory prefix followed by the (lazily matched) sample name
        r"(.*/)?(?P<sample>.+?)",
        # Optional "_S<digits>" token that is matched but not captured
        r"(?:_S[0-9]+)?",
        r"(?:_(?P<lane>L[0-9]+?))?",
        r"(?:_(?P<mate>R[0-9]+?))?",
        r"(?:_(?P<batch>[0-9]+?))?",
        # File extension: ".fq.gz" or ".fastq.gz"
        r"\.f(?:ast)?q\.gz",
    )
)

#: Default value for --remote-dir-pattern
DEFAULT_DEST_PATTERN = "{collection_name}/raw_data/{date}/{filename}"
#: Regular expressions for matching local input files, keyed by the name given to --preset.
#: Named capture groups of a preset's regex can be used as fields of its destination pattern.
SRC_REGEX_PRESETS = {
    "default": "".join(
        (
            r"(.*/)?(?P<sample>.+?)",
            r"(?:_S[0-9]+)?",
            r"(?:_(?P<lane>L[0-9]+?))?",
            r"(?:_(?P<mate>R[0-9]+?))?",
            r"(?:_(?P<batch>[0-9]+?))?",
            r"\.f(?:ast)?q\.gz",
        )
    ),
    "digestiflow": "".join(
        (
            # <flowcell>/<lane>/<sample>_S*_L***_R*_***.fastq.gz
            r"(.*/)?(?P<flowcell>[A-Z0-9]{9,10}?)/",
            r"(?P<lane>L[0-9]{3}?)/",
            r"(?P<sample>.+?)_",
            r"S[0-9]+_L[0-9]{3}_R[0-9]_[0-9]{3}",
            r"\.fastq\.gz",
        )
    ),
    "ONT": "".join(
        (
            r"(.*/)?",
            r"[0-9]{8}_",  # Date
            # Sample could be <ProjectID>_<LibID>_<SampleID>, but this is not given
            # and may change between projects
            r"(?P<sample>[a-zA-Z0-9_-]+?)/",
            # RunID is <date>_<time>_<position>_<flowcellID>_<hash>
            # Flowcells can be re-used, so taking the whole thing for uniqueness is best
            r"(?P<RunID>[0-9]{8}_[0-9]+_[A-Z0-9]+_[A-Z0-9]+_[0-9a-z]+?)/",
            # Optional sub-directory; the capture includes the trailing slash
            r"(?:(?P<subfolder>[a-z0-9_]+/))?",
            r".+\.(bam|pod5|txt|json)",
        )
    ),
}

#: Patterns for constructing remote file paths, keyed by the name given to --preset.
#: Every key here has a matching entry in the source-regex presets; the extra fields
#: ('flowcell', 'RunID', 'subfolder') are named capture groups of that preset's regex.
DEST_PATTERN_PRESETS = dict(
    default="{collection_name}/raw_data/{date}/{filename}",
    digestiflow="{collection_name}/raw_data/{flowcell}/{filename}",
    # 'subfolder' carries its own trailing slash (or is empty), hence no separator here
    ONT="{collection_name}/raw_data/{RunID}/{subfolder}{filename}",
)

#: Default number of parallel transfers.
DEFAULT_NUM_TRANSFERS = 8
Expand Down Expand Up @@ -82,7 +105,11 @@ class SodarIngestFastq(SnappyItransferCommandBase):

def __init__(self, args):
    """Initialize the command and resolve the remote path pattern to use.

    An explicitly given ``--remote-dir-pattern`` takes precedence; when it is
    unset (``None`` or empty) the pattern of the selected ``--preset`` is used.

    :param args: Parsed command line arguments; ``remote_dir_pattern`` and
        ``preset`` are read from it.
    """
    super().__init__(args)
    # NOTE: the pattern fields must only be derived from the *resolved* pattern.
    # ``self.args.remote_dir_pattern`` may be None, which ``re.findall`` rejects.
    self.remote_dir_pattern = (
        self.args.remote_dir_pattern or DEST_PATTERN_PRESETS[self.args.preset]
    )
    # Names of all ``{placeholder}`` fields that occur in the destination pattern.
    self.dest_pattern_fields = set(re.findall(r"(?<={).+?(?=})", self.remote_dir_pattern))

@classmethod
def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
Expand Down Expand Up @@ -128,20 +155,29 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
action="store_true",
help="After files are transferred to SODAR, it will proceed with validation and move.",
)
parser.add_argument(
"--preset",
default="default",
choices=DEST_PATTERN_PRESETS.keys(),
help=f"Use predefined values for regular expression to find local files (--src-regex) and pattern to for "
f"constructing remote file paths.\nDefault src-regex: {SRC_REGEX_PRESETS['default']}.\n"
f"Default --remote-dir-pattern: {DEST_PATTERN_PRESETS['default']}.",
)
parser.add_argument(
"--src-regex",
default=DEFAULT_SRC_REGEX,
help=f"Regular expression to use for matching input fastq files, default: {DEFAULT_SRC_REGEX}. "
"All capture groups can be used for --remote-dir-pattern, but only 'sample' is used by default. "
"Only this regex controls which files are ingested, so other files than fastq.gz can be used too.",
default=None,
help="Manually defined regular expression to use for matching input fastq files. Takes precedence over "
"--preset. This regex controls which files are ingested, so it can be used for any file type. "
"Any named capture group in the regex can be used with --remote-dir-pattern. The 'sample' group is "
"used to set irods collection names (as-is or via --match-column).",
)
parser.add_argument(
"--remote-dir-pattern",
default=DEFAULT_DEST_PATTERN,
help=f"Pattern to use for constructing remote pattern, default: {DEFAULT_DEST_PATTERN}. "
"'collection_name' is the target iRODS collection and will be filled with the (-m regex modified) "
"'sample' unless --match-column is not used to fill it from the assay table. Any capture group of the "
"src-regex ('sample', 'lane', ...) can be used along with 'date' and 'filename'.",
default=None,
help="Manually defined pattern to use for constructing remote file paths. Takes precedence over "
"--preset. 'collection_name' is the target iRODS collection and will be filled with the (-m regex "
tedil marked this conversation as resolved.
Show resolved Hide resolved
"modified) 'sample', or if --match-column is used with teh corresponding value from the assay table. "
Nicolai-vKuegelgen marked this conversation as resolved.
Show resolved Hide resolved
"Any capture group of the src-regex ('sample', 'lane', ...) can be used along with 'date' and 'filename'.",
)
parser.add_argument(
"--match-column",
Expand Down Expand Up @@ -417,6 +453,12 @@ def build_jobs(self, library_names=None):
folders = self.download_webdav(self.args.sources)
transfer_jobs = []

if self.args.src_regex:
use_regex = self.args.src_regex
else:
use_regex = SRC_REGEX_PRESETS[self.args.preset]
Nicolai-vKuegelgen marked this conversation as resolved.
Show resolved Hide resolved
# logger.debug(f"Using regex: {use_regex}")

for folder in folders:
for path in glob.iglob(f"{folder}/**/*", recursive=True):
real_path = os.path.realpath(path)
Expand All @@ -432,11 +474,10 @@ def build_jobs(self, library_names=None):
): # pragma: nocover
raise MissingFileException("Missing file %s" % (real_path + ".md5"))

m = re.match(self.args.src_regex, path)
# logger.debug(f"Checking file: {path}")
m = re.match(use_regex, path)
if m:
logger.debug(
"Matched %s with regex %s: %s", path, self.args.src_regex, m.groupdict()
)
logger.debug("Matched %s with regex %s: %s", path, use_regex, m.groupdict())
match_wildcards = dict(
item
for item in m.groupdict(default="").items()
Expand All @@ -449,9 +490,7 @@ def build_jobs(self, library_names=None):
sample_name = re.sub(m_pat, r_pat, sample_name)

try:
remote_file = pathlib.Path(
lz_irods_path
) / self.args.remote_dir_pattern.format(
remote_file = pathlib.Path(lz_irods_path) / self.remote_dir_pattern.format(
# Removed the `+ self.args.add_suffix` here, since adding anything after the file extension is a bad idea
filename=pathlib.Path(path).name,
date=self.args.remote_dir_date,
Expand Down Expand Up @@ -495,6 +534,15 @@ def execute(self) -> typing.Optional[int]:

lz_uuid, transfer_jobs = self.build_jobs()
transfer_jobs = sorted(transfer_jobs, key=lambda x: x.path_local)
# Exit early if no files were found/matched
if not transfer_jobs:
if self.args.src_regex:
used_regex = self.args.src_regex
else:
used_regex = SRC_REGEX_PRESETS[self.args.preset]

logger.warning("No matching files were found!\nUsed regex: %s", used_regex)
return None

if self.fix_md5_files:
transfer_jobs = self._execute_md5_files_fix(transfer_jobs)
Expand Down
2 changes: 1 addition & 1 deletion requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ pyflakes
setuptools ==65.6.3

# needed for testing snappy workflow methods
snappy-pipeline @ git+https://github.com/bihealth/snappy-pipeline
snappy-pipeline @ git+https://github.com/bihealth/snappy-pipeline@v0.1.1
180 changes: 173 additions & 7 deletions tests/test_sodar_ingest_fastq.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,66 @@ def test_run_sodar_ingest_fastq_nothing(capsys):
assert res.err


def test_run_sodar_ingest_fastq_src_regex():
from cubi_tk.sodar.ingest_fastq import DEFAULT_SRC_REGEX
def test_run_sodar_ingest_fastq_preset_definitions():
    """Check that regex and pattern presets share the same names and are non-empty."""
    from cubi_tk.sodar.ingest_fastq import DEST_PATTERN_PRESETS, SRC_REGEX_PRESETS

    # Check that all presets exist for both regex & pattern
    assert sorted(SRC_REGEX_PRESETS) == sorted(DEST_PATTERN_PRESETS)

    # Check that no preset maps to an empty value
    for preset_name in SRC_REGEX_PRESETS:
        assert SRC_REGEX_PRESETS[preset_name]
        assert DEST_PATTERN_PRESETS[preset_name]


def test_run_sodar_ingest_fastq_default_preset_regex():
    """Check that the 'default' preset regex captures the expected 'sample' value."""
    from cubi_tk.sodar.ingest_fastq import SRC_REGEX_PRESETS

    ## Test default regex
    # Collection of example filenames and the expected {sample} value the regex should capture
    test_filenames = {
        "Sample1-N1-RNA1-RNA_seq1.fastq.gz": "Sample1-N1-RNA1-RNA_seq1",
        "P1234_Samplename_S14_L006_R2_001.fastq.gz": "P1234_Samplename",
        "P1234_Samplename2_R1.fastq.gz": "P1234_Samplename2",
    }

    for test_filename, expected_sample in test_filenames.items():
        # Only the preset regex is imported in this test, so match against it alone.
        res = re.match(SRC_REGEX_PRESETS["default"], test_filename)
        assert res is not None
        assert res.groupdict()["sample"] == expected_sample


def test_run_sodar_ingest_fastq_ont_preset_regex():
    """Check the 'sample', 'RunID' and 'subfolder' groups captured by the 'ONT' preset regex."""
    from cubi_tk.sodar.ingest_fastq import SRC_REGEX_PRESETS

    # Each case: (path, expected sample, expected RunID, expected subfolder or None)
    cases = [
        (
            "fake_base_path/20240101_A0000_sample1/20240101_0000_A1_AB12345_000xyz/bam_fail/"
            "AB12345_000xyz_pass_1c1234_0.bam",
            "A0000_sample1",
            "20240101_0000_A1_AB12345_000xyz",
            "bam_fail/",
        ),
        (
            "fake_base_path/20240101_A0000_sample1/20240101_0000_A1_AB12345_000xyz/"
            "final_summary_AB12345_000xyz_1c1234_0.txt",
            "A0000_sample1",
            "20240101_0000_A1_AB12345_000xyz",
            None,
        ),
    ]

    ont_regex = SRC_REGEX_PRESETS["ONT"]
    for path, want_sample, want_run_id, want_subfolder in cases:
        match = re.match(ont_regex, path)
        assert match is not None
        assert match.group("sample") == want_sample
        assert match.group("RunID") == want_run_id
        # A file directly inside the run folder leaves the optional group unmatched (None)
        assert match.group("subfolder") == want_subfolder


@patch("cubi_tk.sodar.ingest_fastq.api.samplesheet.retrieve")
@patch("cubi_tk.sodar.ingest_fastq.api.samplesheet.export")
def test_run_sodar_ingest_fastq_get_match_to_collection_mapping(mock_api_export, mock_api_retrieve):
Expand Down Expand Up @@ -128,7 +172,7 @@ def test_run_sodar_ingest_fastq_smoke_test(mocker, requests_mock):
# --- setup arguments
irods_path = "/irods/dest"
landing_zone_uuid = "466ab946-ce6a-4c78-9981-19b79e7bbe86"
dest_path = "target/folder/"
# dest_path = "target/folder/"
fake_base_path = "/base/path"
argv = [
"--verbose",
Expand All @@ -139,8 +183,8 @@ def test_run_sodar_ingest_fastq_smoke_test(mocker, requests_mock):
"--sodar-api-token",
"XXXX",
"--yes",
"--remote-dir-pattern",
dest_path,
# "--remote-dir-pattern",
# dest_path,
fake_base_path,
landing_zone_uuid,
]
Expand Down Expand Up @@ -233,3 +277,125 @@ def test_run_sodar_ingest_fastq_smoke_test(mocker, requests_mock):
ingestfastq = SodarIngestFastq(args)
lz, actual = ingestfastq.build_jobs()
assert len(actual) == len(fake_file_paths)


def test_run_sodar_ingest_fastq_smoke_test_ont_preset(mocker, requests_mock):
    """Smoke test for ``sodar ingest-fastq --preset ONT`` on a fake file system.

    Creates data and ``.md5`` files for three samples, removes one ``.md5`` file so
    it has to be recreated, runs the command with all external calls mocked and
    checks the number of ``md5sum`` calls, upload calls and resulting transfer jobs.
    """
    # --- setup arguments
    irods_path = "/irods/dest"
    landing_zone_uuid = "466ab946-ce6a-4c78-9981-19b79e7bbe86"
    fake_base_path = "/base/path"
    argv = [
        "--verbose",
        "sodar",
        "ingest-fastq",
        "--num-parallel-transfers",
        "0",
        "--sodar-api-token",
        "XXXX",
        "--yes",
        "--preset",
        "ONT",
        fake_base_path,
        landing_zone_uuid,
    ]

    parser, _subparsers = setup_argparse()
    args = parser.parse_args(argv)

    # Setup fake file system but only patch selected modules. We cannot use the Patcher approach here as this would
    # break biomedsheets.
    fs = fake_filesystem.FakeFilesystem()
    fake_os = fake_filesystem.FakeOsModule(fs)
    fake_pl = fake_pathlib.FakePathlibModule(fs)

    # --- add test files
    fake_file_paths = []
    date = "20240101"
    project_sample_id = "A0000_sample{n}"
    for sample_n in (1, 2, 3):
        sample_path = project_sample_id.format(n=sample_n)
        # date _ time _ positions _ ID _ hash
        flowcell_id_hash = "AB12345_000xyz"
        flowcellrun = f"{date}_0000_1A_{flowcell_id_hash}"
        for file_pattern in (
            f"{fake_base_path}/{date}_{sample_path}/{flowcellrun}/bam_fail/{flowcell_id_hash}_pass_{sample_n}c1234_0.bam",
            f"{fake_base_path}/{date}_{sample_path}/{flowcellrun}/bam_pass/{flowcell_id_hash}_fail_{sample_n}c1234_0.bam",
            f"{fake_base_path}/{date}_{sample_path}/{flowcellrun}/final_summary_{flowcell_id_hash}_{sample_n}c1234_0.txt",
            f"{fake_base_path}/{date}_{sample_path}/{flowcellrun}/pod5/{flowcell_id_hash}_{sample_n}c1234_0.pod5",
            f"{fake_base_path}/{date}_{sample_path}/{flowcellrun}/report_{flowcell_id_hash}.html",
            f"{fake_base_path}/{date}_{sample_path}/{flowcellrun}/report_{flowcell_id_hash}.json",
            f"{fake_base_path}/{date}_{sample_path}/{flowcellrun}/sequencing_summary_{flowcell_id_hash}_{sample_n}c1234_0.txt",
        ):
            # Every data file is created together with its ".md5" companion
            for ext in ("", ".md5"):
                fake_file_paths.append(file_pattern + ext)
                fs.create_file(fake_file_paths[-1])

    # Remove MD5 file for sample 1 fail bam, so it is recreated.
    # (index 3 is the ".md5" entry of the second file pattern of sample 1)
    fs.remove(fake_file_paths[3])

    # --- mock modules
    mocker.patch("glob.os", fake_os)
    mocker.patch("cubi_tk.snappy.itransfer_common.os", fake_os)
    mocker.patch(
        "cubi_tk.snappy.itransfer_common.SnappyItransferCommandBase.get_sodar_info",
        my_get_sodar_info,
    )

    mock_check_output = mock.MagicMock(return_value=0)
    mocker.patch("cubi_tk.irods_common.iRODSTransfer.put", mock_check_output)

    mock_check_call = mock.MagicMock(return_value=0)
    mocker.patch("cubi_tk.snappy.itransfer_common.check_call", mock_check_call)
    mocker.patch("cubi_tk.sodar.ingest_fastq.check_call", mock_check_call)

    mocker.patch("cubi_tk.sodar.ingest_fastq.pathlib", fake_pl)
    mocker.patch("cubi_tk.sodar.ingest_fastq.os", fake_os)

    fake_open = fake_filesystem.FakeFileOpen(fs)
    mocker.patch("cubi_tk.snappy.itransfer_common.open", fake_open)
    mocker.patch("cubi_tk.sodar.ingest_fastq.open", fake_open)

    # necessary because independent tests fail otherwise
    mock_value = mock.MagicMock()
    mocker.patch("cubi_tk.sodar.ingest_fastq.Value", mock_value)
    mocker.patch("cubi_tk.snappy.itransfer_common.Value", mock_value)

    # requests mock
    return_value = dict(
        assay="",
        config_data="",
        configuration="",
        date_modified="",
        description="",
        irods_path=irods_path,
        project="",
        sodar_uuid="",
        status="",
        status_info="",
        title="",
        user=dict(sodar_uuid="", username="", name="", email=""),
    )
    url = os.path.join(args.sodar_url, "landingzones", "api", "retrieve", args.destination)
    requests_mock.register_uri("GET", url, text=json.dumps(return_value))

    # --- run tests
    res = main(argv)

    assert not res

    # Exactly one md5sum call is expected: for the file whose ".md5" was removed above
    assert mock_check_call.call_count == 1
    assert mock_check_call.call_args[0] == (["md5sum", "AB12345_000xyz_fail_1c1234_0.bam"],)

    # The upload logic for multiple transfers/files has been moved into the iRODScommon classes
    # We just need one call to that here
    assert mock_check_output.call_count == 1

    # Test that the TransferJob contain all files, except html (3x2 for md5s)
    parser, _subparsers = setup_argparse()
    args = parser.parse_args(argv)
    ingestfastq = SodarIngestFastq(args)
    lz, actual = ingestfastq.build_jobs()
    assert len(actual) == len(fake_file_paths) - 6
Loading