
Commit

#925: The first half of this task. The CSLC query will now check for compressed CSLC satisfaction. If not satisfied, the download job parameters are stored in GRQ ES for future evaluation and execution.
philipjyoon committed Jul 20, 2024
1 parent e2f62cb commit 8965171
Showing 5 changed files with 97 additions and 73 deletions.
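
In outline, the change gates download-job submission on compressed CSLC availability. A condensed, self-contained sketch of that control flow (compressed_cslc_satisfied and save_blocked_download_job are the real helpers added in data_subscriber/cslc_utils.py below; the stub bodies and sample values here are stand-ins):

def compressed_cslc_satisfied(frame_id, acq_index, k, m):
    """Stub: the real helper checks GRQ ES for the M dependent compressed CSLCs."""
    return False

def save_blocked_download_job(job_name, frame_id, acq_index, k, m):
    """Stub: the real helper stores the job parameters in GRQ ES for later runs."""
    print(f"blocked until compressed CSLCs exist: {job_name}")

def maybe_submit_download_job(job_name, frame_id, acq_index, k, m):
    if not compressed_cslc_satisfied(frame_id, acq_index, k, m):
        # Dependencies are missing: record the job instead of submitting it,
        # and let a future query run re-evaluate and execute it.
        save_blocked_download_job(job_name, frame_id, acq_index, k, m)
        return False
    return True  # the real code calls submit_download_job(...) here

maybe_submit_download_job("job-WF-cslc_download-frame-831", 831, 417, k=4, m=2)

The real helpers additionally take the frame-to-burst map and a GRQ ES connection; per the commit message, the stored parameters are picked up by a later query run.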
36 changes: 7 additions & 29 deletions data_subscriber/asf_cslc_download.py
@@ -12,7 +12,7 @@
from data_subscriber.cmr import Collection
from data_subscriber.cslc.cslc_static_catalog import CSLCStaticProductCatalog
from data_subscriber.download import SessionWithHeaderRedirection
from data_subscriber.cslc_utils import parse_cslc_burst_id, build_cslc_static_native_ids, determine_k_cycle
from data_subscriber.cslc_utils import parse_cslc_burst_id, build_cslc_static_native_ids, determine_k_cycle, get_dependent_compressed_cslcs
from data_subscriber.asf_rtc_download import AsfDaacRtcDownload
from data_subscriber.cslc.cslc_static_query import CslcStaticCmrQuery
from data_subscriber.url import cslc_unique_id
@@ -181,37 +181,15 @@ def run_download(self, args, token, es_conn, netloc, username, password, cmr, jo
os.remove(iono_file)

# Determine M Compressed CSLCs by querying compressed cslc GRQ ES -------------->
# Uses ccslc_m_index field which looks like T100-213459-IW3_417 (burst_id_acquisition-cycle-index)
k, m = es_conn.get_k_and_m(args.batch_ids[0])
logger.info(f"{k=}, {m=}")

''' Search for all previous M compressed CSLCs
prev_day_indices: The acquisition cycle indices of all collects that appear in disp_burst_map prior to
the latest acq cycle index
'''
prev_day_indices = get_prev_day_indices(latest_acq_cycle_index, frame_id, self.disp_burst_map, args, token, cmr, settings)

# special case for early sensing time series. Reduce m if there aren't enough sensing times in the database in the first place
# For example, if k was 4 and m was 3, but there are only 4 previous sensing times in the database, then m should be 2
if len(prev_day_indices) < k * (m - 1):
m = (len(prev_day_indices) // k) + 1

for mm in range(0, m-1): # m parameter is inclusive of the current frame at hand
for burst_id in burst_id_set:
ccslc_m_index = get_dependent_ccslc_index(prev_day_indices, mm, k, burst_id) #looks like t034_071112_iw3_461
logger.info("Retrieving Compressed CSLCs for ccslc_m_index: %s", ccslc_m_index)
ccslcs = es_conn.es.query(
index=_C_CSLC_ES_INDEX_PATTERNS,
body={"query": { "bool": { "must": [
{"term": {"metadata.ccslc_m_index.keyword": ccslc_m_index}}]}}})

# Should have exactly one compressed cslc per acq cycle per burst
if len(ccslcs) != 1:
raise Exception(f"Expected 1 Compressed CSLC for {ccslc_m_index}, got {len(ccslcs)}")

for ccslc in ccslcs:
c_cslc_s3paths.extend(ccslc["_source"]["metadata"]["product_s3_paths"])
# <------------------------- Compressed CSLC look up
ccslcs = get_dependent_compressed_cslcs(frame_id, latest_acq_cycle_index, k, m, args, self.disp_burst_map, es_conn.es)
if ccslcs is False:
raise Exception(f"Failed to get compressed cslc for frame {frame_id} and day index {latest_acq_cycle_index}")

for ccslc in ccslcs:
c_cslc_s3paths.extend(ccslc["_source"]["metadata"]["product_s3_paths"])

# Look up bounding box for frame
bounding_box = get_bounding_box_for_frame(int(frame_id), self.frame_geo_map)
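
As a worked check of the "special case for early sensing time series" removed above (and re-added in cslc_utils.py below), a minimal sketch of just that arithmetic; the function name is hypothetical, the threshold and reduction are copied from the diff:

def reduce_m_for_early_series(num_prev_day_indices, k, m):
    # Mirrors the special case: if fewer than k * (m - 1) prior sensing times
    # exist, shrink m to what the database can actually supply.
    if num_prev_day_indices < k * (m - 1):
        m = (num_prev_day_indices // k) + 1
    return m

# The example from the comment: k=4, m=3, only 4 previous sensing times -> m == 2.
assert reduce_m_for_early_series(4, k=4, m=3) == 2
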
61 changes: 61 additions & 0 deletions data_subscriber/cslc_utils.py
@@ -15,6 +15,8 @@

DISP_FRAME_BURST_MAP_HIST = 'opera-disp-s1-consistent-burst-ids-with-datetimes.json'
FRAME_GEO_SIMPLE_JSON = 'frame-geometries-simple.geojson'
_C_CSLC_ES_INDEX_PATTERNS = "grq_1_l2_cslc_s1_compressed*"
_BLOCKED_CSLC_DOWNLOADS_ES_INDEX_NAME = "grq_1_l2_cslc_s1_blocked_downloads"

logger = logging.getLogger(__name__)

@@ -270,6 +272,65 @@ def parse_cslc_native_id(native_id, burst_to_frames, frame_to_bursts)

return burst_id, acquisition_dts, acquisition_cycles, frame_ids

def compressed_cslc_satisfied(frame_id, day_index, k, m, args, frame_to_bursts, eu):

if get_dependent_compressed_cslcs(frame_id, day_index, k, m, args, frame_to_bursts, eu) is False:
return False
return True

def get_dependent_compressed_cslcs(frame_id, day_index, k, m, args, disp_burst_map, eu):
''' Search for all previous M compressed CSLCs
prev_day_indices: The acquisition cycle indices of all collects that appear in disp_burst_map prior to
the latest acq cycle index
'''

prev_day_indices = get_prev_day_indices(day_index, frame_id, disp_burst_map, args, None, None,
None)

ccslcs = []

# special case for early sensing time series
if len(prev_day_indices) < k * (m - 1):
m = (len(prev_day_indices) // k) + 1

# Uses ccslc_m_index field which looks like T100-213459-IW3_417 (burst_id_acquisition-cycle-index)
for mm in range(0, m - 1): # m parameter is inclusive of the current frame at hand
for burst_id in disp_burst_map[frame_id].burst_ids:
ccslc_m_index = get_dependent_ccslc_index(prev_day_indices, mm, k, burst_id)
ccslcs = eu.query(
index=_C_CSLC_ES_INDEX_PATTERNS,
body={"query": {"bool": {"must": [
{"term": {"metadata.ccslc_m_index.keyword": ccslc_m_index}}]}}})

# Should have exactly one compressed cslc per acq cycle per burst
if len(ccslcs) != 1:
logger.info("Compressed CSLCs for ccslc_m_index: %s was not found in GRQ ES", ccslc_m_index)
return False

logger.info("All Compresseed CSLSs for frame %s at day index %s found in GRQ ES", frame_id, day_index)
return ccslcs

def save_blocked_download_job(eu, product_type, params, job_queue, job_name, frame_id, acq_index, k, m):
"""Save the blocked download job in the ES index"""

eu.index_document(
index=_BLOCKED_CSLC_DOWNLOADS_ES_INDEX_NAME,
id=job_name,
body={
"doc":{
"job_name": job_name,
"job_queue": job_queue,
"job_params": params,
"job_ts": datetime.now().isoformat(timespec="seconds").replace("+00:00", "Z"),
"product_type": product_type,
"frame_id": frame_id,
"acq_index": acq_index,
"k": k,
"m": m
}
}
)

def parse_cslc_burst_id(native_id):

burst_id, _ = parse_cslc_file_name(native_id)
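
For context, the per-burst lookup inside get_dependent_compressed_cslcs reduces to one exact-match keyword term query. A standalone sketch with the elasticsearch client, assuming a local GRQ ES endpoint; the index pattern and field name are the ones defined above, and the sample ccslc_m_index value is the one quoted in the diff comments:

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # assumed GRQ ES endpoint

resp = es.search(
    index="grq_1_l2_cslc_s1_compressed*",
    body={"query": {"bool": {"must": [
        {"term": {"metadata.ccslc_m_index.keyword": "t034_071112_iw3_461"}}]}}})

hits = resp["hits"]["hits"]
# The satisfaction check treats anything other than exactly one hit per
# burst per acquisition cycle as "not satisfied".
print(len(hits), [h["_source"]["metadata"]["product_s3_paths"] for h in hits])
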
18 changes: 15 additions & 3 deletions data_subscriber/query.py
@@ -19,7 +19,7 @@
download_from_s3)
from data_subscriber.hls.hls_catalog import HLSProductCatalog
from data_subscriber.rtc.rtc_download_job_submitter import submit_rtc_download_job_submissions_tasks
from data_subscriber.cslc_utils import split_download_batch_id
from data_subscriber.cslc_utils import split_download_batch_id, compressed_cslc_satisfied, save_blocked_download_job
from data_subscriber.url import form_batch_id, _slc_url_to_chunk_id
from hysds_commons.job_utils import submit_mozart_job
from util.conf_util import SettingsConf
@@ -254,17 +254,30 @@ def submit_download_job_submissions_tasks(self, batch_id_to_urls_map, query_time
logger.info(f"{payload_hash=}")
logger.debug(f"{chunk_urls=}")

params = self.create_download_job_params(query_timerange, chunk_batch_ids)

product_type = COLLECTION_TO_PRODUCT_TYPE_MAP[self.args.collection].lower()
if COLLECTION_TO_PRODUCT_TYPE_MAP[self.args.collection] == ProductType.CSLC:
frame_id = split_download_batch_id(chunk_batch_ids[0])[0]
acq_indices = [split_download_batch_id(chunk_batch_id)[1] for chunk_batch_id in chunk_batch_ids]
job_name = f"job-WF-{product_type}_download-frame-{frame_id}-acq_indices-{min(acq_indices)}-to-{max(acq_indices)}"

# See if all the compressed CSLCs are satisfied. If not, do not submit the job. Instead, save all the job info in ES
# and wait for the next query to come in. Any acquisition index will work because all batches
# require the same compressed CSLCs.
if not compressed_cslc_satisfied(frame_id, acq_indices[0], self.args.k, self.args.m, self.args,
self.disp_burst_map_hist, self.es_conn.es):
logger.info(f"Not all compressed CSLCs are satisfied so this download job is blocked until they are satisfied")
save_blocked_download_job(self.es_conn.es, product_type, params, self.args.job_queue, job_name,
frame_id, acq_indices[0], self.args.k, self.args.m)
continue

else:
job_name = f"job-WF-{product_type}_download-{chunk_batch_ids[0]}"

download_job_id = submit_download_job(release_version=self.settings["RELEASE_VERSION"],
product_type=product_type,
params=self.create_download_job_params(query_timerange, chunk_batch_ids),
params=params,
job_queue=self.args.job_queue,
job_name = job_name,
payload_hash = payload_hash
@@ -278,7 +291,6 @@

return job_submission_tasks


def create_download_job_params(self, query_timerange, chunk_batch_ids):
args = self.args
download_job_params = [
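
The re-evaluation and execution of these stored parameters is the second half of the task and is not in this commit. Purely to illustrate the shape of what save_blocked_download_job writes, a hypothetical readback might look like this (endpoint assumed; the fields sit under "doc" assuming the indexing body above is stored verbatim):

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # assumed GRQ ES endpoint

resp = es.search(index="grq_1_l2_cslc_s1_blocked_downloads",
                 body={"query": {"match_all": {}}})

for hit in resp["hits"]["hits"]:
    doc = hit["_source"]["doc"]  # fields as written by save_blocked_download_job
    print(doc["job_name"], doc["frame_id"], doc["acq_index"], doc["k"], doc["m"])
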
14 changes: 7 additions & 7 deletions tests/tools/test_run_disp_s1_historical_processing.py
@@ -58,7 +58,7 @@ def test_form_job_params_basic():
p = generate_p()
p.frame_states = generate_initial_frame_states(p.frames)
do_submit, job_name, job_spec, job_params, job_tags, next_frame_sensing_position, finished = \
form_job_params(p, 831, 0)
form_job_params(p, 831, 0, None, None)

assert do_submit == True
assert job_name == "data-subscriber-query-timer-historical1_f831-2017-02-15T22:35:24-2017-03-23T23:35:24"
@@ -72,7 +72,7 @@ def test_form_job_params_basic():
assert job_params["exclude_regions"] == f'--exclude-regions={EXCLUDE_REGIONS}'
assert job_params["frame_id"] == f'--frame-id=831'
assert job_params["k"] == f'--k=4'
assert job_params["m"] == f'--m=2'
assert job_params["m"] == f'--m=1'

assert next_frame_sensing_position == 4
assert finished == False
@@ -84,7 +84,7 @@ def test_form_job_params_early():
p.frame_states = generate_initial_frame_states(p.frames)
p.data_start_date = '2018-07-01T00:00:00'
do_submit, job_name, job_spec, job_params, job_tags, next_frame_sensing_position, finished = \
form_job_params(p, 831, 0)
form_job_params(p, 831, 0, None, None)

assert next_frame_sensing_position == 4
assert do_submit == False
@@ -97,7 +97,7 @@ def test_form_job_params_late():
p.frame_states = generate_initial_frame_states(p.frames)
p.data_end_date = '2015-07-01T00:00:00'
do_submit, job_name, job_spec, job_params, job_tags, next_frame_sensing_position, finished = \
form_job_params(p, 831, 0)
form_job_params(p, 831, 0, None, None)

assert do_submit == False
assert finished == True
@@ -106,14 +106,14 @@ def test_form_job_params_no_ccslc(monkeypatch):
'''If compressed cslcs are not found, don't process this round and don't increment the position'''

mock_ccslc = MagicMock(return_value=False)
monkeypatch.setattr(tools.run_disp_s1_historical_processing,
tools.run_disp_s1_historical_processing.compressed_cslc_satisfied.__name__, mock_ccslc)
monkeypatch.setattr(cslc_utils,
cslc_utils.compressed_cslc_satisfied.__name__, mock_ccslc)

p = generate_p()
p.frame_states = generate_initial_frame_states(p.frames)
p.data_end_date = '2015-07-01T00:00:00'
do_submit, job_name, job_spec, job_params, job_tags, next_frame_sensing_position, finished = \
form_job_params(p, 831, 0)
form_job_params(p, 831, 0, None, None)

assert do_submit == False
assert next_frame_sensing_position == 0
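
The patch target in test_form_job_params_no_ccslc moves because monkeypatch must replace the attribute on the module the caller actually resolves it through; after the refactor, form_job_params calls cslc_utils.compressed_cslc_satisfied. A minimal illustration of that rule, with stand-in names:

import types

# Stand-in for the refactored module layout: the helper now lives in cslc_utils.
cslc_utils = types.SimpleNamespace(compressed_cslc_satisfied=lambda *a, **k: True)

def form_job_params():
    # The helper is resolved through cslc_utils at call time, so replacing
    # cslc_utils.compressed_cslc_satisfied is what the test must do.
    return cslc_utils.compressed_cslc_satisfied()

cslc_utils.compressed_cslc_satisfied = lambda *a, **k: False  # the "monkeypatch"
assert form_job_params() is False
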
41 changes: 7 additions & 34 deletions tools/run_disp_s1_historical_processing.py
@@ -31,7 +31,8 @@

disp_burst_map, burst_to_frames, day_indices_to_frames = cslc_utils.localize_disp_frame_burst_hist(cslc_utils.DISP_FRAME_BURST_MAP_HIST)

def proc_once(eu, procs, dryrun = False):
def proc_once(eu, procs, args):
dryrun = args.dry_run
job_success = True

for proc in procs:
@@ -73,7 +74,7 @@ def proc_once(eu, procs, dryrun = False):

# Compute job parameters, whether to process or not, and if we're finished
do_submit, job_name, job_spec, job_params, job_tags, next_frame_pos, finished = \
form_job_params(p, int(frame_id), last_frame_processed)
form_job_params(p, int(frame_id), last_frame_processed, args, eu)

proc_finished = proc_finished & finished # All frames must be finished for this batch proc to be finished

@@ -121,7 +122,7 @@ def proc_once(eu, procs, dryrun = False):

return job_success

def form_job_params(p, frame_id, sensing_time_position_zero_based):
def form_job_params(p, frame_id, sensing_time_position_zero_based, args, eu):

data_start_date = datetime.strptime(p.data_start_date, ES_DATETIME_FORMAT)
data_end_date = datetime.strptime(p.data_end_date, ES_DATETIME_FORMAT)
@@ -181,9 +182,9 @@ def form_job_params(p, frame_id, sensing_time_position_zero_based):

#Query GRQ ES for the previous sensing time day index compressed cslc. If this doesn't exist, we can't process
# this frame sensing time yet. So we will not submit job and increment next_sensing_time_position
if compressed_cslc_satisfied(frame_id,
if cslc_utils.compressed_cslc_satisfied(frame_id,
disp_burst_map[frame_id].sensing_datetime_days_index[sensing_time_position_zero_based],
p.k, p.m):
p.k, p.m, args, disp_burst_map, eu):
next_sensing_time_position = sensing_time_position_zero_based + p.k
else:
do_submit = False
@@ -310,34 +311,6 @@ def generate_initial_frame_states(frames):

return frame_states

def compressed_cslc_satisfied(frame_id, day_index, k, m):
'''Look for the compressed cslc records needed to process this frame at day index in GRQ ES'''

#TODO: This code is mostly identical to asf_cslc_download.py lines circa 200. Refactor into one place
prev_day_indices = cslc_utils.get_prev_day_indices(day_index, frame_id, disp_burst_map, args, None, None,
None)

#special case for early sensing time series
if len(prev_day_indices) < k * (m-1):
m = (len(prev_day_indices) // k ) + 1

_C_CSLC_ES_INDEX_PATTERNS = "grq_1_l2_cslc_s1_compressed*"
for mm in range(0, m - 1): # m parameter is inclusive of the current frame at hand
for burst_id in disp_burst_map[frame_id].burst_ids:
ccslc_m_index = cslc_utils.get_dependent_ccslc_index(prev_day_indices, mm, k, burst_id)
ccslcs = eu.query(
index=_C_CSLC_ES_INDEX_PATTERNS,
body={"query": {"bool": {"must": [
{"term": {"metadata.ccslc_m_index.keyword": ccslc_m_index}}]}}})

# Should have exactly one compressed cslc per acq cycle per burst
if len(ccslcs) != 1:
logger.info("Compressed CSLCs for ccslc_m_index: %s was not found in GRQ ES", ccslc_m_index)
return False

logger.info("All Compresseed CSLSs for frame %s at day index %s found in GRQ ES", frame_id, day_index)
return True

def convert_datetime(datetime_obj, strformat=DATETIME_FORMAT):
"""
Converts from a datetime string to a datetime object or vice versa
Expand Down Expand Up @@ -377,5 +350,5 @@ def convert_datetime(datetime_obj, strformat=DATETIME_FORMAT):

while (True):
batch_procs = eu.query(index=ES_INDEX) # TODO: query for only enabled docs
proc_once(eu, batch_procs, args.dry_run)
proc_once(eu, batch_procs, args)
time.sleep(int(args.sleep_secs))
