From 1bb9cfcbe813089bb716b854c2d653cb920096ec Mon Sep 17 00:00:00 2001
From: Philip Yoon
Date: Wed, 14 Aug 2024 17:03:24 -0700
Subject: [PATCH] #948: Basic feature is done. Need to 1) create unit test 2)
 address case when some of the submitted granules are lost from GRQ ES

---
 data_subscriber/cslc/cslc_query.py      | 14 ++++++++-
 data_subscriber/cslc_utils.py           | 41 +++++++++++++++++++++++++++
 tests/data_subscriber/test_cslc_util.py |  3 ++
 3 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/data_subscriber/cslc/cslc_query.py b/data_subscriber/cslc/cslc_query.py
index c31e4e29..8d9dfa15 100644
--- a/data_subscriber/cslc/cslc_query.py
+++ b/data_subscriber/cslc/cslc_query.py
@@ -9,7 +9,7 @@
 from data_subscriber.cmr import async_query_cmr, CMR_TIME_FORMAT
 from data_subscriber.cslc_utils import (localize_disp_frame_burst_hist, build_cslc_native_ids, parse_cslc_native_id,
                                         process_disp_frame_burst_hist, download_batch_id_forward_reproc, split_download_batch_id,
-                                        parse_cslc_file_name, CSLCDependency)
+                                        parse_cslc_file_name, CSLCDependency, determine_submitted_retrigger)
 from data_subscriber.query import CmrQuery, DateTimeRange
 from data_subscriber.url import cslc_unique_id
 from data_subscriber.cslc.cslc_catalog import KCSLCProductCatalog
@@ -182,6 +182,18 @@ def determine_download_granules(self, granules):
                     download_granules.append(granule)
                     self.download_batch_ids[batch_id].add(batch_id)'''
 
+        # Rule #4: If we have a new granule for a batch that'd already been submitted,
+        # submit again using the new granule only for the bursts that match, and only if the production time is later
+        for batch_id, download_batch in by_download_batch_id.items():
+            submitted_granules = self.es_conn.get_submitted_granules(batch_id) # batch_id uniquely identifies a triggered evaluation
+            frame_id, _ = split_download_batch_id(batch_id)
+            len_burst_ids = len(self.disp_burst_map_hist[frame_id].burst_ids)
+            trigger_this_batch, batch_download_granules = determine_submitted_retrigger(
+                submitted_granules, download_batch, batch_id, len_burst_ids)
+            if trigger_this_batch:
+                download_granules.extend(batch_download_granules.values())
+                self.download_batch_ids[batch_id].add(batch_id)
+
         for granule in unsubmitted:
             logger.info(f"Merging in unsubmitted granule {granule['unique_id']}: {granule['granule_id']} for triggering consideration")
             download_batch = by_download_batch_id[granule["download_batch_id"]]
diff --git a/data_subscriber/cslc_utils.py b/data_subscriber/cslc_utils.py
index 66d16ff9..0bf967cb 100644
--- a/data_subscriber/cslc_utils.py
+++ b/data_subscriber/cslc_utils.py
@@ -406,6 +406,47 @@ def mark_pending_download_job_submitted(es, doc_id, download_job_id):
         }
     )
 
+def determine_submitted_retrigger(submitted_granules, download_batch, batch_id, len_burst_ids):
+    '''Determine whether a previously submitted download batch should be triggered again.
+
+    The batch is retriggered when a granule in download_batch covers a burst of the earlier
+    submission and has a strictly later "creation_ts" than the granule submitted for that burst.
+    Submitted bursts without a newer replacement keep their previously submitted granule.
+
+    :param submitted_granules: granules recorded for the earlier submission of batch_id; each
+        item is read via "burst_id" and "creation_ts". May be empty or None.
+    :param download_batch: dict whose values are the newly found granules of this batch; each
+        value is read via "burst_id" and "creation_ts".
+    :param batch_id: download batch id; only used in the log message.
+    :param len_burst_ids: number of burst ids of the frame this batch belongs to.
+    :return: (trigger_this_batch, batch_download_granules), where batch_download_granules maps
+        burst_id to the granule to use. Always a 2-tuple so callers can unpack it unconditionally.
+    '''
+
+    # Nothing was submitted for this batch before, so there is nothing to retrigger.
+    if not submitted_granules:
+        return False, {}
+
+    new_bursts = {download["burst_id"]: download for download in download_batch.values()}
+
+    trigger_this_batch = False
+    batch_download_granules = {}
+    for submitted_grn in submitted_granules:
+        burst_id = submitted_grn["burst_id"]
+        new_grn = new_bursts.get(burst_id)
+        if new_grn is not None and new_grn["creation_ts"] > submitted_grn["creation_ts"]:
+            trigger_this_batch = True
+            batch_download_granules[burst_id] = new_grn
+        else:
+            batch_download_granules[burst_id] = submitted_grn
+
+    if trigger_this_batch and len(batch_download_granules) < len_burst_ids:
+        logger.info(
+            f"Some of the granules used in previously triggering {batch_id=} are missing. Will retrieve them from CMR")
+        # TODO: retrieve the missing granules from CMR
+
+    return trigger_this_batch, batch_download_granules
+
 def parse_cslc_burst_id(native_id):
 
     burst_id, _ = parse_cslc_file_name(native_id)
diff --git a/tests/data_subscriber/test_cslc_util.py b/tests/data_subscriber/test_cslc_util.py
index 42ab344d..195cfef4 100644
--- a/tests/data_subscriber/test_cslc_util.py
+++ b/tests/data_subscriber/test_cslc_util.py
@@ -151,6 +151,9 @@ def test_get_prev_day_indices():
     2376, 2388, 2412, 2424, 2436, 2448, 2460, 2472, 2484, 2496, 2508, 2520, 2532, 2544, 2556,
     2568, 2580, 2592, 2604, 2616, 2628, 2640, 2652, 2664, 2676, 2736, 2724, 2712, 2700]'''
 
+def test_determine_submitted_retrigger():
+    pass
+
 def test_get_dependent_ccslc_index():
     prev_day_indices = [0, 24, 48, 72]
     assert "t041_086868_iw1_72" == cslc_utils.get_dependent_ccslc_index(prev_day_indices, 0, 2, "t041_086868_iw1")