Skip to content

Commit

Permalink
feat: irods download refactoring and new generic sodar downloader wit…
Browse files Browse the repository at this point in the history
…h preset for dragen data (#226 ) (#227)
  • Loading branch information
Nicolai-vKuegelgen authored Apr 8, 2024
1 parent 0f7f5b2 commit 3fc38af
Show file tree
Hide file tree
Showing 12 changed files with 794 additions and 219 deletions.
105 changes: 20 additions & 85 deletions cubi_tk/irods/check.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,46 @@
"""``cubi-tk irods check``: Check target iRODS collection (all md5 files? metadata md5 consistent? enough replicas?)."""

import argparse
from contextlib import contextmanager
import json
from multiprocessing.pool import ThreadPool
import os
import re
import typing

from irods.collection import iRODSCollection
from irods.column import Like
from irods.data_object import iRODSDataObject
from irods.models import Collection as CollectionModel
from irods.models import DataObject as DataObjectModel
from irods.session import iRODSSession
from logzero import logger
import tqdm

from ..irods_common import DEFAULT_HASH_SCHEME, HASH_SCHEMES, iRODSRetrieveCollection

MIN_NUM_REPLICAS = 2
NUM_PARALLEL_TESTS = 4
NUM_DISPLAY_FILES = 20
HASH_SCHEMES = {
"MD5": {"regex": re.compile(r"[0-9a-fA-F]{32}")},
"SHA256": {"regex": re.compile(r"[0-9a-fA-F]{64}")},
}
DEFAULT_HASH_SCHEME = "MD5"


class IrodsCheckCommand:
class IrodsCheckCommand(iRODSRetrieveCollection):
"""Implementation of iRDOS check command."""

command_name = "check"

def __init__(self, args):
#: Command line arguments.
self.args = args
def __init__(self, args, hash_scheme=DEFAULT_HASH_SCHEME, ask=False, irods_env_path=None):
"""Constructor.
#: Path to iRODS environment file
self.irods_env_path = os.path.join(
os.path.expanduser("~"), ".irods", "irods_environment.json"
)
:param args: argparse object with command line arguments.
:type args: argparse.Namespace
#: iRODS environment
self.irods_env = None
:param hash_scheme: iRODS hash scheme, default MD5.
:type hash_scheme: str, optional
def _init_irods(self):
"""Connect to iRODS."""
try:
return iRODSSession(irods_env_file=self.irods_env_path)
except Exception as e:
logger.error("iRODS connection failed: %s", self.get_irods_error(e))
logger.error("Are you logged in? try 'iinit'")
raise
:param ask: Confirm with user before certain actions.
:type ask: bool, optional
@contextmanager
def _get_irods_sessions(self, count=NUM_PARALLEL_TESTS):
if count < 1:
count = 1
irods_sessions = [self._init_irods() for _ in range(count)]
try:
yield irods_sessions
finally:
for irods in irods_sessions:
irods.cleanup()
:param irods_env_path: Path to irods_environment.json
:type irods_env_path: pathlib.Path, optional
"""
super.__init__(hash_scheme, ask, irods_env_path)
#: Command line arguments.
self.args = args

@classmethod
def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
Expand Down Expand Up @@ -100,40 +78,6 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
)
parser.add_argument("irods_path", help="Path to an iRODS collection.")

@classmethod
def get_irods_error(cls, e: Exception):
"""Return logger friendly iRODS exception."""
es = str(e)
return es if es != "None" else e.__class__.__name__

def get_data_objs(
self, root_coll: iRODSCollection
) -> typing.Dict[
str, typing.Union[typing.Dict[str, iRODSDataObject], typing.List[iRODSDataObject]]
]:
"""Get data objects recursively under the given iRODS path."""
data_objs = dict(files=[], checksums={})
ignore_schemes = [k.lower() for k in HASH_SCHEMES if k != self.args.hash_scheme.upper()]
irods_sess = root_coll.manager.sess

query = irods_sess.query(DataObjectModel, CollectionModel).filter(
Like(CollectionModel.name, f"{root_coll.path}%")
)

for res in query:
# If the 'res' dict is not split into Colllection&Object the resulting iRODSDataObject is not fully functional, likely because a name/path/... attribute is overwritten somewhere
coll_res = {k: v for k, v in res.items() if k.icat_id >= 500}
obj_res = {k: v for k, v in res.items() if k.icat_id < 500}
coll = iRODSCollection(root_coll.manager, coll_res)
obj = iRODSDataObject(irods_sess.data_objects, parent=coll, results=[obj_res])

