feat: Switching cubi-tk sodar ingest-fastq from icommands to irods_common (#217)
Nicolai-vKuegelgen authored Jan 25, 2024
1 parent f2303fc commit cd9a3b9
Showing 2 changed files with 126 additions and 62 deletions.
158 changes: 112 additions & 46 deletions cubi_tk/sodar/ingest_fastq.py
@@ -9,21 +9,26 @@
import os
import pathlib
import re
-from subprocess import SubprocessError, check_output
+from subprocess import SubprocessError, check_call, check_output
import sys
import typing

import logzero
from logzero import logger
from sodar_cli import api
import tqdm

-from ..common import check_irods_icommands, load_toml_config, sizeof_fmt
+from ..common import load_toml_config, sizeof_fmt
from ..exceptions import MissingFileException, ParameterException, UserCanceledException
-from ..snappy.itransfer_common import (
-    SnappyItransferCommandBase,
-    TransferJob,
-    irsync_transfer,
-)
+from ..irods_common import TransferJob, iRODSTransfer
+from ..snappy.itransfer_common import SnappyItransferCommandBase

+# for testing
+logger.propagate = True
+
+# no-frills logger
+formatter = logzero.LogFormatter(fmt="%(message)s")
+output_logger = logzero.setup_logger(formatter=formatter)

DEFAULT_SRC_REGEX = (
r"(.*/)?(?P<sample>.+?)"
@@ -41,6 +46,34 @@
DEFAULT_NUM_TRANSFERS = 8


+def compute_md5sum(job: TransferJob, counter: Value, t: tqdm.tqdm) -> None:
+    """Compute MD5 sum with ``md5sum`` command."""
+    dirname = os.path.dirname(job.path_local)
+    filename = os.path.basename(job.path_local)[: -len(".md5")]
+    path_md5 = job.path_local
+
+    md5sum_argv = ["md5sum", filename]
+    logger.debug("Computing MD5sum %s > %s", " ".join(md5sum_argv), filename + ".md5")
+    try:
+        with open(path_md5, "wt") as md5f:
+            check_call(md5sum_argv, cwd=dirname, stdout=md5f)
+    except SubprocessError as e:  # pragma: nocover
+        logger.error("Problem executing md5sum: %s", e)
+        logger.info("Removing file after error: %s", path_md5)
+        try:
+            os.remove(path_md5)
+        except OSError as e_rm:  # pragma: nocover
+            logger.error("Could not remove file: %s", e_rm)
+        raise e
+
+    with counter.get_lock():
+        counter.value = os.path.getsize(job.path_local[: -len(".md5")])
+        try:
+            t.update(counter.value)
+        except TypeError:
+            pass  # swallow, pyfakefs and multiprocessing don't like each other
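
The function above shells out to the ``md5sum`` binary, so it only works where that tool is on the PATH. For orientation, a minimal pure-Python sketch of the same checksum step using only the standard library (a hypothetical alternative, not part of this commit; ``compute_md5sum_pure`` and its parameters are illustrative names):

import hashlib
import os


def compute_md5sum_pure(path_data: str, path_md5: str, chunk_size: int = 1 << 20) -> None:
    """Write an ``md5sum``-compatible checksum file for ``path_data``."""
    digest = hashlib.md5()
    with open(path_data, "rb") as f:
        # Read in chunks so large FASTQ files do not have to fit into memory.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    # md5sum output format: "<hex digest>  <basename>\n" (two spaces).
    with open(path_md5, "wt") as md5f:
        md5f.write(f"{digest.hexdigest()}  {os.path.basename(path_data)}\n")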


class SodarIngestFastq(SnappyItransferCommandBase):
"""Implementation of sodar ingest-fastq command."""

@@ -75,6 +108,14 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
default=DEFAULT_NUM_TRANSFERS,
help="Number of parallel transfers, defaults to %s" % DEFAULT_NUM_TRANSFERS,
)
+        parser.add_argument(
+            "-s",
+            "--sync",
+            default=False,
+            action="store_true",
+            help="Skip upload of files already present in remote collection.",
+        )
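
The new --sync flag is a plain store_true switch, off by default; a re-run with --sync skips data objects that already exist in the target collection. A tiny self-contained check of the parser behavior (illustrative only, outside the actual command):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-s", "--sync", default=False, action="store_true")
assert parser.parse_args([]).sync is False
assert parser.parse_args(["--sync"]).sync is True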

parser.add_argument(
"--yes",
default=False,
@@ -154,9 +195,10 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:

def check_args(self, args):
"""Called for checking arguments, override to change behaviour."""
-        # Check presence of icommands when not testing.
-        if "pytest" not in sys.modules:  # pragma: nocover
-            check_irods_icommands(warn_only=False)
+        # DEPRECATING
+        # # Check presence of icommands when not testing.
+        # if "pytest" not in sys.modules:  # pragma: nocover
+        #     check_irods_icommands(warn_only=False)
res = 0

toml_config = load_toml_config(args)
@@ -326,9 +368,7 @@ def download_webdav(self, sources):
folders = []
for src in sources:
if re.match("davs://", src):
-                download_jobs.append(
-                    TransferJob(path_src="i:" + src, path_dest=self.args.tmp, bytes=1)
-                )
+                download_jobs.append(TransferJob(path_remote=src, path_local=self.args.tmp))
tmp_folder = f"tmp_folder_{len(download_jobs)}"
pathlib.Path(tmp_folder).mkdir(parents=True, exist_ok=True)
else:
@@ -337,7 +377,7 @@ def download_webdav(self, sources):
if download_jobs:
logger.info("Planning to download folders...")
for job in download_jobs:
-                logger.info(" %s => %s", job.path_src, job.path_dest)
+                logger.info(" %s => %s", job.path_remote, job.path_local)
if not self.args.yes and not input("Is this OK? [yN] ").lower().startswith("y"):
logger.error("OK, breaking at your request")
return []
@@ -435,19 +475,14 @@ def build_jobs(self, library_names=None):
# remote_file = re.sub(m_pat, r_pat, remote_file)

            for ext in ("", ".md5"):
-                try:
-                    size = os.path.getsize(real_path + ext)
-                except OSError:  # pragma: nocover
-                    size = 0
                transfer_jobs.append(
                    TransferJob(
-                        path_src=real_path + ext,
-                        path_dest=str(remote_file) + ext,
-                        bytes=size,
+                        path_local=real_path + ext,
+                        path_remote=str(remote_file) + ext,
                    )
                )

-        return lz_irods_path, tuple(sorted(transfer_jobs))
+        return lz_irods_path, tuple(sorted(transfer_jobs, key=lambda x: x.path_local))
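
After this change a job is just a (local path, remote path) pair; file sizes are no longer stored on the job but derived from the filesystem by the irods_common machinery. A plausible minimal shape of the new TransferJob, for orientation only (the real definition lives in cubi_tk/irods_common.py and may differ):

import os
from dataclasses import dataclass


@dataclass(frozen=True)
class TransferJob:
    """One file to upload: a local source path and its iRODS destination."""

    path_local: str
    path_remote: str

    @property
    def size(self) -> int:
        """Size of the local file in bytes (0 if it does not exist yet)."""
        try:
            return os.path.getsize(self.path_local)
        except OSError:
            return 0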

def execute(self) -> typing.Optional[int]:
"""Execute the transfer."""
@@ -459,36 +494,25 @@ def execute(self) -> typing.Optional[int]:
logger.info(" args: %s", self.args)

lz_uuid, transfer_jobs = self.build_jobs()
-        logger.debug("Transfer jobs:\n%s", "\n".join(map(lambda x: x.to_oneline(), transfer_jobs)))
+        transfer_jobs = sorted(transfer_jobs, key=lambda x: x.path_local)

if self.fix_md5_files:
transfer_jobs = self._execute_md5_files_fix(transfer_jobs)

-        logger.info("Planning to transfer the files as follows...")
+        # Final go from user & transfer
+        itransfer = iRODSTransfer(transfer_jobs, ask=not self.args.yes)
+        logger.info("Planning to transfer the following files:")
        for job in transfer_jobs:
-            logger.info(" %s => %s", job.path_src, job.path_dest)
-        if not self.args.yes and not input("Is this OK? [yN] ").lower().startswith("y"):
-            logger.error("OK, breaking at your request")
-            return 1
+            output_logger.info(job.path_local)
+        logger.info(f"With a total size of {sizeof_fmt(itransfer.size)}")

-        total_bytes = sum([job.bytes for job in transfer_jobs])
-        logger.info(
-            "Transferring %d files with a total size of %s",
-            len(transfer_jobs),
-            sizeof_fmt(total_bytes),
-        )
+        if not self.args.yes:
+            if not input("Is this OK? [y/N] ").lower().startswith("y"):  # pragma: no cover
+                logger.info("Aborting at your request.")
+                sys.exit(0)

-        counter = Value(c_ulonglong, 0)
-        with tqdm.tqdm(total=total_bytes, unit="B", unit_scale=True) as t:
-            if self.args.num_parallel_transfers == 0:  # pragma: nocover
-                for job in transfer_jobs:
-                    irsync_transfer(job, counter, t)
-            else:
-                pool = ThreadPool(processes=self.args.num_parallel_transfers)
-                for job in transfer_jobs:
-                    pool.apply_async(irsync_transfer, args=(job, counter, t))
-                pool.close()
-                pool.join()
+        itransfer.put(recursive=True, sync=self.args.sync)
+        logger.info("File transfer complete.")

# Validate and move transferred files
# Behaviour: If flag is True and lz uuid is not None*,
@@ -502,11 +526,53 @@
logger.info("All done")
return None
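
The single itransfer.put(recursive=True, sync=self.args.sync) call above replaces the whole irsync/ThreadPool machinery. A rough sketch of what such a put plausibly does under the hood with python-irodsclient instead of shelling out to icommands (illustrative only; put_jobs is a hypothetical name, see cubi_tk/irods_common.py for the real implementation):

import os

from irods.session import iRODSSession


def put_jobs(session: iRODSSession, jobs, sync: bool = False) -> None:
    """Upload each job's local file to its iRODS destination."""
    for job in jobs:
        if sync and session.data_objects.exists(job.path_remote):
            continue  # --sync semantics: skip files already present remotely
        collection = os.path.dirname(job.path_remote)
        if not session.collections.exists(collection):
            session.collections.create(collection)
        session.data_objects.put(job.path_local, job.path_remote)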

+    def _execute_md5_files_fix(
+        self, transfer_jobs: typing.Tuple[TransferJob, ...]
+    ) -> typing.Tuple[TransferJob, ...]:
+        """Create missing MD5 files."""
+        ok_jobs = []
+        todo_jobs = []
+        for job in transfer_jobs:
+            if not os.path.exists(job.path_local):
+                todo_jobs.append(job)
+            else:
+                ok_jobs.append(job)
+
+        total_bytes = sum([os.path.getsize(j.path_local[: -len(".md5")]) for j in todo_jobs])
+        logger.info(
+            "Computing MD5 sums for %s files of %s with up to %d processes",
+            len(todo_jobs),
+            sizeof_fmt(total_bytes),
+            self.args.num_parallel_transfers,
+        )
+        logger.info("Missing MD5 files:\n%s", "\n".join(map(lambda j: j.path_local, todo_jobs)))
+        counter = Value(c_ulonglong, 0)
+        with tqdm.tqdm(total=total_bytes, unit="B", unit_scale=True) as t:
+            if self.args.num_parallel_transfers == 0:  # pragma: nocover
+                for job in todo_jobs:
+                    compute_md5sum(job, counter, t)
+            else:
+                pool = ThreadPool(processes=self.args.num_parallel_transfers)
+                for job in todo_jobs:
+                    pool.apply_async(compute_md5sum, args=(job, counter, t))
+                pool.close()
+                pool.join()
+
+        # Finally, determine file sizes after done.
+        done_jobs = [
+            TransferJob(
+                path_local=j.path_local,
+                path_remote=j.path_remote,
+            )
+            for j in todo_jobs
+        ]
+        return tuple(sorted(done_jobs + ok_jobs, key=lambda x: x.path_local))
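
One caveat in the ThreadPool block above: pool.apply_async fires and forgets, so an exception raised inside compute_md5sum (e.g. a failing md5sum call) is discarded once the pool is joined. A hedged variant that surfaces worker errors by keeping the AsyncResult handles (illustrative only, not what this commit does; the names reuse the surrounding code):

pool = ThreadPool(processes=self.args.num_parallel_transfers)
results = [pool.apply_async(compute_md5sum, args=(job, counter, t)) for job in todo_jobs]
pool.close()
pool.join()
for result in results:
    result.get()  # re-raises any exception from the worker thread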


def download_folder(job: TransferJob, counter: Value, t: tqdm.tqdm):
"""Perform one piece of work and update the global counter."""

-    irsync_argv = ["irsync", "-r", "-a", "-K", "i:%s" % job.path_src, job.path_dest]
+    irsync_argv = ["irsync", "-r", "-a", "-K", "i:%s" % job.path_remote, job.path_local]
logger.debug("Transferring file: %s", " ".join(irsync_argv))
try:
check_output(irsync_argv)
30 changes: 14 additions & 16 deletions tests/test_sodar_ingest_fastq.py
@@ -128,7 +128,7 @@ def test_run_sodar_ingest_fastq_smoke_test(mocker, requests_mock):
# --- setup arguments
irods_path = "/irods/dest"
landing_zone_uuid = "466ab946-ce6a-4c78-9981-19b79e7bbe86"
-    dest_path = "target/folder/generic_file.fq.gz"
+    dest_path = "target/folder/"
fake_base_path = "/base/path"
argv = [
"--verbose",
@@ -179,16 +179,18 @@ def test_run_sodar_ingest_fastq_smoke_test(mocker, requests_mock):
)

mock_check_output = mock.MagicMock(return_value=0)
-    mocker.patch("cubi_tk.snappy.itransfer_common.check_output", mock_check_output)
+    mocker.patch("cubi_tk.irods_common.iRODSTransfer.put", mock_check_output)

mock_check_call = mock.MagicMock(return_value=0)
-    mocker.patch("cubi_tk.snappy.itransfer_common.check_call", mock_check_call)
+    mocker.patch("cubi_tk.sodar.ingest_fastq.check_call", mock_check_call)

mocker.patch("cubi_tk.sodar.ingest_fastq.pathlib", fake_pl)
mocker.patch("cubi_tk.sodar.ingest_fastq.os", fake_os)

fake_open = fake_filesystem.FakeFileOpen(fs)
-    mocker.patch("cubi_tk.snappy.itransfer_common.open", fake_open)
+    mocker.patch("cubi_tk.sodar.ingest_fastq.open", fake_open)

# necessary because independent test fail
mock_value = mock.MagicMock()
@@ -218,20 +220,16 @@ def test_run_sodar_ingest_fastq_smoke_test(mocker, requests_mock):

assert not res

-    # TODO: make mock check_output actually create the file?
-    # assert fs.exists(fake_file_paths[3])
-
    assert mock_check_call.call_count == 1
    assert mock_check_call.call_args[0] == (["md5sum", "sample1-N1-DNA1-WES1.fq.gz"],)

-    assert mock_check_output.call_count == len(fake_file_paths) * 3
-    remote_path = os.path.join(irods_path, dest_path)
-    for path in fake_file_paths:
-        expected_mkdir_argv = ["imkdir", "-p", os.path.dirname(remote_path)]
-        ext = ".md5" if path.split(".")[-1] == "md5" else ""
-        expected_irsync_argv = ["irsync", "-a", "-K", path, ("i:%s" + ext) % remote_path]
-        expected_ils_argv = ["ils", os.path.dirname(remote_path)]
-
-        assert ((expected_mkdir_argv,),) in mock_check_output.call_args_list
-        assert ((expected_irsync_argv,),) in mock_check_output.call_args_list
-        assert ((expected_ils_argv,), {"stderr": -2}) in mock_check_output.call_args_list
+    # The upload logic for multiple transfers/files has been moved into the iRODSCommon classes;
+    # we just need one call to it here.
+    assert mock_check_output.call_count == 1
+
+    # Test that the TransferJobs contain all files (setting the remote_dest with this mock setup does not work).
+    parser, _subparsers = setup_argparse()
+    args = parser.parse_args(argv)
+    ingestfastq = SodarIngestFastq(args)
+    lz, actual = ingestfastq.build_jobs()
+    assert len(actual) == len(fake_file_paths)
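
Since iRODSTransfer.put is patched with a plain MagicMock on the class (so no self gets bound on the call), the test could additionally pin down how execute() invoked it: recursive, and non-sync by default because the argv here has no --sync. A possible extra assertion (hypothetical, not part of this commit):

mock_check_output.assert_called_once_with(recursive=True, sync=False)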
