Merge pull request #249 from kjsanger/feature/pacbio-metadata-updates

Add PacBio metadata and ACL updates
kjsanger authored Feb 14, 2024
2 parents 4093817 + 85c6e7c commit 078c2fd
Showing 15 changed files with 1,074 additions and 274 deletions.
304 changes: 200 additions & 104 deletions src/npg_irods/cli/locate_data_objects.py

Large diffs are not rendered by default.

33 changes: 32 additions & 1 deletion src/npg_irods/cli/util.py
@@ -23,7 +23,7 @@
import logging
import logging.config
from argparse import ArgumentParser, ArgumentTypeError
from datetime import datetime
from datetime import datetime, timedelta, timezone
from typing import Iterable

import dateutil.parser
@@ -48,6 +48,37 @@ def rods_path(path):
return path


def add_date_range_arguments(parser: ArgumentParser, begin_delta=14):
"""Add --begin-date and --end-date arguments to the argument parser.
Args:
parser: The parser to modify
begin_delta: The time delta for the default begin date relative to the default
end date (which is today). Defaults to 14 days, i.e. --begin-date is 14 days ago.
Returns:
The parser.
"""
parser.add_argument(
"--begin-date",
"--begin_date",
help="Limit to after this date. Defaults to 14 days ago. The argument must "
"be an ISO8601 UTC date or date and time "
"e.g. 2022-01-30, 2022-01-30T11:11:03Z",
type=parse_iso_date,
default=datetime.now(timezone.utc) - timedelta(days=begin_delta),
)
parser.add_argument(
"--end-date",
"--end_date",
help="Limit to before this date. Defaults to the current time. The argument "
"must be an ISO8601 UTC date or date and time "
"e.g. 2022-01-30, 2022-01-30T11:11:03Z",
type=parse_iso_date,
default=datetime.now(timezone.utc),
)

return parser
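
A usage sketch (not part of this commit) showing how the new helper composes with the existing add_logging_arguments in the same module; the script and its description are hypothetical:

    from argparse import ArgumentParser

    from npg_irods.cli.util import add_date_range_arguments, add_logging_arguments

    parser = ArgumentParser(description="Hypothetical consumer of the date range options")
    add_date_range_arguments(parser, begin_delta=7)  # default --begin-date becomes 7 days ago
    add_logging_arguments(parser)

    # e.g. myscript --begin-date 2024-01-01 --end-date 2024-02-14T00:00:00Z
    args = parser.parse_args()
    print(args.begin_date, args.end_date)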


def add_logging_arguments(parser: ArgumentParser) -> ArgumentParser:
"""Adds standard CLI logging arguments to a parser.
2 changes: 1 addition & 1 deletion src/npg_irods/common.py
@@ -164,7 +164,7 @@ def is_ultima_genomics(path: PathLike | str) -> bool:
path: An iRODS path.
Returns:
True if Ultime data.
True if Ultima data.
"""
return re.match(r"/seq/ug\b", str(path)) is not None

2 changes: 2 additions & 0 deletions src/npg_irods/db/mlwh.py
@@ -24,6 +24,7 @@

from sqlalchemy import (
BigInteger,
Boolean,
DateTime,
Enum,
ForeignKey,
@@ -322,6 +323,7 @@ class PacBioProductMetrics(Base):
ForeignKey("pac_bio_run.id_pac_bio_tmp"), nullable=False, index=True
)
id_pac_bio_product = mapped_column(String(64), nullable=False, unique=True)
qc = mapped_column(Boolean, nullable=False)

pac_bio_run_well_metrics: Mapped["PacBioRunWellMetrics"] = relationship(
"PacBioRunWellMetrics", back_populates="pac_bio_product_metrics"
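
As an illustration only (not part of this change), the new qc column can be filtered like any other mapped attribute; this sketch assumes an already-open SQLAlchemy Session bound to the ML warehouse:

    from sqlalchemy.orm import Session

    from npg_irods.db.mlwh import PacBioProductMetrics

    def passed_qc_product_ids(session: Session) -> list[str]:
        # Illustrative helper: product IDs whose QC flag is set
        query = session.query(PacBioProductMetrics.id_pac_bio_product).filter(
            PacBioProductMetrics.qc.is_(True)
        )
        return [row.id_pac_bio_product for row in query]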
72 changes: 35 additions & 37 deletions src/npg_irods/illumina.py
@@ -48,6 +48,8 @@

log = get_logger(__package__)

SQL_CHUNK_SIZE = 1000


@unique
class TagIndex(Enum):
@@ -189,39 +191,39 @@ def ensure_secondary_metadata_updated(
Returns:
True if updated.
"""
zone = infer_zone(item)
secondary_metadata, acl = [], []

def empty_acl(*args):
return []
if managed_access := requires_managed_access(item):
log.debug("Requires managed access", path=item)
else:
log.debug("Does not require managed access", path=item)

if requires_full_metadata(item):
if full_metadata := requires_full_metadata(item):
log.debug("Requires full metadata", path=item)
sample_fn, study_fn = make_sample_metadata, make_study_metadata
else:
log.debug("Requires reduced metadata", path=item)
sample_fn, study_fn = make_reduced_sample_metadata, make_reduced_study_metadata

if requires_managed_access(item):
log.debug("Requires managed access", path=item)
acl_fn = make_sample_acl
else:
log.debug("Does not require managed access", path=item)
acl_fn = empty_acl

# Each component may be associated with multiple flowcells
components = find_associated_components(item)
log.debug("Found associated components", path=item, comp=components)

zone = infer_zone(item)
secondary_metadata, acl = [], []
for c in components:
flowcells = find_flowcells_by_component(
mlwh_session, c, include_controls=include_controls
)
log.debug("Found associated flowcells", path=item, flowcells=flowcells, comp=c)
for fc in flowcells:
secondary_metadata.extend(sample_fn(fc.sample))
secondary_metadata.extend(study_fn(fc.study))
acl.extend(acl_fn(fc.sample, fc.study, subset=c.subset, zone=zone))
if full_metadata:
secondary_metadata.extend(make_sample_metadata(fc.sample))
secondary_metadata.extend(make_study_metadata(fc.study))
else:
secondary_metadata.extend(make_reduced_sample_metadata(fc.sample))
secondary_metadata.extend(make_reduced_study_metadata(fc.study))

if managed_access:
acl.extend(
make_sample_acl(fc.sample, fc.study, subset=c.subset, zone=zone)
)

# Remove duplicates
secondary_metadata = sorted(set(secondary_metadata))
@@ -342,10 +344,10 @@ def find_associated_components(item: DataObject | Collection) -> list[Component]
need to extend the split_name() function to handle the new file type.
Args:
item:
item: A Collection or DataObject.
Returns:
A list of components.
"""
errmsg = "Failed to find an associated data object bearing component metadata"

@@ -411,8 +413,7 @@ def requires_full_metadata(obj: DataObject) -> bool:
metadata storage because the iRODS metadata link table is already >3 billion rows,
which is impacting query performance.
"""
full = [".bam", ".cram"]
return any(suffix in full for suffix in PurePath(obj.name).suffixes)
return any(suffix in [".bam", ".cram"] for suffix in PurePath(obj.name).suffixes)


def requires_managed_access(obj: DataObject) -> bool:
@@ -436,11 +437,6 @@ def requires_managed_access(obj: DataObject) -> bool:
return any(suffix in managed for suffix in PurePath(obj.name).suffixes)


def has_component_metadata(item: Collection | DataObject) -> bool:
"""Return True if the given item has Illumina component metadata."""
return len(item.metadata(SeqConcept.COMPONENT)) > 0


def find_qc_collection(path: Collection | DataObject) -> Collection:
qc = Collection(path.path / "qc")
if not qc.exists():
@@ -455,7 +451,7 @@ def find_flowcells_by_component(
Args:
sess: An open SQL session.
component: A component
component: A component.
include_controls: If False, include query arguments to exclude spiked-in
controls in the result. Defaults to False.
@@ -499,21 +495,21 @@ def find_updated_components(
sess: Session, since: datetime, until: datetime
) -> Iterator[Component]:
"""Find in the ML warehouse any Illumina sequence components whose tracking
metadata has been changed within a specified time range
metadata has been changed within the given time range.
A change is defined as the "recorded_at" column (Sample, Study, IseqFlowcell) or
"last_changed" colum (IseqProductMetrics) having a timestamp more recent than the
given time.
"last_changed" column (IseqProductMetrics) having a timestamp within the range.
Args:
sess: An open SQL session.
since: A datetime.
until: A datetime.
sess: An open ML warehouse session.
since: The start of the time range.
until: The end of the time range.
Returns:
An iterator over Components whose tracking metadata have changed.
"""
for rpt in (

query = (
sess.query(
IseqProductMetrics.id_run, IseqFlowcell.position, IseqFlowcell.tag_index
)
@@ -532,5 +528,7 @@
asc(IseqFlowcell.position),
asc(IseqFlowcell.tag_index),
)
):
yield Component(*rpt)
)

for id_run, position, tag_index in query.yield_per(SQL_CHUNK_SIZE):
yield Component(id_run, position, tag_index=tag_index)
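
A minimal consumption sketch (assuming an open ML warehouse session named sess); yield_per streams results in SQL_CHUNK_SIZE batches so the full result set is never held in memory, and the Component attribute names below mirror the constructor arguments above:

    from datetime import datetime, timedelta, timezone

    from npg_irods.illumina import find_updated_components

    until = datetime.now(timezone.utc)
    since = until - timedelta(days=14)

    # sess: an open SQLAlchemy session bound to the ML warehouse (assumed)
    for c in find_updated_components(sess, since, until):
        print(c.id_run, c.position, c.tag_index)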
12 changes: 6 additions & 6 deletions src/npg_irods/metadata/lims.py
@@ -196,14 +196,14 @@ def make_sample_acl(
Returns:
An ACL
"""
if subset is not None and subset is subset.XAHUMAN:
irods_group = f"{STUDY_IDENTIFIER_PREFIX}{study.id_study_lims}"
perm = Permission.NULL if sample.consent_withdrawn else Permission.READ

if subset is SeqSubset.XAHUMAN:
return []

if subset is not None and subset is subset.HUMAN:
irods_group = f"{STUDY_IDENTIFIER_PREFIX}{study.id_study_lims}_human"
else:
irods_group = f"{STUDY_IDENTIFIER_PREFIX}{study.id_study_lims}"
perm = Permission.NULL if sample.consent_withdrawn else Permission.READ
if subset is SeqSubset.HUMAN:
return [AC(irods_group + "_human", perm, zone=zone)]

return [AC(irods_group, perm, zone=zone)]

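
To illustrate the restructured branching (a sketch; the Sample and Study values are made up and the subset/zone keyword defaults are assumed): an xahuman subset yields an empty ACL, a human subset targets the study's "_human" group, and anything else gets the base study group, with NULL permission when consent has been withdrawn.

    from npg_irods.db.mlwh import Sample, Study
    from npg_irods.metadata.lims import make_sample_acl

    # Hypothetical records; real ones come from the ML warehouse
    sample = Sample(consent_withdrawn=False)
    study = Study(id_study_lims="5000")

    # No subset: a single AC granting READ to the study's iRODS group
    print(make_sample_acl(sample, study, subset=None, zone="seq"))

    # Consent withdrawn: the permission becomes NULL instead of READ
    print(make_sample_acl(Sample(consent_withdrawn=True), study, subset=None, zone="seq"))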
23 changes: 21 additions & 2 deletions src/npg_irods/metadata/pacbio.py
@@ -49,20 +49,40 @@ class Instrument(AsValueEnum):
PLATE_NUMBER = "plate_number"
RUN_NAME = "run"
WELL_LABEL = "well"
SOURCE = "source"
TAG_IDENTIFIER = "tag_index"
TAG_SEQUENCE = "tag_sequence"


def add_well_padding(well_label: str) -> str:
"""
Add padding to well label string.
Args:
well_label: The unpadded well label string.
Returns: The padded well label string as it is in iRODS metadata.
"""
match = re.search(r"^([A-Z])(\d+)$", well_label)
if match is None:
raise ValueError(f"Invalid well label: '{well_label}'")

return match.group(1) + match.group(2).rjust(2, "0")


def remove_well_padding(well_label: str) -> str:
"""
Remove padding from well label string.
Args:
well_label: The padded well label as it is in iRODS metadata
well_label: The padded well label as it is in iRODS metadata.
Returns: The unpadded well label string
"""
match = re.search(r"^([A-Z])(\d+)$", well_label)
if match is None:
raise ValueError(f"Invalid well label: '{well_label}'")

return match.group(1) + match.group(2).lstrip("0")
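
A round-trip sketch of the two helpers above (the well labels are illustrative):

    from npg_irods.metadata.pacbio import add_well_padding, remove_well_padding

    assert add_well_padding("A1") == "A01"     # padded form stored in iRODS metadata
    assert remove_well_padding("A01") == "A1"  # back to the unpadded form
    assert add_well_padding("B12") == "B12"    # already two digits; unchanged

    try:
        add_well_padding("11A")                # malformed labels raise ValueError
    except ValueError as e:
        print(e)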


@@ -170,7 +190,6 @@ def backfill_id_products(
num_clients: The number of baton clients to use. Defaults to 1.
Returns: bool
"""
rv = True
writer = LocationWriter(PACBIO, path=out_path)
5 changes: 4 additions & 1 deletion src/npg_irods/ont.py
@@ -72,7 +72,10 @@ class Component:
"""The tag identifier, if the reads are from a multiplexed pool."""

def __init__(
self, experiment_name: str, instrument_slot: int, tag_identifier: str = None
self,
experiment_name: str,
instrument_slot: int,
tag_identifier: Optional[str] = None,
):
self.experiment_name = experiment_name
self.instrument_slot = instrument_slot
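
The widened signature keeps tag_identifier optional, so both non-multiplexed and multiplexed reads can be described; a brief sketch (the experiment name and slot are made up, and the attribute name follows the constructor parameter):

    from npg_irods.ont import Component

    # Non-multiplexed: no tag identifier
    c1 = Component("hypothetical_expt_01", 2)

    # Multiplexed: reads carry a tag/barcode identifier
    c2 = Component("hypothetical_expt_01", 2, tag_identifier="barcode01")

    print(c1.tag_identifier)  # None
    print(c2.tag_identifier)  # barcode01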