if obj.path.endswith("." + self.args.hash_scheme.lower()):
data_objs["checksums"][obj.path] = obj
elif obj.path.split(".")[-1] not in ignore_schemes:
data_objs["files"].append(obj)

return data_objs

def check_args(self, _args):
# Check hash scheme
if _args.hash_scheme.upper() not in HASH_SCHEMES:
Expand Down Expand Up @@ -170,18 +114,9 @@ def execute(self):
logger.info("iRODS environment: %s", irods_env)

# Connect to iRODS
with self._get_irods_sessions(self.args.num_parallel_tests) as irods_sessions:
try:
root_coll = irods_sessions[0].collections.get(self.args.irods_path)
logger.info(
"{} iRODS connection{} initialized".format(
len(irods_sessions), "s" if len(irods_sessions) != 1 else ""
)
)
except Exception as e:
logger.error("Failed to retrieve iRODS path: %s", self.get_irods_error(e))
raise

with self.session as irods_session:
root_coll = irods_session.collections.get(self.args.irods_path)
logger.info("1 iRODS connection initialized")
# Get files and run checks
logger.info("Querying for data objects")
data_objs = self.get_data_objs(root_coll)
Expand Down
128 changes: 125 additions & 3 deletions cubi_tk/irods_common.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
from collections import defaultdict
import getpass
import os.path
from pathlib import Path
from typing import Iterable
import re
from typing import Iterable, Union

import attrs
from irods.collection import iRODSCollection
from irods.column import Like
from irods.data_object import iRODSDataObject
from irods.exception import (
CAT_INVALID_AUTHENTICATION,
CAT_INVALID_USER,
CAT_PASSWORD_EXPIRED,
PAM_AUTH_PASSWORD_FAILED,
)
from irods.keywords import FORCE_FLAG_KW
from irods.models import Collection as CollectionModel
from irods.models import DataObject as DataObjectModel
from irods.password_obfuscation import encode
from irods.session import NonAnonymousLoginWithoutPassword, iRODSSession
import logzero
Expand All @@ -20,6 +28,13 @@
formatter = logzero.LogFormatter(fmt="%(message)s")
output_logger = logzero.setup_logger(formatter=formatter)

#: Default hash scheme. Although iRODS provides alternatives, the whole of `snappy` pipeline uses MD5.
HASH_SCHEMES = {
"MD5": {"regex": re.compile(r"[0-9a-fA-F]{32}")},
"SHA256": {"regex": re.compile(r"[0-9a-fA-F]{64}")},
}
DEFAULT_HASH_SCHEME = "MD5"


@attrs.frozen(auto_attribs=True)
class TransferJob:
Expand Down Expand Up @@ -219,14 +234,18 @@ def chksum(self):
logger.error("Problem during iRODS checksumming.")
logger.error(self.get_irods_error(e))

def get(self):
def get(self, force_overwrite: bool = False):
"""Download files from SODAR."""
with self.session as session:
self.__jobs = [
attrs.evolve(job, bytes=session.data_objects.get(job.path_remote).size)
for job in self.__jobs
]
self.__total_bytes = sum([job.bytes for job in self.__jobs])

