diff --git a/scripts/apply-ont-metadata b/scripts/apply-ont-metadata index 5999796..ef104ca 100755 --- a/scripts/apply-ont-metadata +++ b/scripts/apply-ont-metadata @@ -41,9 +41,9 @@ secondary metadata (like `update-secondary-metadata` does), but that is purely a optimisation to make the data available without waiting for an `update-secondary-metadata` to be scheduled. -Only runs whose ML warehouse records have been updated recently are updated. The default -window for detecting changes is the 14 days prior to the time when the script is run. -This can be changed using the --begin-date CLI option. +Only runs whose ML warehouse records have been updated within the specified date range. +The default window for detecting changes is the 14 days prior to the time when the +script is run. This can be changed using the --begin-date and --end-date CLI options. """ parser = argparse.ArgumentParser( @@ -54,12 +54,21 @@ add_logging_arguments(parser) parser.add_argument( "--begin-date", "--begin_date", - help="Limit runs found to those changed after this date. Defaults to 14 days ago. " - "The argument must be an ISO8601 UTC date or date and time e.g. 2022-01-30, " - "2022-01-30T11:11:03Z", + help="Limit runs found to those changed at, or after this date. Defaults to " + "14 days ago. The argument must be an ISO8601 UTC date or date and time " + "e.g. 2022-01-30, 2022-01-30T11:11:03Z", type=parse_iso_date, default=datetime.now() - timedelta(days=14), ) +parser.add_argument( + "--end-date", + "--end_date", + help="Limit runs found to those changed at, or before this date. Defaults to " + "the current time. The argument must be an ISO8601 UTC date or date and time " + "e.g. 2022-01-30, 2022-01-30T11:11:03Z", + type=parse_iso_date, + default=datetime.now(), +) parser.add_argument( "--zone", help="Specify a federated iRODS zone in which to find " @@ -70,6 +79,8 @@ parser.add_argument( parser.add_argument( "--database-config", "--database_config", + "--db-config", + "--db_config", help="Configuration file for database connection.", type=argparse.FileType("r"), required=True, diff --git a/scripts/locate-data-objects b/scripts/locate-data-objects index c9561d0..384e36f 100755 --- a/scripts/locate-data-objects +++ b/scripts/locate-data-objects @@ -97,6 +97,8 @@ add_logging_arguments(parser) parser.add_argument( "--database-config", "--database_config", + "--db-config", + "--db_config", help="Configuration file for database connection", type=argparse.FileType("r"), required=True, @@ -166,12 +168,21 @@ ilup_parser = subparsers.add_parser( ilup_parser.add_argument( "--begin-date", "--begin_date", - help="Limit data objects found to those changed after this date. Defaults to 14 " - "days ago. The argument must be an ISO8601 UTC date or date and time e.g. " - "2022-01-30, 2022-01-30T11:11:03Z", + help="Limit data objects found to those whose metadata was changed in the ML " + "warehouse at, or after after this date. Defaults to 14 days ago. The argument " + "must be an ISO8601 UTC date or date and time e.g. 2022-01-30, 2022-01-30T11:11:03Z", type=parse_iso_date, default=datetime.now(timezone.utc) - timedelta(days=14), ) +ilup_parser.add_argument( + "--end-date", + "--end_date", + help="Limit data objects found to those whose metadata was changed in the ML " + "warehouse at, or before this date. Defaults to the current time. The argument " + "must be an ISO8601 UTC date or date and time e.g. 2022-01-30, 2022-01-30T11:11:03Z", + type=parse_iso_date, + default=datetime.now(), +) ilup_parser.add_argument( "--skip-absent-runs", "--skip_absent_runs", @@ -191,17 +202,22 @@ def illumina_updates(cli_args): with Session(engine) as session: num_processed = num_errors = 0 - iso_date = cli_args.begin_date.strftime("%Y-%m-%dT%H:%M:%SZ") + iso_begin = cli_args.begin_date.strftime("%Y-%m-%dT%H:%M:%SZ") + iso_end = cli_args.end_date.strftime("%Y-%m-%dT%H:%M:%SZ") skip_absent_runs = cli_args.skip_absent_runs attempts_per_run = defaultdict(int) success_per_run = defaultdict(int) for i, c in enumerate( - illumina.find_components_changed(session, since=cli_args.begin_date) + illumina.find_updated_components( + session, since=cli_args.begin_date, until=cli_args.end_date + ) ): num_processed += 1 - log.info("Finding data objects", item=i, component=c, since=iso_date) + log.info( + "Finding data objects", item=i, comp=c, since=iso_begin, until=iso_end + ) try: avus = [ @@ -218,8 +234,9 @@ def illumina_updates(cli_args): log.info( "Skipping run after unsuccessful attempts to find it", item=i, - component=c, - since=iso_date, + comp=c, + since=iso_begin, + until=iso_end, attempts=attempts_per_run[c.id_run], ) continue @@ -260,6 +277,15 @@ ontup_parser.add_argument( type=parse_iso_date, default=datetime.now(timezone.utc) - timedelta(days=14), ) +ontup_parser.add_argument( + "--end-date", + "--end_date", + help="Limit collections found to those changed before date. Defaults to the" + "current time. The argument must be an ISO8601 UTC date or date and time e.g." + " 2022-01-30, 2022-01-30T11:11:03Z", + type=parse_iso_date, + default=datetime.now(), +) ontup_parser.add_argument( "--report-tags", "--report_tags", @@ -273,16 +299,22 @@ def ont_updates(cli_args): engine = sqlalchemy.create_engine(dbconfig.url) with Session(engine) as session: num_processed = num_errors = 0 - iso_date = cli_args.begin_date.strftime("%Y-%m-%dT%H:%M:%SZ") + iso_begin = cli_args.begin_date.strftime("%Y-%m-%dT%H:%M:%SZ") + iso_end = cli_args.end_date.strftime("%Y-%m-%dT%H:%M:%SZ") report_tags = cli_args.report_tags for i, c in enumerate( - ont.find_components_changed( - session, include_tags=report_tags, since=cli_args.begin_date + ont.find_updated_components( + session, + include_tags=report_tags, + since=cli_args.begin_date, + until=cli_args.end_date, ) ): num_processed += 1 - log.info("Finding collections", item=i, component=c, since=iso_date) + log.info( + "Finding collections", item=i, comp=c, since=iso_begin, until=iso_end + ) try: avus = [ diff --git a/scripts/update-secondary-metadata b/scripts/update-secondary-metadata index 3872ceb..fb92e5f 100755 --- a/scripts/update-secondary-metadata +++ b/scripts/update-secondary-metadata @@ -56,6 +56,8 @@ add_logging_arguments(parser) parser.add_argument( "--database-config", "--database_config", + "--db-config", + "--db_config", help="Configuration file for database connection.", type=argparse.FileType("r"), required=True, @@ -102,7 +104,6 @@ parser.add_argument( parser.add_argument( "--version", help="Print the version and exit.", action="store_true" ) - parser.add_argument( "--zone", help="Specify a federated iRODS zone in which to find data objects and/or " diff --git a/src/npg_irods/illumina.py b/src/npg_irods/illumina.py index 72c2932..0bd526a 100644 --- a/src/npg_irods/illumina.py +++ b/src/npg_irods/illumina.py @@ -272,9 +272,11 @@ def find_flowcells_by_component( return query.order_by(asc(IseqFlowcell.id_iseq_flowcell_tmp)).all() -def find_components_changed(sess: Session, since: datetime) -> Iterator[Component]: +def find_updated_components( + sess: Session, since: datetime, until: datetime +) -> Iterator[Component]: """Find in the ML warehouse any Illumina sequence components whose tracking - metadata has been changed since a given time. + metadata has been changed within a specified time range A change is defined as the "recorded_at" column (Sample, Study, IseqFlowcell) or "last_changed" colum (IseqProductMetrics) having a timestamp more recent than the @@ -282,7 +284,8 @@ def find_components_changed(sess: Session, since: datetime) -> Iterator[Componen Args: sess: An open SQL session. - since: A datetime query argument. + since: A datetime. + until: A datetime. Returns: An iterator over Components whose tracking metadata have changed. @@ -296,10 +299,10 @@ def find_components_changed(sess: Session, since: datetime) -> Iterator[Componen .join(IseqFlowcell.study) .join(IseqFlowcell.iseq_product_metrics) .filter( - (Sample.recorded_at >= since) - | (Study.recorded_at >= since) - | (IseqFlowcell.recorded_at >= since) - | (IseqProductMetrics.last_changed >= since) + Sample.recorded_at.between(since, until) + | Study.recorded_at.between(since, until) + | IseqFlowcell.recorded_at.between(since, until) + | IseqProductMetrics.last_changed.between(since, until) ) .order_by(asc(IseqFlowcell.id_iseq_flowcell_tmp)) ): diff --git a/src/npg_irods/ont.py b/src/npg_irods/ont.py index 649be2c..3ebfb82 100644 --- a/src/npg_irods/ont.py +++ b/src/npg_irods/ont.py @@ -84,12 +84,13 @@ def apply_metadata( experiment_name=None, instrument_slot=None, since: datetime = None, + until: datetime = None, zone=None, ) -> (int, int, int): """Apply iRODS metadata on ONT run collections whose corresponding ML warehouse - records have been updated at, or more recently than, the specified time. This - function detects runs that are multiplexed and adds relevant tag identifier and - tag index primary metadata to the deplexed collections. + records have been updated within a specified time range. This function detects + runs that are multiplexed and adds relevant tag identifier and tag index primary + metadata to the deplexed collections. Collections to annotate are identified by having ont:experiment_name and ont:instrument_slot metadata already attached to them. This is done for example, @@ -101,6 +102,7 @@ def apply_metadata( instrument_slot: Limit updates to this instrument slot. Optional, requires an experiment_name to be supplied. since: A datetime. Limit updates to experiments changed at this time or later. + until: A datetime. Limit updates to experiments before at this time or earlier. zone: The iRODS zone to search for metadata to update. Returns: @@ -109,6 +111,8 @@ def apply_metadata( """ if since is None: since = datetime.fromtimestamp(0) # Everything since the Epoch + if until is None: + until = datetime.now() if experiment_name is None and instrument_slot is not None: raise ValueError( @@ -119,7 +123,9 @@ def apply_metadata( num_found, num_updated, num_errors = 0, 0, 0 for i, c in enumerate( - find_components_changed(mlwh_session, include_tags=False, since=since) + find_updated_components( + mlwh_session, include_tags=False, since=since, until=until + ) ): if experiment_name is not None and c.experiment_name != experiment_name: continue @@ -309,8 +315,8 @@ def find_recent_expt(sess: Session, since: datetime) -> list[str]: return [val for val, in rows] -def find_components_changed( - sess: Session, since: datetime, include_tags=True +def find_updated_components( + sess: Session, since: datetime, until: datetime, include_tags=True ) -> Iterator[Component]: """Return the components of runs whose ML warehouse metadata has been updated at or since the given date and time. @@ -318,6 +324,7 @@ def find_components_changed( Args: sess: An open SQL session. since: A datetime. + until: A datetime. include_tags: Resolve the components to the granularity of individual tags, rather than as whole runs. Optional, defaults to True. @@ -335,9 +342,9 @@ def find_components_changed( .join(OseqFlowcell.sample) .join(OseqFlowcell.study) .filter( - (Sample.recorded_at >= since) - | (Study.recorded_at >= since) - | (OseqFlowcell.recorded_at >= since) + Sample.recorded_at.between(since, until) + | Study.recorded_at.between(since, until) + | OseqFlowcell.recorded_at.between(since, until) ) .group_by(*columns) ) diff --git a/tests/conftest.py b/tests/conftest.py index 016ae3b..e354da9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -80,10 +80,12 @@ TEST_SQL_STALE_REPLICATE = "setObjectReplStale" TEST_SQL_INVALID_CHECKSUM = "setObjectChecksumInvalid" +# Counts of test fixture experiments NUM_SIMPLE_EXPTS = 5 NUM_MULTIPLEXED_EXPTS = 3 NUM_INSTRUMENT_SLOTS = 5 +# Dates when test fixture experiments were done BEGIN = datetime(year=2020, month=1, day=1, hour=0, minute=0, second=0) EARLY = datetime(year=2020, month=6, day=1, hour=0, minute=0, second=0) LATE = datetime(year=2020, month=6, day=14, hour=0, minute=0, second=0) diff --git a/tests/test_ml_warehouse_queries.py b/tests/test_ml_warehouse_queries.py index 335af64..6198638 100644 --- a/tests/test_ml_warehouse_queries.py +++ b/tests/test_ml_warehouse_queries.py @@ -24,7 +24,11 @@ from conftest import BEGIN, EARLY, LATE, LATEST, ont_tag_identifier from npg_irods.metadata import illumina from npg_irods.metadata.lims import TrackedSample, TrackedStudy -from npg_irods.ont import Component, find_components_changed, find_recent_expt +from npg_irods.ont import Component, find_updated_components, find_recent_expt + +# Even-numbered experiments were done EARLY. Odd-numbered experiments were done LATE +# if they were on an even instrument position, or LATEST if they were on an odd +# instrument position. @m.describe("Finding updated ONT experiments by datetime") @@ -44,7 +48,6 @@ def test_find_recent_expt(self, ont_synthetic_mlwh): ] assert find_recent_expt(ont_synthetic_mlwh, EARLY) == all_expts - # Odd-numbered experiments were done late or latest before_late = LATE - timedelta(days=1) odd_expts = [ "simple_experiment_001", @@ -60,10 +63,37 @@ def test_find_recent_expt(self, ont_synthetic_mlwh): assert none == [] @m.describe("Finding updated experiments and positions by datetime") - @m.context("When a query date is provided") + @m.context("When a query dates are provided") @m.it("Finds the correct experiment, slot tuples") - def test_find_recent_component(self, ont_synthetic_mlwh): + def test_find_updated_components(self, ont_synthetic_mlwh): before_late = LATE - timedelta(days=1) + even_expts = [ + Component(*args) + for args in [ + ("multiplexed_experiment_002", 1), + ("multiplexed_experiment_002", 2), + ("multiplexed_experiment_002", 3), + ("multiplexed_experiment_002", 4), + ("multiplexed_experiment_002", 5), + ("simple_experiment_002", 1), + ("simple_experiment_002", 2), + ("simple_experiment_002", 3), + ("simple_experiment_002", 4), + ("simple_experiment_002", 5), + ("simple_experiment_004", 1), + ("simple_experiment_004", 2), + ("simple_experiment_004", 3), + ("simple_experiment_004", 4), + ("simple_experiment_004", 5), + ] + ] + assert [ + c + for c in find_updated_components( + ont_synthetic_mlwh, EARLY, before_late, include_tags=False + ) + ] == even_expts + odd_expts = [ Component(*args) for args in [ @@ -96,8 +126,8 @@ def test_find_recent_component(self, ont_synthetic_mlwh): ] assert [ c - for c in find_components_changed( - ont_synthetic_mlwh, before_late, include_tags=False + for c in find_updated_components( + ont_synthetic_mlwh, before_late, LATEST, include_tags=False ) ] == odd_expts @@ -115,25 +145,26 @@ def test_find_recent_component(self, ont_synthetic_mlwh): ] assert [ c - for c in find_components_changed( - ont_synthetic_mlwh, before_latest, include_tags=False + for c in find_updated_components( + ont_synthetic_mlwh, before_latest, LATEST, include_tags=False ) ] == odd_positions - after_latest = LATEST + timedelta(days=1) + after_latest1 = LATEST + timedelta(days=1) + after_latest100 = LATEST + timedelta(days=100) assert [ c - for c in find_components_changed( - ont_synthetic_mlwh, after_latest, include_tags=False + for c in find_updated_components( + ont_synthetic_mlwh, after_latest1, after_latest100, include_tags=False ) ] == [] @m.describe( "Finding updated experiments, positions and tag identifiers by datetime" ) - @m.context("When a query date is provided") + @m.context("When query dates are provided") @m.it("Finds the correct experiment, slot, tag identifier tuples") - def test_find_recent_component_tag(self, ont_synthetic_mlwh): + def test_find_updated_components_tag(self, ont_synthetic_mlwh): before_latest = LATEST - timedelta(days=1) odd_positions = [] @@ -146,16 +177,17 @@ def test_find_recent_component_tag(self, ont_synthetic_mlwh): assert [ c - for c in find_components_changed( - ont_synthetic_mlwh, before_latest, include_tags=True + for c in find_updated_components( + ont_synthetic_mlwh, before_latest, LATEST, include_tags=True ) ] == odd_positions - after_latest = LATEST + timedelta(days=1) + after_latest1 = LATEST + timedelta(days=1) + after_latest100 = LATEST + timedelta(days=100) assert [ c - for c in find_components_changed( - ont_synthetic_mlwh, after_latest, include_tags=True + for c in find_updated_components( + ont_synthetic_mlwh, after_latest1, after_latest100, include_tags=True ) ] == []