Skip to content

Commit

Permalink
#948: Basic feature is done. Need to 1) create unit test 2) address c…
Browse files Browse the repository at this point in the history
…ase when some of the submitted granules are lost from GRQ ES
  • Loading branch information
philipjyoon committed Aug 15, 2024
1 parent bde5e8b commit 1bb9cfc
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 1 deletion.
13 changes: 12 additions & 1 deletion data_subscriber/cslc/cslc_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from data_subscriber.cmr import async_query_cmr, CMR_TIME_FORMAT
from data_subscriber.cslc_utils import (localize_disp_frame_burst_hist, build_cslc_native_ids, parse_cslc_native_id,
process_disp_frame_burst_hist, download_batch_id_forward_reproc, split_download_batch_id,
parse_cslc_file_name, CSLCDependency)
parse_cslc_file_name, CSLCDependency, determine_submitted_retrigger)
from data_subscriber.query import CmrQuery, DateTimeRange
from data_subscriber.url import cslc_unique_id
from data_subscriber.cslc.cslc_catalog import KCSLCProductCatalog
Expand Down Expand Up @@ -182,6 +182,17 @@ def determine_download_granules(self, granules):
download_granules.append(granule)
self.download_batch_ids[batch_id].add(batch_id)'''

# Rule #4: If we have a new granule for a batch that'd already been submitted,
# submit again using the new granule only for the bursts that match, and only if the production time is later
for batch_id, download_batch in by_download_batch_id.items():
submitted_granules = self.es_conn.get_submitted_granules(batch_id) # batch_id uniquely identifies a triggered evaluation
frame_id, _ = split_download_batch_id(batch_id)
len_burst_ids = len(self.disp_burst_map_hist[frame_id].burst_ids)
trigger_this_batch, batch_download_granules = determine_submitted_retrigger(submitted_granules, download_batch, len_burst_ids)
if trigger_this_batch:
download_granules.extend(batch_download_granules.values())
self.download_batch_ids[batch_id].add(batch_id)

for granule in unsubmitted:
logger.info(f"Merging in unsubmitted granule {granule['unique_id']}: {granule['granule_id']} for triggering consideration")
download_batch = by_download_batch_id[granule["download_batch_id"]]
Expand Down
27 changes: 27 additions & 0 deletions data_subscriber/cslc_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,33 @@ def mark_pending_download_job_submitted(es, doc_id, download_job_id):
}
)

def determine_submitted_retrigger(submitted_granules, download_batch, batch_id, len_burst_ids):
'''Determine if we should retrigger a previously submitted batch '''

if len(submitted_granules) == 0:
return
trigger_this_batch = False
new_bursts = {}
batch_download_granules = {}
for download in download_batch.values():
new_bursts[download["burst_id"]] = download
for submitted_grn in submitted_granules:
burst_id = submitted_grn["burst_id"]
if burst_id in new_bursts:
if new_bursts[burst_id]["creation_ts"] > submitted_grn["creation_ts"]:
trigger_this_batch = True
batch_download_granules[burst_id] = new_bursts[burst_id]
else:
batch_download_granules[burst_id] = submitted_grn

if trigger_this_batch == True:
if len(batch_download_granules) < len_burst_ids:
logger.info(
f"Some of the granules used in previously triggering {batch_id=} are missing. Will retrieve them from CMR")
# TODO above

return trigger_this_batch, batch_download_granules

def parse_cslc_burst_id(native_id):

burst_id, _ = parse_cslc_file_name(native_id)
Expand Down
3 changes: 3 additions & 0 deletions tests/data_subscriber/test_cslc_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ def test_get_prev_day_indices():
2376, 2388, 2412, 2424, 2436, 2448, 2460, 2472, 2484, 2496, 2508, 2520, 2532, 2544, 2556, 2568, 2580, 2592, 2604,
2616, 2628, 2640, 2652, 2664, 2676, 2736, 2724, 2712, 2700]'''

def test_determine_submitted_retrigger():
pass

def test_get_dependent_ccslc_index():
prev_day_indices = [0, 24, 48, 72]
assert "t041_086868_iw1_72" == cslc_utils.get_dependent_ccslc_index(prev_day_indices, 0, 2, "t041_086868_iw1")
Expand Down

0 comments on commit 1bb9cfc

Please sign in to comment.