kw_options = {}
if force_overwrite:
kw_options = {FORCE_FLAG_KW: None} # Keyword has no value, just needs to be present
# Double tqdm for currently transferred file info
with (
tqdm(
Expand All @@ -242,13 +261,116 @@ def get(self):
file_log.set_description_str(
f"File [{n + 1}/{len(self.__jobs)}]: {Path(job.path_local).name}"
)
if os.path.exists(job.path_local) and not force_overwrite: # pragma: no cover
logger.info(
f"{Path(job.path_local).name} already exists. Skipping, use force_overwrite to re-download."
)
continue
try:
Path(job.path_local).parent.mkdir(parents=True, exist_ok=True)
with self.session as session:
session.data_objects.get(job.path_remote, job.path_local)
session.data_objects.get(job.path_remote, job.path_local, **kw_options)
t.update(job.bytes)
except FileNotFoundError: # pragma: no cover
raise
except Exception as e: # pragma: no cover
logger.error(f"Problem during transfer of {job.path_remote}")
logger.error(self.get_irods_error(e))
t.clear()


class iRODSRetrieveCollection(iRODSCommon):
"""Class retrieves iRODS Collection associated with Assay"""

def __init__(
self, hash_scheme: str = DEFAULT_HASH_SCHEME, ask: bool = False, irods_env_path: Path = None
):
"""Constructor.
:param hash_scheme: iRODS hash scheme, default MD5.
:type hash_scheme: str, optional
:param ask: Confirm with user before certain actions.
:type ask: bool, optional
:param irods_env_path: Path to irods_environment.json
:type irods_env_path: pathlib.Path, optional
"""
super().__init__(ask, irods_env_path)
self.hash_scheme = hash_scheme

def retrieve_irods_data_objects(self, irods_path: str) -> dict[str, list[iRODSDataObject]]:
"""Retrieve data objects from iRODS.
:param irods_path: iRODS path.
:return: Returns dictionary representation of iRODS collection information. Key: File name in iRODS (str);
Value: list of iRODSDataObject (native python-irodsclient object).
"""

# Connect to iRODS
with self.session as session:
try:
root_coll = session.collections.get(irods_path)

# Get files and run checks
logger.info("Querying for data objects")

if root_coll is not None:
irods_data_objs = self._irods_query(session, root_coll)
irods_obj_dict = self.parse_irods_collection(irods_data_objs)
return irods_obj_dict

except Exception as e: # pragma: no cover
logger.error("Failed to retrieve iRODS path: %s", self.get_irods_error(e))
raise

return {}

def _irods_query(
self,
session: iRODSSession,
root_coll: iRODSCollection,
) -> dict[str, Union[dict[str, iRODSDataObject], list[iRODSDataObject]]]:
"""Get data objects recursively under the given iRODS path."""

ignore_schemes = [k.lower() for k in HASH_SCHEMES if k != self.hash_scheme.upper()]

query = session.query(DataObjectModel, CollectionModel).filter(
Like(CollectionModel.name, f"{root_coll.path}%")
)

data_objs = dict(files=[], checksums={})
for res in query:
# If the 'res' dict is not split into Colllection&Object the resulting iRODSDataObject is not fully functional,
# likely because a name/path/... attribute is overwritten somewhere
magic_icat_id_separator = 500
coll_res = {k: v for k, v in res.items() if k.icat_id >= magic_icat_id_separator}
obj_res = {k: v for k, v in res.items() if k.icat_id < magic_icat_id_separator}
coll = iRODSCollection(root_coll.manager, coll_res)
obj = iRODSDataObject(session.data_objects, parent=coll, results=[obj_res])

if obj.path.endswith("." + self.hash_scheme.lower()):
data_objs["checksums"][obj.path] = obj
elif obj.path.split(".")[-1] not in ignore_schemes:
data_objs["files"].append(obj)

return data_objs

@staticmethod
def parse_irods_collection(irods_data_objs) -> dict[str, list[iRODSDataObject]]:
"""Parse iRODS collection
:param irods_data_objs: iRODS collection.
:type irods_data_objs: dict
:return: Returns dictionary representation of iRODS collection information. Key: File name in iRODS (str);
Value: list of iRODSDataObject (native python-irodsclient object).
"""
# Initialise variables
output_dict = defaultdict(list)

for obj in irods_data_objs["files"]:
output_dict[obj.name].append(obj)

return output_dict
7 changes: 2 additions & 5 deletions cubi_tk/snappy/check_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
from collections import defaultdict
import os
from pathlib import Path
from types import SimpleNamespace
import typing

from biomedsheets import shortcuts
from logzero import logger

from ..common import load_toml_config
from ..sodar_common import RetrieveSodarCollection
from .common import get_biomedsheet_path, load_sheet_tsv
from .retrieve_irods_collection import DEFAULT_HASH_SCHEME, RetrieveIrodsCollection


class FindFilesCommon:
Expand Down Expand Up @@ -684,9 +683,7 @@ def execute(self) -> typing.Optional[int]:
variant_caller_class = VariantCallingChecker

# Find all remote files (iRODS)
pseudo_args = SimpleNamespace(hash_scheme=DEFAULT_HASH_SCHEME)
library_remote_files_dict = RetrieveIrodsCollection(
pseudo_args,
library_remote_files_dict = RetrieveSodarCollection(
self.args.sodar_url,
self.args.sodar_api_token,
self.args.assay_uuid,
Expand Down
Loading

0 comments on commit 3fc38af

Please sign in to comment.