Merge branch 'main' into 96-move-isa-templates
sellth committed Sep 18, 2023
2 parents 24c370c + 3939f83 commit 51768a0
Showing 23 changed files with 563 additions and 183 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -12,7 +12,7 @@ Tooling for connecting GitLab, pipelines, and SODAR at CUBI.
Prerequisites when using conda:

```bash
$ conda create -n cubi-tk python=3.7
$ conda create -n cubi-tk python=3.10
$ conda activate cubi-tk
```

33 changes: 26 additions & 7 deletions cubi_tk/irods/check.py
@@ -9,7 +9,10 @@
import typing

from irods.collection import iRODSCollection
from irods.column import Like
from irods.data_object import iRODSDataObject
from irods.models import Collection as CollectionModel
from irods.models import DataObject as DataObjectModel
from irods.session import iRODSSession
from logzero import logger
import tqdm
@@ -103,16 +106,32 @@ def get_irods_error(cls, e: Exception):
es = str(e)
return es if es != "None" else e.__class__.__name__

def get_data_objs(self, root_coll: iRODSCollection):
def get_data_objs(
self, root_coll: iRODSCollection
) -> typing.Dict[
str, typing.Union[typing.Dict[str, iRODSDataObject], typing.List[iRODSDataObject]]
]:
"""Get data objects recursively under the given iRODS path."""
data_objs = dict(files=[], checksums={})
ignore_schemes = [k.lower() for k in HASH_SCHEMES if k != self.args.hash_scheme.upper()]
for res in root_coll.walk():
for obj in res[2]:
if obj.path.endswith("." + self.args.hash_scheme.lower()):
data_objs["checksums"][obj.path] = obj
elif obj.path.split(".")[-1] not in ignore_schemes:
data_objs["files"].append(obj)
irods_sess = root_coll.manager.sess

query = irods_sess.query(DataObjectModel, CollectionModel).filter(
Like(CollectionModel.name, f"{root_coll.path}%")
)

for res in query:
# If the 'res' dict is not split into Collection & Object, the resulting iRODSDataObject is not fully functional, likely because a name/path/... attribute gets overwritten somewhere.
coll_res = {k: v for k, v in res.items() if k.icat_id >= 500}
obj_res = {k: v for k, v in res.items() if k.icat_id < 500}
coll = iRODSCollection(root_coll.manager, coll_res)
obj = iRODSDataObject(irods_sess.data_objects, parent=coll, results=[obj_res])

if obj.path.endswith("." + self.args.hash_scheme.lower()):
data_objs["checksums"][obj.path] = obj
elif obj.path.split(".")[-1] not in ignore_schemes:
data_objs["files"].append(obj)

return data_objs

def check_args(self, _args):
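
The rewritten `get_data_objs` above pulls every data object below the root collection with a single catalog query instead of walking the collection tree, saving one server round trip per sub-collection. A minimal, self-contained sketch of that query pattern with python-irodsclient follows; the host, credentials, and root path are placeholders, not values from this repository.

```python
from irods.column import Like
from irods.models import Collection, DataObject
from irods.session import iRODSSession

# Placeholder connection details -- adjust to your own iRODS zone.
with iRODSSession(
    host="localhost", port=1247, user="rods", password="secret", zone="tempZone"
) as session:
    root = "/tempZone/projects/example"
    # One query joining collection and data-object columns, restricted to the subtree.
    query = session.query(Collection.name, DataObject.name).filter(
        Like(Collection.name, f"{root}%")
    )
    for row in query:
        # Each row carries both the parent collection path and the object name.
        print(f"{row[Collection.name]}/{row[DataObject.name]}")
```
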
10 changes: 5 additions & 5 deletions cubi_tk/snappy/check_remote.py
@@ -258,7 +258,7 @@ def coordinate_run(self, check_name):
# Restrict dictionary to directories associated with step
subset_remote_files_dict = {}
for key in self.remote_files_dict:
if all((check_name in dat.irods_path for dat in self.remote_files_dict[key])):
if all((check_name in dat.path for dat in self.remote_files_dict[key])):
subset_remote_files_dict[key] = self.remote_files_dict.get(key)

# Parse local files - remove library reference
@@ -333,7 +333,7 @@ def compare_md5_files(remote_dict, in_both_set):
# Compare to remote MD5
for irods_dat in remote_dict.get(original_file_name):
if local_md5 != irods_dat.FILE_MD5SUM:
different_md5_list.append((md5_file.replace(".md5", ""), irods_dat.irods_path))
different_md5_list.append((md5_file.replace(".md5", ""), irods_dat.path))
else:
same_md5_list.append(md5_file.replace(".md5", ""))
# BONUS - check remote replicas
@@ -345,7 +345,7 @@
):
logger.error(
f"iRODS metadata checksum not consistent with checksum file...\n"
f"File: {irods_dat.irods_path}\n"
f"File: {irods_dat.path}\n"
f"File checksum: {irods_dat.FILE_MD5SUM}\n"
f"Metadata checksum: {', '.join(irods_dat.REPLICAS_MD5SUM)}\n"
)
@@ -393,7 +393,7 @@ def compare_local_and_remote_files(local_dict, remote_dict):
# Only remote
for file_ in all_remote_files_set - all_local_files_set:
for irods_dat in remote_dict[file_]:
only_remote_set.add(irods_dat.irods_path)
only_remote_set.add(irods_dat.path)

# Only local
for file_ in all_local_files_set - all_remote_files_set:
@@ -419,7 +419,7 @@ def report_multiple_file_versions_in_sodar(remote_dict):
inner_dict = {}
for file_, irods_list in remote_dict.items():
if len(irods_list) > 1:
inner_dict[file_] = [dat.irods_path for dat in irods_list]
inner_dict[file_] = [dat.path for dat in irods_list]
# Format and display information - if any
if len(inner_dict) > 0:
pairs_str = ""
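
The MD5 comparison in `compare_md5_files` above boils down to: read the expected checksum from the local `.md5` file, then require both the remote file checksum and all replica checksums to agree with it. A hedged sketch of that check; the function and parameter names are illustrative, not the cubi-tk API.

```python
def md5_matches(md5_file_path: str, remote_md5: str, replica_md5s: list[str]) -> bool:
    """Return True if the local .md5 file agrees with the remote checksum and its replicas."""
    with open(md5_file_path, "rt") as f:
        local_md5 = f.read().split()[0]  # .md5 files look like "<md5>  <filename>"
    return local_md5 == remote_md5 and all(r == remote_md5 for r in replica_md5s)
```
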
53 changes: 16 additions & 37 deletions cubi_tk/snappy/common.py
@@ -1,5 +1,4 @@
"""Common functionality for SNAPPY."""

import pathlib
import typing

@@ -8,26 +7,6 @@
from logzero import logger
import yaml

#: Dependencies between the SNAPPY steps.
DEPENDENCIES: typing.Dict[str, typing.Tuple[str, ...]] = {
"ngs_mapping": (),
"roh_calling": ("variant_calling",),
"variant_calling": ("ngs_mapping",),
"variant_export": ("variant_calling",),
"variant_export_external": (),
"targeted_seq_cnv_calling": ("ngs_mapping",),
"targeted_seq_cnv_annotation": ("targeted_seq_cnv_calling",),
"targeted_seq_cnv_export": ("targeted_seq_cnv_annotation",),
"wgs_sv_calling": ("ngs_mapping",),
"wgs_sv_annotation": ("wgs_sv_calling",),
"wgs_sv_export": ("wgs_sv_annotation",),
"wgs_sv_export_external": (),
"wgs_cnv_calling": ("ngs_mapping",),
"wgs_cnv_annotation": ("wgs_cnv_calling",),
"wgs_cnv_export": ("wgs_cnv_annotation",),
"wgs_cnv_export_external": (),
}


class CouldNotFindPipelineRoot(Exception):
"""Raised when ``.snappy_pipeline`` could not be found."""
@@ -37,6 +16,22 @@ class CouldNotFindBioMedSheet(Exception):
"""Raised when BioMedSheet could not be found in configuration file."""


def load_sheet_tsv(path_tsv, tsv_shortcut="germline"):
"""Load sample sheet.
:param path_tsv: Path to sample sheet TSV file.
:type path_tsv: pathlib.Path
:param tsv_shortcut: Sample sheet type. Default: 'germline'.
:type tsv_shortcut: str
:return: Returns Sheet model.
"""
load_tsv = getattr(io_tsv, "read_%s_tsv_sheet" % tsv_shortcut)
with open(path_tsv, "rt") as f:
return load_tsv(f, naming_scheme=NAMING_ONLY_SECONDARY_ID)


def find_snappy_root_dir(
start_path: typing.Union[str, pathlib.Path], more_markers: typing.Iterable[str] = ()
):
@@ -63,22 +58,6 @@ find_snappy_root_dir
raise CouldNotFindPipelineRoot()


def load_sheet_tsv(path_tsv, tsv_shortcut="germline"):
"""Load sample sheet.
:param path_tsv: Path to sample sheet TSV file.
:type path_tsv: pathlib.Path
:param tsv_shortcut: Sample sheet type. Default: 'germline'.
:type tsv_shortcut: str
:return: Returns Sheet model.
"""
load_tsv = getattr(io_tsv, "read_%s_tsv_sheet" % tsv_shortcut)
with open(path_tsv, "rt") as f:
return load_tsv(f, naming_scheme=NAMING_ONLY_SECONDARY_ID)


def get_biomedsheet_path(start_path, uuid):
"""Get biomedsheet path, i.e., sample sheet.
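
As documented in its docstring, `load_sheet_tsv` picks the matching `read_<shortcut>_tsv_sheet` reader from biomedsheets and returns the parsed sheet model. A hypothetical usage sketch; the TSV file name is a placeholder and the `bio_entities` attribute is assumed from the biomedsheets `Sheet` model.

```python
import pathlib

from cubi_tk.snappy import common

# Placeholder path to a germline sample sheet TSV.
sheet = common.load_sheet_tsv(pathlib.Path("germline_sheet.tsv"), tsv_shortcut="germline")
for donor_name in sheet.bio_entities:  # assumed biomedsheets Sheet attribute
    print(donor_name)
```
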
62 changes: 51 additions & 11 deletions cubi_tk/snappy/kickoff.py
@@ -12,46 +12,83 @@
from cubi_tk.exceptions import ParseOutputException

from . import common
from .snappy_workflows import SnappyWorkflowManager


class SnappyMissingPackageException(Exception):
def __str__(self):
return "snappy-pipeline is not installed. This function will not work."


class SnappyMissingDependencyException(Exception):
"""Raised if dependencies of steps do not exist in the current workflow."""

def __init__(
self, step_name: str, step_dependencies: typing.List[str], existing_steps: typing.List[str]
):
self.step_name = step_name
self.step_dependencies = step_dependencies
self.existing_steps = existing_steps

def __str__(self):
return f"{self.step_name} requires {self.step_dependencies}, but only {self.existing_steps} exist in workflow directory."


def run(
args, _parser: argparse.ArgumentParser, _subparser: argparse.ArgumentParser
) -> typing.Optional[int]:
logger.info("Try to find SNAPPY pipeline directory...")
try:
path = common.find_snappy_root_dir(args.path or os.getcwd(), common.DEPENDENCIES.keys())
path = common.find_snappy_root_dir(args.path or os.getcwd())
except common.CouldNotFindPipelineRoot:
return 1

# TODO: this assumes standard naming which is a limitation...
logger.info("Looking for pipeline directories (assuming standard naming)...")
logger.info("Looking for pipeline directories (needs to contain snappy config.yaml)...")
logger.debug("Looking in %s", path)
step_set = {name for name in common.DEPENDENCIES if (path / name).exists()}

manager = SnappyWorkflowManager.from_snappy()

if manager is None:
raise SnappyMissingPackageException

step_dependencies = {}
folder_steps = manager.get_snappy_step_directories(path)
for step_name, step_path in folder_steps.items():
dependencies = manager.get_workflow_step_dependencies(step_path)
if not all(dep in folder_steps for dep in dependencies):
raise SnappyMissingDependencyException(
step_name, dependencies, list(folder_steps.keys())
)

step_dependencies[step_name] = dependencies

steps: typing.List[str] = []
for names in toposort({k: set(v) for k, v in common.DEPENDENCIES.items()}):
steps += [name for name in names if name in step_set]
for names in toposort({k: set(v) for k, v in step_dependencies.items()}):
steps += names
logger.info("Will run the steps: %s", ", ".join(steps))

logger.info("Submitting with sbatch...")
jids: typing.Dict[str, str] = {}

for step in steps:
path_cache = path / step / ".snappy_path_cache"
step_path = folder_steps[step]
path_cache = step_path / ".snappy_path_cache"
if step == "ngs_mapping" and path_cache.exists():
age_cache = time.time() - path_cache.stat().st_mtime
max_age = 24 * 60 * 60 # 1d
if age_cache > max_age:
logger.info("Cache older than %d - purging", max_age)
path_cache.unlink()
dep_jids = [jids[dep] for dep in common.DEPENDENCIES[step] if dep in jids]
dep_jids = [jids[dep] for dep in step_dependencies[step] if dep in jids]
cmd = ["sbatch"]
if dep_jids:
cmd += ["--dependency", "afterok:%s" % ":".join(map(str, dep_jids))]
cmd += ["pipeline_job.sh"]
logger.info("Submitting step %s: %s", step, " ".join(cmd))
logger.info("Submitting step %s (./%s): %s", step, step_path.name, " ".join(cmd))
if args.dry_run:
jid = "<%s>" % step
else:
stdout_raw = subprocess.check_output(cmd, cwd=str(path / step), timeout=args.timeout)
stdout_raw = subprocess.check_output(cmd, cwd=str(step_path), timeout=args.timeout)
stdout = stdout_raw.decode("utf-8")
if not stdout.startswith("Submitted batch job "):
raise ParseOutputException("Did not understand sbatch output: %s" % stdout)
@@ -75,7 +112,10 @@ def setup_argparse(parser: argparse.ArgumentParser) -> None:
)

parser.add_argument(
"--timeout", default=10, type=int, help="Number of seconds to wait for commands."
"--timeout",
default=10,
type=int,
help="Number of seconds to wait for commands.",
)

parser.add_argument(
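
The kickoff changes above replace the hard-coded `DEPENDENCIES` table with dependencies read from each step directory, then order submission with `toposort`. A small sketch of that ordering step; the step names and dependency map are illustrative.

```python
from toposort import toposort

# Illustrative dependency map: step name -> steps it depends on.
step_dependencies = {
    "ngs_mapping": [],
    "variant_calling": ["ngs_mapping"],
    "variant_export": ["variant_calling"],
}

steps = []
# toposort yields batches (sets) whose dependencies appear in earlier batches.
for names in toposort({k: set(v) for k, v in step_dependencies.items()}):
    steps += names

print(steps)  # ['ngs_mapping', 'variant_calling', 'variant_export']
```
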
20 changes: 13 additions & 7 deletions cubi_tk/snappy/pull_data_common.py
@@ -1,4 +1,5 @@
from datetime import datetime
import os
from pathlib import Path
from types import SimpleNamespace

@@ -30,7 +31,7 @@ def filter_irods_collection(self, identifiers, remote_files_dict, file_type):
:type identifiers: list
:param remote_files_dict: Dictionary with iRODS collection information. Key: file name as string (e.g.,
'P001-N1-DNA1-WES1.vcf.gz'); Value: iRODS data (``IrodsDataObject``).
'P001-N1-DNA1-WES1.vcf.gz'); Value: iRODS data (``iRODSDataObject``).
:type remote_files_dict: dict
:param file_type: File type, example: 'bam' or 'vcf'.
@@ -52,7 +53,7 @@ def filter_irods_collection(self, identifiers, remote_files_dict, file_type):
# Note: if a file with the same name is present in both assay and in a common file, it will be ignored.
in_common_links = False
for irods_obj in value:
in_common_links = self._irods_path_in_common_links(irods_obj.irods_path)
in_common_links = self._irods_path_in_common_links(irods_obj.path)
if in_common_links:
break

@@ -93,7 +94,9 @@ def get_assay_uuid(self, sodar_url, sodar_api_token, project_uuid):
:return: Returns assay UUID.
"""
investigation = api.samplesheet.retrieve(
sodar_url=sodar_url, sodar_api_token=sodar_api_token, project_uuid=project_uuid
sodar_url=sodar_url,
sodar_api_token=sodar_api_token,
project_uuid=project_uuid,
)
for study in investigation.studies.values():
for _assay_uuid in study.assays:
@@ -123,11 +126,14 @@ def get_irods_files(self, irods_local_path_pairs, force_overwrite=False):
file_name = pair[0].split("/")[-1]
irods_path = pair[0]
local_out_path = pair[1]
logger.info(f"Retrieving '{file_name}' from: {irods_path}")
# Create output directory if necessary
Path(local_out_path).parent.mkdir(parents=True, exist_ok=True)
# Get file
irods_sessions[0].data_objects.get(irods_path, local_out_path, **kw_options)
if os.path.exists(local_out_path) and not force_overwrite:
logger.info(f"{file_name} already exists. Force_overwrite to re-download.")
else:
logger.info(f"Retrieving '{file_name}' from: {irods_path}")
irods_sessions[0].data_objects.get(irods_path, local_out_path, **kw_options)

except OVERWRITE_WITHOUT_FORCE_FLAG:
logger.error(
@@ -169,7 +175,7 @@ def sort_irods_object_by_date_in_path(self, irods_obj_list):
/sodarZone/projects/../<PROJECT_UUID>/.../assay_<ASSAY_UUID>/<LIBRARY_NAME>/.../<DATE>/...
:param irods_obj_list: List of iRODS objects derived from collection in SODAR.
:type irods_obj_list: List[IrodsDataObject]
:type irods_obj_list: List[iRODSDataObject]
:return: Returns inputted list sorted from latest to earliest iRODS object.
"""
@@ -178,7 +184,7 @@
return irods_obj_list
return sorted(
irods_obj_list,
key=lambda irods_obj: self._find_date_in_path(irods_obj.irods_path),
key=lambda irods_obj: self._find_date_in_path(irods_obj.path),
reverse=True,
)

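
The `get_irods_files` change above skips the download when the local file already exists and `force_overwrite` is not set. A minimal sketch of that guard with python-irodsclient; session setup and paths are placeholders, and the force-flag handling is an assumption about how `kw_options` is built.

```python
import os

from irods.keywords import FORCE_FLAG_KW


def fetch(session, irods_path: str, local_out_path: str, force_overwrite: bool = False):
    """Download irods_path to local_out_path unless it is already present locally."""
    if os.path.exists(local_out_path) and not force_overwrite:
        print(f"{os.path.basename(local_out_path)} already exists, skipping download.")
        return
    # Assumption: pass the iRODS force flag only when overwriting is requested.
    kw_options = {FORCE_FLAG_KW: ""} if force_overwrite else {}
    session.data_objects.get(irods_path, local_out_path, **kw_options)
```
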
11 changes: 5 additions & 6 deletions cubi_tk/snappy/pull_processed_data.py
@@ -298,7 +298,7 @@ def pair_ipath_with_outdir(self, remote_files_dict, output_dir, assay_uuid, retr
"""Pair iRODS path with local output directory
:param remote_files_dict: Dictionary with iRODS collection information. Key: file name as string (e.g.,
'P001-N1-DNA1-WES1'); Value: iRODS data (``IrodsDataObject``).
'P001-N1-DNA1-WES1'); Value: iRODS data (``iRODSDataObject``).
:type remote_files_dict: dict
:param output_dir: Output directory path.
@@ -328,19 +328,18 @@
# /sodarZone/projects/../<PROJECT_UUID>/sample_data/study_<STUDY_UUID>/assay_<ASSAY_UUID>/<LIBRARY_NAME>
try:
irods_dir_structure = os.path.dirname(
str(irods_obj.irods_path).split(f"assay_{assay_uuid}/")[1]
str(irods_obj.path).split(f"assay_{assay_uuid}/")[1]
)
_out_path = os.path.join(output_dir, irods_dir_structure, irods_obj.file_name)
_out_path = os.path.join(output_dir, irods_dir_structure, irods_obj.name)
except IndexError:
logger.warning(
f"Provided Assay UUID '{assay_uuid}' is not present in SODAR path, "
f"hence directory structure won't be preserved.\n"
f"All files will be stored in root of output directory: {output_list}"
)
_out_path = os.path.join(output_dir, irods_obj.file_name)
_out_path = os.path.join(output_dir, irods_obj.name)
# Update output
output_list.append((irods_obj.irods_path, _out_path))
output_list.append((irods_obj.irods_path + ".md5", _out_path + ".md5"))
output_list.append((irods_obj.path, _out_path))

return output_list

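
The `pair_ipath_with_outdir` hunk above preserves the SODAR directory layout by splitting the iRODS path at `assay_<UUID>/` and re-rooting the remainder under the output directory. A worked illustration with placeholder values:

```python
import os

assay_uuid = "11111111-2222-3333-4444-555555555555"  # placeholder UUID
irods_path = (
    "/sodarZone/projects/aa/bb/sample_data/study_x/"
    f"assay_{assay_uuid}/P001-N1-DNA1-WES1/out/P001-N1-DNA1-WES1.vcf.gz"
)
output_dir = "output"

# Keep everything after "assay_<UUID>/" as the relative directory structure.
relative_dir = os.path.dirname(irods_path.split(f"assay_{assay_uuid}/")[1])
out_path = os.path.join(output_dir, relative_dir, os.path.basename(irods_path))
print(out_path)  # output/P001-N1-DNA1-WES1/out/P001-N1-DNA1-WES1.vcf.gz
```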