diff --git a/cubi_tk/sodar/__init__.py b/cubi_tk/sodar/__init__.py index 95321569..88575ad0 100644 --- a/cubi_tk/sodar/__init__.py +++ b/cubi_tk/sodar/__init__.py @@ -37,6 +37,9 @@ Upload external files to SODAR (defaults for fastq files). +``ingest`` + Upload arbitrary files to SODAR + ``check-remote`` Check if or which local files with md5 sums are already deposited in iRODs/Sodar @@ -53,6 +56,7 @@ from .add_ped import setup_argparse as setup_argparse_add_ped from .check_remote import setup_argparse as setup_argparse_check_remote from .download_sheet import setup_argparse as setup_argparse_download_sheet +from .ingest import setup_argparse as setup_argparse_ingest from .ingest_fastq import setup_argparse as setup_argparse_ingest_fastq from .lz_create import setup_argparse as setup_argparse_lz_create from .lz_list import setup_argparse as setup_argparse_lz_list @@ -87,6 +91,7 @@ def setup_argparse(parser: argparse.ArgumentParser) -> None: "ingest-fastq", help="Upload external files to SODAR (defaults for fastq)" ) ) + setup_argparse_ingest(subparsers.add_parser("ingest", help="Upload arbitrary files to SODAR")) setup_argparse_check_remote( subparsers.add_parser( "check-remote", help="Compare local files with md5 sum against SODAR/iRODS" diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py new file mode 100644 index 00000000..b18d08be --- /dev/null +++ b/cubi_tk/sodar/ingest.py @@ -0,0 +1,297 @@ +"""``cubi-tk sodar ingest``: upload arbitrary files and folders into a specific SODAR landing zone collection""" + +import argparse +import os +from pathlib import Path +import sys +import typing + +import attrs +import logzero +from logzero import logger +from sodar_cli import api + +from cubi_tk.irods_common import TransferJob, iRODSCommon, iRODSTransfer + +from ..common import compute_md5_checksum, is_uuid, load_toml_config, sizeof_fmt + +# for testing +logger.propagate = True + +# no-frills logger +formatter = logzero.LogFormatter(fmt="%(message)s") 
+output_logger = logzero.setup_logger(formatter=formatter) + + +@attrs.frozen(auto_attribs=True) +class Config: + """Configuration for the ingest command.""" + + config: str = attrs.field(default=None) + sodar_server_url: str = attrs.field(default=None) + sodar_api_token: str = attrs.field(default=None, repr=lambda value: "***") # type: ignore + + +class SodarIngest: + """Implementation of sodar ingest command.""" + + def __init__(self, args): + # Command line arguments. + self.args = args + + # Path to iRODS environment file + self.irods_env_path = Path(Path.home(), ".irods", "irods_environment.json") + if not self.irods_env_path.exists(): + logger.error("iRODS environment file is missing.") + sys.exit(1) + + # Get SODAR API info + toml_config = load_toml_config(Config()) + if toml_config: # pragma: no cover + config_url = toml_config.get("global", {}).get("sodar_server_url") + if self.args.sodar_url == "https://sodar.bihealth.org/" and config_url: + self.args.sodar_url = config_url + if not self.args.sodar_api_token: + self.args.sodar_api_token = toml_config.get("global", {}).get("sodar_api_token") + if not self.args.sodar_api_token: + logger.error("SODAR API token missing.") + sys.exit(1) + + @classmethod + def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: + parser.add_argument( + "--hidden-cmd", dest="sodar_cmd", default=cls.run, help=argparse.SUPPRESS + ) + group_sodar = parser.add_argument_group("SODAR-related") + group_sodar.add_argument( + "--sodar-url", + default=os.environ.get("SODAR_URL", "https://sodar.bihealth.org/"), + help="URL to SODAR, defaults to SODAR_URL environment variable or fallback to https://sodar.bihealth.org/", + ) + group_sodar.add_argument( + "--sodar-api-token", + default=os.environ.get("SODAR_API_TOKEN", None), + help="SODAR API token. 
Defaults to SODAR_API_TOKEN environment variable.", + ) + parser.add_argument( + "-r", + "--recursive", + default=False, + action="store_true", + help="Recursively match files in subdirectories. Creates iRODS sub-collections to match directory structure.", + ) + parser.add_argument( + "-s", + "--sync", + default=False, + action="store_true", + help="Skip upload of files already present in remote collection.", + ) + parser.add_argument( + "-e", + "--exclude", + nargs="+", + default="", + type=str, + help="Exclude files by defining one or multiple glob-style patterns.", + ) + parser.add_argument( + "-K", + "--remote-checksums", + default=False, + action="store_true", + help="Trigger checksum computation on the iRODS side.", + ) + parser.add_argument( + "-y", + "--yes", + default=False, + action="store_true", + help="Don't ask for permission. Does not skip manual target collection selection.", + ) + parser.add_argument( + "--collection", + type=str, + help="Target iRODS collection. Skips manual target collection selection.", + ) + parser.add_argument( + "sources", help="One or multiple files/directories to ingest.", nargs="+" + ) + parser.add_argument("destination", help="UUID or iRODS path of SODAR landing zone.") + + @classmethod + def run( + cls, args, _parser: argparse.ArgumentParser, _subparser: argparse.ArgumentParser + ) -> typing.Optional[int]: + """Entry point into the command.""" + return cls(args).execute() + + def execute(self): + """Execute ingest.""" + # Retrieve iRODS path if destination is UUID + if is_uuid(self.args.destination): + try: + lz_info = api.landingzone.retrieve( + sodar_url=self.args.sodar_url, + sodar_api_token=self.args.sodar_api_token, + landingzone_uuid=self.args.destination, + ) + except Exception as e: # pragma: no cover + logger.error("Failed to retrieve landing zone information.") + logger.exception(e) + sys.exit(1) + + # TODO: Replace with status_locked check once implemented in sodar_cli + if lz_info.status in ["ACTIVE", 
"FAILED"]: + self.lz_irods_path = lz_info.irods_path + logger.info(f"Target iRods path: {self.lz_irods_path}") + else: + logger.error("Target landing zone is not ACTIVE.") + sys.exit(1) + else: + self.lz_irods_path = self.args.destination # pragma: no cover + + # Build file list + source_paths = self.build_file_list() + if len(source_paths) == 0: + logger.info("Nothing to do. Quitting.") + sys.exit(0) + + # Initiate iRODS session + irods_session = iRODSCommon().session + + # Query target collection + logger.info("Querying landing zone collections…") + collections = [] + try: + with irods_session as i: + coll = i.collections.get(self.lz_irods_path) + for c in coll.subcollections: + collections.append(c.name) + except Exception as e: # pragma: no cover + logger.error( + f"Failed to query landing zone collections: {iRODSCommon().get_irods_error(e)}" + ) + sys.exit(1) + + # Query user for target sub-collection + if not collections: + self.target_coll = self.lz_irods_path + logger.info("No subcollections found. 
Moving on.") + elif self.args.collection is None: + user_input = "" + input_valid = False + input_message = "####################\nPlease choose target collection:\n" + for index, item in enumerate(collections): + input_message += f"{index+1}) {item}\n" + input_message += "Select by number: " + + while not input_valid: + user_input = input(input_message) + if user_input.isdigit(): + user_input = int(user_input) + if 0 < user_input <= len(collections): + input_valid = True + + self.target_coll = f"{self.lz_irods_path}/{collections[user_input - 1]}" + + elif self.args.collection in collections: + self.target_coll = f"{self.lz_irods_path}/{self.args.collection}" + else: # pragma: no cover + logger.error("Selected target collection does not exist in landing zone.") + sys.exit(1) + + # Build transfer jobs and add missing md5 files + jobs = self.build_jobs(source_paths) + jobs = sorted(jobs, key=lambda x: x.path_local) + + # Final go from user & transfer + itransfer = iRODSTransfer(jobs, ask=not self.args.yes) + logger.info("Planning to transfer the following files:") + for job in jobs: + output_logger.info(job.path_local) + logger.info(f"With a total size of {sizeof_fmt(itransfer.size)}") + logger.info("Into this iRODS collection:") + output_logger.info(f"{self.target_coll}/") + + if not self.args.yes: + if not input("Is this OK? [y/N] ").lower().startswith("y"): # pragma: no cover + logger.info("Aborting at your request.") + sys.exit(0) + + itransfer.put(recursive=self.args.recursive, sync=self.args.sync) + logger.info("File transfer complete.") + + # Compute server-side checksums + if self.args.remote_checksums: # pragma: no cover + logger.info("Computing server-side checksums.") + itransfer.chksum() + + def build_file_list(self) -> typing.List[typing.Dict[Path, Path]]: + """ + Build list of source files to transfer. + iRODS paths are relative to target collection. 
+ """ + + source_paths = [Path(src) for src in self.args.sources] + output_paths = list() + + for src in source_paths: + try: + abspath = src.resolve(strict=True) + except FileNotFoundError: + logger.warning(f"File not found: {src.name}") + continue + except RuntimeError: + logger.warning(f"Symlink loop: {src.name}") + continue + + excludes = self.args.exclude + if src.is_dir(): + paths = abspath.glob("**/*" if self.args.recursive else "*") + for p in paths: + if excludes and any([p.match(e) for e in excludes]): + continue + if p.is_file() and not p.suffix.lower() == ".md5": + output_paths.append({"spath": p, "ipath": p.relative_to(abspath)}) + else: + if not any([src.match(e) for e in excludes if e]): + output_paths.append({"spath": src, "ipath": Path(src.name)}) + return output_paths + + def build_jobs(self, source_paths: typing.Iterable[Path]) -> typing.Tuple[TransferJob]: + """Build file transfer jobs.""" + + transfer_jobs = [] + + for p in source_paths: + path_remote = f"{self.target_coll}/{str(p['ipath'])}" + md5_path = p["spath"].parent / (p["spath"].name + ".md5") + + if md5_path.exists(): + logger.info(f"Found md5 hash on disk for {p['spath']}") + else: + md5sum = compute_md5_checksum(p["spath"]) + with md5_path.open("w", encoding="utf-8") as f: + f.write(f"{md5sum} {p['spath'].name}") + + transfer_jobs.append( + TransferJob( + path_local=str(p["spath"]), + path_remote=path_remote, + ) + ) + + transfer_jobs.append( + TransferJob( + path_local=str(md5_path), + path_remote=path_remote + ".md5", + ) + ) + + return tuple(transfer_jobs) + + +def setup_argparse(parser: argparse.ArgumentParser) -> None: + """Setup argument parser for ``cubi-tk sodar ingest``.""" + return SodarIngest.setup_argparse(parser) diff --git a/docs_manual/index.rst b/docs_manual/index.rst index 18a80015..c5bfe1a9 100644 --- a/docs_manual/index.rst +++ b/docs_manual/index.rst @@ -14,8 +14,9 @@ Manual | :ref:`Creating ISA-tab files ` | :ref:`Annotating ISA-tab files ` - | :ref:`Upload raw 
data to SODAR ` - | :ref:`Upload raw data to SODAR ` + | :ref:`Upload data to SODAR ` + | :ref:`Upload fastq files to SODAR ` + | :ref:`Upload results of the Seasnap pipeline to SODAR ` | :ref:`Create a sample info file for Sea-snap ` | :ref:`Tools for archiving old projects ` @@ -51,6 +52,7 @@ Project Info man_isa_tpl man_isa_tab + man_sodar_ingest man_ingest_fastq man_itransfer_results man_write_sample_info diff --git a/docs_manual/man_ingest_fastq.rst b/docs_manual/man_ingest_fastq.rst index 49d6ee1f..b64ce4d3 100644 --- a/docs_manual/man_ingest_fastq.rst +++ b/docs_manual/man_ingest_fastq.rst @@ -1,7 +1,7 @@ .. _man_ingest_fastq: -=========================== -Manual for ``ingest-fastq`` -=========================== +================================= +Manual for ``sodar ingest-fastq`` +================================= The ``cubi-tk sodar ingest-fastq`` command lets you upload raw data files to SODAR. diff --git a/docs_manual/man_sodar_ingest.rst b/docs_manual/man_sodar_ingest.rst new file mode 100644 index 00000000..4d0accb7 --- /dev/null +++ b/docs_manual/man_sodar_ingest.rst @@ -0,0 +1,75 @@ +.. _man_sodar_ingest: + +=========================== +Manual for ``sodar ingest`` +=========================== + +The ``cubi-tk sodar ingest`` command can be used to upload arbitrary data files to SODAR. +It facilitates transfer of one or multiple sources into one SODAR landing zone, while optionally recursively matching and preserving the sub-folder structure. + +---------------- +Basic usage +---------------- + +.. code-block:: bash + + $ cubi-tk sodar ingest [OPTION]... SOURCE [SOURCE ...] DESTINATION + +Where each ``SOURCE`` is a path to a folder containing files and ``DESTINATION`` is either a SODAR iRODS path or a *landing zone* UUID. + +For seamless usage `~/.irods/irods_environment.json `_ and :ref:`~/.cubitkrc.toml <sodar-auth>` should be present. +This command automatically handles your iRODS session and authentication (i.e. `iinit`). 
+ +---------------- +Parameters +---------------- + +- ``-r, --recursive``: Recursively find files in subdirectories and create iRODS sub-collections to match directory structure. +- ``-e, --exclude``: Exclude files matching the given pattern. +- ``-s, --sync``: Skip upload of files that already exist in iRODS. +- ``-K, --remote-checksums``: Instruct iRODS to compute MD5 checksums of uploaded files for SODAR validation step. +- ``-y, --yes``: Don't stop for user permission. Enables scripting with this command. +- ``--collection``: Set target iRODS collection in landing zone. Skips user input for this selection. + +.. _sodar-auth: + +-------------------- +SODAR authentication +-------------------- + +To be able to access the SODAR API (which is only required, if you specify a landing zone UUID instead of an iRODS path), you also need an API token. +For token management in SODAR, the following docs can be used: + +- https://sodar.bihealth.org/manual/ui_user_menu.html +- https://sodar.bihealth.org/manual/ui_api_tokens.html + +There are three options how to supply the token. +Only one is needed. +The options are the following: + +1. configure ``~/.cubitkrc.toml``. + + .. code-block:: toml + + [global] + + sodar_server_url = "https://sodar.bihealth.org/" + sodar_api_token = "" + +2. pass via command line. + + .. code-block:: bash + + $ cubi-tk sodar ingest --sodar-url "https://sodar.bihealth.org/" --sodar-api-token "" + +3. set as environment variable. + + .. code-block:: bash + + $ SODAR_API_TOKEN="" + +---------------- +More Information +---------------- + +Also see ``cubi-tk sodar ingest`` :ref:`CLI documentation ` and ``cubi-tk sodar ingest --help`` for more information. 
diff --git a/requirements/base.txt b/requirements/base.txt index 1a12c65a..4627bb2f 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -66,4 +66,4 @@ simplejson sodar-cli ==0.1.0 # Python iRODS client -python-irodsclient==1.1.3 +python-irodsclient==1.1.8 diff --git a/tests/test_sodar_ingest.py b/tests/test_sodar_ingest.py new file mode 100644 index 00000000..d74c1eb8 --- /dev/null +++ b/tests/test_sodar_ingest.py @@ -0,0 +1,252 @@ +"""Tests for ``cubi_tk.sodar.ingest``.""" + +from argparse import ArgumentParser +from pathlib import Path +from unittest.mock import MagicMock, PropertyMock, call, patch + +import pytest + +from cubi_tk.__main__ import main, setup_argparse +from cubi_tk.sodar.ingest import SodarIngest + + +def test_run_sodar_ingest_help(capsys): + parser, _subparsers = setup_argparse() + with pytest.raises(SystemExit) as e: + parser.parse_args(["sodar", "ingest", "--help"]) + + assert e.value.code == 0 + + res = capsys.readouterr() + assert res.out + assert not res.err + + +def test_run_sodar_ingest_nothing(capsys): + parser, _subparsers = setup_argparse() + + with pytest.raises(SystemExit) as e: + parser.parse_args(["sodar", "ingest"]) + + assert e.value.code == 2 + + res = capsys.readouterr() + assert not res.out + assert res.err + + +@pytest.fixture +def ingest(fs): + fs.create_dir(Path.home().joinpath(".irods")) + fs.create_file(Path.home().joinpath(".irods", "irods_environment.json")) + + argv = [ + "--recursive", + "--sodar-url", + "sodar_url", + "--sodar-api-token", + "token", + "testdir", + "target", + ] + + parser = ArgumentParser() + SodarIngest.setup_argparse(parser) + args = parser.parse_args(argv) + + obj = SodarIngest(args) + obj.lz_irods_path = "/irodsZone" + obj.target_coll = "targetCollection" + return obj + + +def test_sodar_ingest_build_file_list(fs, caplog): + class DummyArgs(object): + pass + + fs.create_symlink("/not_existing", "/broken_link") + fs.create_symlink("/loop_src", "/loop_src2") + 
fs.create_symlink("/loop_src2", "/loop_src") + + args = DummyArgs() + args.sources = ["broken_link", "not_here", "loop_src", "testdir", "testdir", "file5", "file6"] + args.recursive = True + args.exclude = ["file4", "file5"] + dummy = MagicMock() + args_mock = PropertyMock(return_value=args) + type(dummy).args = args_mock + + fs.create_dir("/testdir/subdir") + fs.create_file("/testdir/file1") + fs.create_file("/testdir/file1.md5") + fs.create_file("/testdir/subdir/file2") + fs.create_file("/file3") + fs.create_file("/testdir/file4") + fs.create_file("/file5") + fs.create_file("/file6") + fs.create_symlink("/testdir/file3", "/file3") + + paths = SodarIngest.build_file_list(dummy) + + # Sources + assert "File not found: broken_link" in caplog.messages + assert "File not found: not_here" in caplog.messages + assert "Symlink loop: loop_src" in caplog.messages + + # Files + assert {"spath": Path("/testdir/file1"), "ipath": Path("file1")} in paths + assert { + "spath": Path("/testdir/file1.md5"), + "ipath": Path("file1.md5"), + } not in paths + assert { + "spath": Path("/testdir/subdir/file2"), + "ipath": Path("subdir/file2"), + } in paths + assert {"spath": Path("/testdir/file3"), "ipath": Path("file3")} in paths + + # Re-run without recursive search + args.recursive = False + paths = SodarIngest.build_file_list(dummy) + assert {"spath": Path("/testdir/file1"), "ipath": Path("file1")} in paths + assert { + "spath": Path("/testdir/file1.md5"), + "ipath": Path("file1.md5"), + } not in paths + assert { + "spath": Path("/testdir/subdir/file2"), + "ipath": Path("subdir/file2"), + } not in paths + assert {"spath": Path("/testdir/file3"), "ipath": Path("file3")} in paths + assert {"spath": Path("/testdir/file4"), "ipath": Path("file4")} not in paths + assert {"spath": Path("file5"), "ipath": Path("file5")} not in paths + assert {"spath": Path("file6"), "ipath": Path("file6")} in paths + + +@patch("cubi_tk.sodar.ingest.TransferJob") +def test_sodar_ingest_build_jobs(mockjob, 
ingest, fs): + paths = [ + {"spath": Path("myfile.csv"), "ipath": Path("dest_dir/myfile.csv")}, + {"spath": Path("folder/file.csv"), "ipath": Path("dest_dir/folder/file.csv")}, + ] + for path in paths: + fs.create_file(path["spath"]) + fs.create_file("myfile.csv.md5") + + ingest.build_jobs(paths) + + for p in paths: + mockjob.assert_any_call( + path_local=str(p["spath"]), + path_remote=f"{ingest.target_coll}/{str(p['ipath'])}", + ) + mockjob.assert_any_call( + path_local=str(p["spath"]) + ".md5", + path_remote=f"{ingest.target_coll}/{str(p['ipath']) + '.md5'}", + ) + + +@patch("cubi_tk.sodar.ingest.TransferJob") +@patch("cubi_tk.sodar.ingest.iRODSTransfer") +@patch("cubi_tk.sodar.ingest.iRODSCommon.session") +@patch("cubi_tk.sodar.ingest.api.landingzone.retrieve") +def test_sodar_ingest_smoketest(mockapi, mocksession, mocktransfer, mockjob, fs): + class DummyAPI(object): + pass + + class DummyColl(object): + pass + + fs.create_dir("/source/subdir") + fs.create_dir("/target/coll/") + fs.create_file("/source/file1") + fs.create_file("/source/subdir/file2") + lz_uuid = "f46b4fc3-0927-449d-b725-9ffed231507b" + argv = [ + "sodar", + "ingest", + "--sodar-url", + "sodar_url", + "--sodar-api-token", + "token", + "--collection", + "coll", + "--yes", + "--recursive", + "source", + lz_uuid, + ] + + # to make it sortable + mockjob.return_value.path_local = 1 + + # Test env file missing + with pytest.raises(SystemExit): + main(argv) + + fs.create_dir(Path.home().joinpath(".irods")) + fs.create_file(Path.home().joinpath(".irods", "irods_environment.json")) + + # Test args no api token + with pytest.raises(SystemExit): + argv2 = argv.copy() + argv2.remove("--sodar-api-token") + argv2.remove("token") + main(argv2) + + # Test cancel no invalid LZ + api_return = DummyAPI() + api_return.status = "DELETED" + api_return.irods_path = "target" + mockapi.return_value = api_return + + with pytest.raises(SystemExit): + main(argv) + mockapi.assert_called_with( + sodar_url="sodar_url", 
sodar_api_token="token", landingzone_uuid=lz_uuid + ) + + # Test cancel if no files to transfer + api_return.status = "ACTIVE" + with pytest.raises(SystemExit): + argv2 = argv.copy() + argv2[-2] = "empty" + main(argv2) + + # Test user input for subcollection + dcoll = DummyColl() + dcoll.subcollections = [] + mocki = MagicMock() # returned by the session context manager + mocksession.__enter__.return_value = mocki + mocki.collections.get.return_value = dcoll + mocktransfer.return_value.size = 1234 + argv2 = argv.copy() + argv2.remove("--collection") + argv2.remove("coll") + + with patch("builtins.input", side_effect=["a", "100", "1"]) as mockinput: + # Test for no subcollections + main(argv2) + mockinput.assert_not_called() + + # Test for 1 subcollection + dcoll.subcollections = [ + DummyColl(), + ] + dcoll.subcollections[0].name = "coll" + main(argv2) + assert mockinput.call_count == 3 + mockjob.assert_called() + + # Test upload logic + mockjob.reset_mock() + main(argv) + assert call.collections.get("target") in mocki.mock_calls + mockjob.assert_any_call(path_local="/source/file1", path_remote="target/coll/file1") + mockjob.assert_any_call(path_local="/source/file1.md5", path_remote="target/coll/file1.md5") + mockjob.assert_any_call( + path_local="/source/subdir/file2", path_remote="target/coll/subdir/file2" + ) + mockjob.assert_any_call( + path_local="/source/subdir/file2.md5", path_remote="target/coll/subdir/file2.md5" + )