From cd9a3b9d6ccfa49ccf44e10152b17df30009b2a4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Nicolai=20von=20K=C3=BCgelgen?=
Date: Thu, 25 Jan 2024 11:44:19 +0100
Subject: [PATCH] feat: Switching `cubi-tk sodar ingest-fastq` from icommands
 to irods_common (#217)

---
 cubi_tk/sodar/ingest_fastq.py    | 158 ++++++++++++++++++++++---------
 tests/test_sodar_ingest_fastq.py |  30 +++---
 2 files changed, 126 insertions(+), 62 deletions(-)

diff --git a/cubi_tk/sodar/ingest_fastq.py b/cubi_tk/sodar/ingest_fastq.py
index 2d1cc761..6f213f2a 100644
--- a/cubi_tk/sodar/ingest_fastq.py
+++ b/cubi_tk/sodar/ingest_fastq.py
@@ -9,21 +9,26 @@
 import os
 import pathlib
 import re
-from subprocess import SubprocessError, check_output
+from subprocess import SubprocessError, check_call, check_output
 import sys
 import typing
 
+import logzero
 from logzero import logger
 from sodar_cli import api
 import tqdm
 
-from ..common import check_irods_icommands, load_toml_config, sizeof_fmt
+from ..common import load_toml_config, sizeof_fmt
 from ..exceptions import MissingFileException, ParameterException, UserCanceledException
-from ..snappy.itransfer_common import (
-    SnappyItransferCommandBase,
-    TransferJob,
-    irsync_transfer,
-)
+from ..irods_common import TransferJob, iRODSTransfer
+from ..snappy.itransfer_common import SnappyItransferCommandBase
+
+# for testing
+logger.propagate = True
+
+# no-frills logger
+formatter = logzero.LogFormatter(fmt="%(message)s")
+output_logger = logzero.setup_logger(formatter=formatter)
 
 DEFAULT_SRC_REGEX = (
     r"(.*/)?(?P<sample>.+?)"
@@ -41,6 +46,34 @@
 DEFAULT_NUM_TRANSFERS = 8
 
 
+def compute_md5sum(job: TransferJob, counter: Value, t: tqdm.tqdm) -> None:
+    """Compute MD5 sum with ``md5sum`` command."""
+    dirname = os.path.dirname(job.path_local)
+    filename = os.path.basename(job.path_local)[: -len(".md5")]
+    path_md5 = job.path_local
+
+    md5sum_argv = ["md5sum", filename]
+    logger.debug("Computing MD5sum %s > %s", " ".join(md5sum_argv), filename + ".md5")
+    try:
+        with open(path_md5, "wt") as md5f:
+            check_call(md5sum_argv, cwd=dirname, stdout=md5f)
+    except SubprocessError as e:  # pragma: nocover
+        logger.error("Problem executing md5sum: %s", e)
+        logger.info("Removing file after error: %s", path_md5)
+        try:
+            os.remove(path_md5)
+        except OSError as e_rm:  # pragma: nocover
+            logger.error("Could not remove file: %s", e_rm)
+        raise e
+
+    with counter.get_lock():
+        counter.value = os.path.getsize(job.path_local[: -len(".md5")])
+        try:
+            t.update(counter.value)
+        except TypeError:
+            pass  # swallow, pyfakefs and multiprocessing don't like each other
+
+
 class SodarIngestFastq(SnappyItransferCommandBase):
     """Implementation of sodar ingest-fastq command."""
 
@@ -75,6 +108,14 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
             default=DEFAULT_NUM_TRANSFERS,
             help="Number of parallel transfers, defaults to %s" % DEFAULT_NUM_TRANSFERS,
         )
+        parser.add_argument(
+            "-s",
+            "--sync",
+            default=False,
+            action="store_true",
+            help="Skip upload of files already present in remote collection.",
+        )
+
         parser.add_argument(
             "--yes",
             default=False,
@@ -154,9 +195,10 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
 
     def check_args(self, args):
        """Called for checking arguments, override to change behaviour."""
-        # Check presence of icommands when not testing.
-        if "pytest" not in sys.modules:  # pragma: nocover
-            check_irods_icommands(warn_only=False)
+        # DEPRECATING
+        # # Check presence of icommands when not testing.
+ # if "pytest" not in sys.modules: # pragma: nocover + # check_irods_icommands(warn_only=False) res = 0 toml_config = load_toml_config(args) @@ -326,9 +368,7 @@ def download_webdav(self, sources): folders = [] for src in sources: if re.match("davs://", src): - download_jobs.append( - TransferJob(path_src="i:" + src, path_dest=self.args.tmp, bytes=1) - ) + download_jobs.append(TransferJob(path_remote=src, path_local=self.args.tmp)) tmp_folder = f"tmp_folder_{len(download_jobs)}" pathlib.Path(tmp_folder).mkdir(parents=True, exist_ok=True) else: @@ -337,7 +377,7 @@ def download_webdav(self, sources): if download_jobs: logger.info("Planning to download folders...") for job in download_jobs: - logger.info(" %s => %s", job.path_src, job.path_dest) + logger.info(" %s => %s", job.path_remote, job.path_local) if not self.args.yes and not input("Is this OK? [yN] ").lower().startswith("y"): logger.error("OK, breaking at your request") return [] @@ -435,19 +475,14 @@ def build_jobs(self, library_names=None): # remote_file = re.sub(m_pat, r_pat, remote_file) for ext in ("", ".md5"): - try: - size = os.path.getsize(real_path + ext) - except OSError: # pragma: nocover - size = 0 transfer_jobs.append( TransferJob( - path_src=real_path + ext, - path_dest=str(remote_file) + ext, - bytes=size, + path_local=real_path + ext, + path_remote=str(remote_file) + ext, ) ) - return lz_irods_path, tuple(sorted(transfer_jobs)) + return lz_irods_path, tuple(sorted(transfer_jobs, key=lambda x: x.path_local)) def execute(self) -> typing.Optional[int]: """Execute the transfer.""" @@ -459,36 +494,25 @@ def execute(self) -> typing.Optional[int]: logger.info(" args: %s", self.args) lz_uuid, transfer_jobs = self.build_jobs() - logger.debug("Transfer jobs:\n%s", "\n".join(map(lambda x: x.to_oneline(), transfer_jobs))) + transfer_jobs = sorted(transfer_jobs, key=lambda x: x.path_local) if self.fix_md5_files: transfer_jobs = self._execute_md5_files_fix(transfer_jobs) - logger.info("Planning to transfer the files as follows...") + # Final go from user & transfer + itransfer = iRODSTransfer(transfer_jobs, ask=not self.args.yes) + logger.info("Planning to transfer the following files:") for job in transfer_jobs: - logger.info(" %s => %s", job.path_src, job.path_dest) - if not self.args.yes and not input("Is this OK? [yN] ").lower().startswith("y"): - logger.error("OK, breaking at your request") - return 1 + output_logger.info(job.path_local) + logger.info(f"With a total size of {sizeof_fmt(itransfer.size)}") - total_bytes = sum([job.bytes for job in transfer_jobs]) - logger.info( - "Transferring %d files with a total size of %s", - len(transfer_jobs), - sizeof_fmt(total_bytes), - ) + if not self.args.yes: + if not input("Is this OK? 
[y/N] ").lower().startswith("y"): # pragma: no cover + logger.info("Aborting at your request.") + sys.exit(0) - counter = Value(c_ulonglong, 0) - with tqdm.tqdm(total=total_bytes, unit="B", unit_scale=True) as t: - if self.args.num_parallel_transfers == 0: # pragma: nocover - for job in transfer_jobs: - irsync_transfer(job, counter, t) - else: - pool = ThreadPool(processes=self.args.num_parallel_transfers) - for job in transfer_jobs: - pool.apply_async(irsync_transfer, args=(job, counter, t)) - pool.close() - pool.join() + itransfer.put(recursive=True, sync=self.args.sync) + logger.info("File transfer complete.") # Validate and move transferred files # Behaviour: If flag is True and lz uuid is not None*, @@ -502,11 +526,53 @@ def execute(self) -> typing.Optional[int]: logger.info("All done") return None + def _execute_md5_files_fix( + self, transfer_jobs: typing.Tuple[TransferJob, ...] + ) -> typing.Tuple[TransferJob, ...]: + """Create missing MD5 files.""" + ok_jobs = [] + todo_jobs = [] + for job in transfer_jobs: + if not os.path.exists(job.path_local): + todo_jobs.append(job) + else: + ok_jobs.append(job) + + total_bytes = sum([os.path.getsize(j.path_local[: -len(".md5")]) for j in todo_jobs]) + logger.info( + "Computing MD5 sums for %s files of %s with up to %d processes", + len(todo_jobs), + sizeof_fmt(total_bytes), + self.args.num_parallel_transfers, + ) + logger.info("Missing MD5 files:\n%s", "\n".join(map(lambda j: j.path_local, todo_jobs))) + counter = Value(c_ulonglong, 0) + with tqdm.tqdm(total=total_bytes, unit="B", unit_scale=True) as t: + if self.args.num_parallel_transfers == 0: # pragma: nocover + for job in todo_jobs: + compute_md5sum(job, counter, t) + else: + pool = ThreadPool(processes=self.args.num_parallel_transfers) + for job in todo_jobs: + pool.apply_async(compute_md5sum, args=(job, counter, t)) + pool.close() + pool.join() + + # Finally, determine file sizes after done. 
+        done_jobs = [
+            TransferJob(
+                path_local=j.path_local,
+                path_remote=j.path_remote,
+            )
+            for j in todo_jobs
+        ]
+        return tuple(sorted(done_jobs + ok_jobs, key=lambda x: x.path_local))
+
 
 def download_folder(job: TransferJob, counter: Value, t: tqdm.tqdm):
     """Perform one piece of work and update the global counter."""
-    irsync_argv = ["irsync", "-r", "-a", "-K", "i:%s" % job.path_src, job.path_dest]
+    irsync_argv = ["irsync", "-r", "-a", "-K", "i:%s" % job.path_remote, job.path_local]
     logger.debug("Transferring file: %s", " ".join(irsync_argv))
     try:
         check_output(irsync_argv)
diff --git a/tests/test_sodar_ingest_fastq.py b/tests/test_sodar_ingest_fastq.py
index 608d1e8c..9ba0c8bd 100644
--- a/tests/test_sodar_ingest_fastq.py
+++ b/tests/test_sodar_ingest_fastq.py
@@ -128,7 +128,7 @@ def test_run_sodar_ingest_fastq_smoke_test(mocker, requests_mock):
     # --- setup arguments
     irods_path = "/irods/dest"
     landing_zone_uuid = "466ab946-ce6a-4c78-9981-19b79e7bbe86"
-    dest_path = "target/folder/generic_file.fq.gz"
+    dest_path = "target/folder/"
     fake_base_path = "/base/path"
     argv = [
         "--verbose",
@@ -179,16 +179,18 @@ def test_run_sodar_ingest_fastq_smoke_test(mocker, requests_mock):
     )
 
     mock_check_output = mock.MagicMock(return_value=0)
-    mocker.patch("cubi_tk.snappy.itransfer_common.check_output", mock_check_output)
+    mocker.patch("cubi_tk.irods_common.iRODSTransfer.put", mock_check_output)
 
     mock_check_call = mock.MagicMock(return_value=0)
     mocker.patch("cubi_tk.snappy.itransfer_common.check_call", mock_check_call)
+    mocker.patch("cubi_tk.sodar.ingest_fastq.check_call", mock_check_call)
 
     mocker.patch("cubi_tk.sodar.ingest_fastq.pathlib", fake_pl)
     mocker.patch("cubi_tk.sodar.ingest_fastq.os", fake_os)
 
     fake_open = fake_filesystem.FakeFileOpen(fs)
     mocker.patch("cubi_tk.snappy.itransfer_common.open", fake_open)
+    mocker.patch("cubi_tk.sodar.ingest_fastq.open", fake_open)
 
     # necessary because independent test fail
     mock_value = mock.MagicMock()
@@ -218,20 +220,16 @@ def test_run_sodar_ingest_fastq_smoke_test(mocker, requests_mock):
 
     assert not res
 
-    # TODO: make mock check_output actually create the file?
-    # assert fs.exists(fake_file_paths[3])
-
     assert mock_check_call.call_count == 1
     assert mock_check_call.call_args[0] == (["md5sum", "sample1-N1-DNA1-WES1.fq.gz"],)
 
-    assert mock_check_output.call_count == len(fake_file_paths) * 3
-    remote_path = os.path.join(irods_path, dest_path)
-    for path in fake_file_paths:
-        expected_mkdir_argv = ["imkdir", "-p", os.path.dirname(remote_path)]
-        ext = ".md5" if path.split(".")[-1] == "md5" else ""
-        expected_irsync_argv = ["irsync", "-a", "-K", path, ("i:%s" + ext) % remote_path]
-        expected_ils_argv = ["ils", os.path.dirname(remote_path)]
-
-        assert ((expected_mkdir_argv,),) in mock_check_output.call_args_list
-        assert ((expected_irsync_argv,),) in mock_check_output.call_args_list
-        assert ((expected_ils_argv,), {"stderr": -2}) in mock_check_output.call_args_list
+    # The upload logic for multiple transfers/files has been moved into the iRODSCommon classes
+    # We just need one call to that here
+    assert mock_check_output.call_count == 1
+
+    # Test that the TransferJobs contain all files (setting the remote_dest with this mock setup does not work)
+    parser, _subparsers = setup_argparse()
+    args = parser.parse_args(argv)
+    ingestfastq = SodarIngestFastq(args)
+    lz, actual = ingestfastq.build_jobs()
+    assert len(actual) == len(fake_file_paths)