diff --git a/data_subscriber/asf_cslc_download.py b/data_subscriber/asf_cslc_download.py
index 5274d661..111037b9 100644
--- a/data_subscriber/asf_cslc_download.py
+++ b/data_subscriber/asf_cslc_download.py
@@ -12,7 +12,7 @@
 from data_subscriber.cmr import Collection
 from data_subscriber.cslc.cslc_static_catalog import CSLCStaticProductCatalog
 from data_subscriber.download import SessionWithHeaderRedirection
-from data_subscriber.cslc_utils import parse_cslc_burst_id, build_cslc_static_native_ids, determine_k_cycle
+from data_subscriber.cslc_utils import parse_cslc_burst_id, build_cslc_static_native_ids, determine_k_cycle, get_dependent_compressed_cslcs
 from data_subscriber.asf_rtc_download import AsfDaacRtcDownload
 from data_subscriber.cslc.cslc_static_query import CslcStaticCmrQuery
 from data_subscriber.url import cslc_unique_id
@@ -181,37 +181,15 @@ def run_download(self, args, token, es_conn, netloc, username, password, cmr, jo
             os.remove(iono_file)

         # Determine M Compressed CSLCs by querying compressed cslc GRQ ES -------------->
-        # Uses ccslc_m_index field which looks like T100-213459-IW3_417 (burst_id_acquisition-cycle-index)
         k, m = es_conn.get_k_and_m(args.batch_ids[0])
         logger.info(f"{k=}, {m=}")

-        ''' Search for all previous M compressed CSLCs
-            prev_day_indices: The acquisition cycle indices of all collects that show up in disp_burst_map previous of
-            the latest acq cycle index
-        '''
-        prev_day_indices = get_prev_day_indices(latest_acq_cycle_index, frame_id, self.disp_burst_map, args, token, cmr, settings)
-
-        # special case for early sensing time series. Reduce m if there aren't enough sensing times in the database in the first place
-        # For example, if k was 4 and m was 3, but there are only 4 previous sensing times in the database, then m should be 2
-        if len(prev_day_indices) < k * (m - 1):
-            m = (len(prev_day_indices) // k) + 1
-
-        for mm in range(0, m-1): # m parameter is inclusive of the current frame at hand
-            for burst_id in burst_id_set:
-                ccslc_m_index = get_dependent_ccslc_index(prev_day_indices, mm, k, burst_id) #looks like t034_071112_iw3_461
-                logger.info("Retrieving Compressed CSLCs for ccslc_m_index: %s", ccslc_m_index)
-                ccslcs = es_conn.es.query(
-                    index=_C_CSLC_ES_INDEX_PATTERNS,
-                    body={"query": { "bool": { "must": [
-                        {"term": {"metadata.ccslc_m_index.keyword": ccslc_m_index}}]}}})
-
-                # Should have exactly one compressed cslc per acq cycle per burst
-                if len(ccslcs) != 1:
-                    raise Exception(f"Expected 1 Compressed CSLC for {ccslc_m_index}, got {len(ccslcs)}")
-
-                for ccslc in ccslcs:
-                    c_cslc_s3paths.extend(ccslc["_source"]["metadata"]["product_s3_paths"])
-        # <------------------------- Compressed CSLC look up
+        ccslcs = get_dependent_compressed_cslcs(frame_id, latest_acq_cycle_index, k, m, args, self.disp_burst_map, es_conn.es)
+        if ccslcs is False:
+            raise Exception(f"Failed to get compressed cslc for frame {frame_id} and day index {latest_acq_cycle_index}")
+
+        for ccslc in ccslcs:
+            c_cslc_s3paths.extend(ccslc["_source"]["metadata"]["product_s3_paths"])

         # Look up bounding box for frame
         bounding_box = get_bounding_box_for_frame(int(frame_id), self.frame_geo_map)
diff --git a/data_subscriber/cslc_utils.py b/data_subscriber/cslc_utils.py
index 5cfcce84..eaf256b9 100644
--- a/data_subscriber/cslc_utils.py
+++ b/data_subscriber/cslc_utils.py
@@ -15,6 +15,8 @@
 DISP_FRAME_BURST_MAP_HIST = 'opera-disp-s1-consistent-burst-ids-with-datetimes.json'
 FRAME_GEO_SIMPLE_JSON = 'frame-geometries-simple.geojson'
+_C_CSLC_ES_INDEX_PATTERNS = "grq_1_l2_cslc_s1_compressed*"
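+# ES index holding download jobs that are blocked waiting on compressed CSLC dependencies (see save_blocked_download_job)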
+_BLOCKED_CSLC_DOWNLOADS_ES_INDEX_NAME = "grq_1_l2_cslc_s1_blocked_downloads"

 logger = logging.getLogger(__name__)

@@ -270,6 +272,65 @@ def parse_cslc_native_id(native_id, burst_to_frames, frame_to_bursts):

     return burst_id, acquisition_dts, acquisition_cycles, frame_ids

+def compressed_cslc_satisfied(frame_id, day_index, k, m, args, frame_to_bursts, eu):
+
+    if get_dependent_compressed_cslcs(frame_id, day_index, k, m, args, frame_to_bursts, eu) is False:
+        return False
+    return True
+
+def get_dependent_compressed_cslcs(frame_id, day_index, k, m, args, disp_burst_map, eu):
+    ''' Search for all previous M compressed CSLCs
+        prev_day_indices: The acquisition cycle indices of all collects that show up in disp_burst_map prior to
+        the latest acq cycle index
+    '''
+
+    prev_day_indices = get_prev_day_indices(day_index, frame_id, disp_burst_map, args, None, None,
+                                            None)
+
+    ccslcs = []
+
+    # special case for early sensing time series
+    if len(prev_day_indices) < k * (m - 1):
+        m = (len(prev_day_indices) // k) + 1
+
+    # Uses ccslc_m_index field which looks like T100-213459-IW3_417 (burst_id_acquisition-cycle-index)
+    for mm in range(0, m - 1):  # m parameter is inclusive of the current frame at hand
+        for burst_id in disp_burst_map[frame_id].burst_ids:
+            ccslc_m_index = get_dependent_ccslc_index(prev_day_indices, mm, k, burst_id)
+            hits = eu.query(
+                index=_C_CSLC_ES_INDEX_PATTERNS,
+                body={"query": {"bool": {"must": [
+                    {"term": {"metadata.ccslc_m_index.keyword": ccslc_m_index}}]}}})
+
+            # Should have exactly one compressed cslc per acq cycle per burst
+            if len(hits) != 1:
+                logger.info("Compressed CSLC for ccslc_m_index %s was not found in GRQ ES", ccslc_m_index)
+                return False
+
+            ccslcs.extend(hits)
+
+    logger.info("All Compressed CSLCs for frame %s at day index %s found in GRQ ES", frame_id, day_index)
+    return ccslcs
+
+def save_blocked_download_job(eu, product_type, params, job_queue, job_name, frame_id, acq_index, k, m):
+    """Save the blocked download job in the ES index"""
+
+    eu.index_document(
+        index=_BLOCKED_CSLC_DOWNLOADS_ES_INDEX_NAME,
+        id=job_name,
+        body={
+            "doc": {
+                "job_name": job_name,
+                "job_queue": job_queue,
+                "job_params": params,
+                "job_ts": datetime.now().isoformat(timespec="seconds").replace("+00:00", "Z"),
+                "product_type": product_type,
+                "frame_id": frame_id,
+                "acq_index": acq_index,
+                "k": k,
+                "m": m
+            }
+        }
+    )
+
 def parse_cslc_burst_id(native_id):

     burst_id, _ = parse_cslc_file_name(native_id)
diff --git a/data_subscriber/query.py b/data_subscriber/query.py
index 0258603e..92947462 100644
--- a/data_subscriber/query.py
+++ b/data_subscriber/query.py
@@ -19,7 +19,7 @@
                                download_from_s3)
 from data_subscriber.hls.hls_catalog import HLSProductCatalog
 from data_subscriber.rtc.rtc_download_job_submitter import submit_rtc_download_job_submissions_tasks
-from data_subscriber.cslc_utils import split_download_batch_id
+from data_subscriber.cslc_utils import split_download_batch_id, compressed_cslc_satisfied, save_blocked_download_job
 from data_subscriber.url import form_batch_id, _slc_url_to_chunk_id
 from hysds_commons.job_utils import submit_mozart_job
 from util.conf_util import SettingsConf
@@ -254,17 +254,30 @@
             logger.info(f"{payload_hash=}")
             logger.debug(f"{chunk_urls=}")

+            params = self.create_download_job_params(query_timerange, chunk_batch_ids)
+
             product_type = COLLECTION_TO_PRODUCT_TYPE_MAP[self.args.collection].lower()
             if COLLECTION_TO_PRODUCT_TYPE_MAP[self.args.collection] == ProductType.CSLC:
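+                # CSLC download batch ids encode the frame id and the acquisition cycle index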
                 frame_id = split_download_batch_id(chunk_batch_ids[0])[0]
                 acq_indices = [split_download_batch_id(chunk_batch_id)[1] for chunk_batch_id in chunk_batch_ids]
                 job_name = f"job-WF-{product_type}_download-frame-{frame_id}-acq_indices-{min(acq_indices)}-to-{max(acq_indices)}"
+
+                # See if all the compressed cslcs are satisfied. If not, do not submit the job. Instead, save all the
+                # job info in ES and wait for the next query to come in. Any acquisition index will work because all
+                # batches require the same compressed cslcs
+                if not compressed_cslc_satisfied(frame_id, acq_indices[0], self.args.k, self.args.m, self.args,
+                                                 self.disp_burst_map_hist, self.es_conn.es):
+                    logger.info("Not all compressed CSLCs are satisfied so this download job is blocked until they are satisfied")
+                    save_blocked_download_job(self.es_conn.es, product_type, params, self.args.job_queue, job_name,
+                                              frame_id, acq_indices[0], self.args.k, self.args.m)
+                    continue
+
             else:
                 job_name = f"job-WF-{product_type}_download-{chunk_batch_ids[0]}"

             download_job_id = submit_download_job(release_version=self.settings["RELEASE_VERSION"],
                                                   product_type=product_type,
-                                                  params=self.create_download_job_params(query_timerange, chunk_batch_ids),
+                                                  params=params,
                                                   job_queue=self.args.job_queue,
                                                   job_name = job_name,
                                                   payload_hash = payload_hash
@@ -278,7 +291,6 @@ def submit_download_job_submissions_tasks(self, batch_id_to_urls_map, query_time

         return job_submission_tasks

-
     def create_download_job_params(self, query_timerange, chunk_batch_ids):
         args = self.args
         download_job_params = [
diff --git a/tests/tools/test_run_disp_s1_historical_processing.py b/tests/tools/test_run_disp_s1_historical_processing.py
index 976c8c49..92b7e56e 100644
--- a/tests/tools/test_run_disp_s1_historical_processing.py
+++ b/tests/tools/test_run_disp_s1_historical_processing.py
@@ -58,7 +58,7 @@ def test_form_job_params_basic():
     p = generate_p()
     p.frame_states = generate_initial_frame_states(p.frames)
     do_submit, job_name, job_spec, job_params, job_tags, next_frame_sensing_position, finished = \
-        form_job_params(p, 831, 0)
+        form_job_params(p, 831, 0, None, None)

     assert do_submit == True
     assert job_name == "data-subscriber-query-timer-historical1_f831-2017-02-15T22:35:24-2017-03-23T23:35:24"
@@ -72,7 +72,7 @@
     assert job_params["exclude_regions"] == f'--exclude-regions={EXCLUDE_REGIONS}'
     assert job_params["frame_id"] == f'--frame-id=831'
     assert job_params["k"] == f'--k=4'
-    assert job_params["m"] == f'--m=2'
+    assert job_params["m"] == f'--m=1'
     assert next_frame_sensing_position == 4
     assert finished == False

@@ -84,7 +84,7 @@ def test_form_job_params_early():
     p.frame_states = generate_initial_frame_states(p.frames)
     p.data_start_date = '2018-07-01T00:00:00'
     do_submit, job_name, job_spec, job_params, job_tags, next_frame_sensing_position, finished = \
-        form_job_params(p, 831, 0)
+        form_job_params(p, 831, 0, None, None)

     assert next_frame_sensing_position == 4
     assert do_submit == False
@@ -97,7 +97,7 @@ def test_form_job_params_late():
     p.frame_states = generate_initial_frame_states(p.frames)
     p.data_end_date = '2015-07-01T00:00:00'
    do_submit, job_name, job_spec, job_params, job_tags, next_frame_sensing_position, finished = \
-        form_job_params(p, 831, 0)
+        form_job_params(p, 831, 0, None, None)

     assert do_submit == False
     assert finished == True
@@ -106,14 +106,14 @@ def test_form_job_params_no_ccslc(monkeypatch):
     '''If compressed cslcs are not found, don't process this round and don't increment the position'''

     mock_ccslc = MagicMock(return_value=False)
-    monkeypatch.setattr(tools.run_disp_s1_historical_processing,
-                        tools.run_disp_s1_historical_processing.compressed_cslc_satisfied.__name__, mock_ccslc)
+    monkeypatch.setattr(cslc_utils,
+                        cslc_utils.compressed_cslc_satisfied.__name__, mock_ccslc)

     p = generate_p()
     p.frame_states = generate_initial_frame_states(p.frames)
     p.data_end_date = '2015-07-01T00:00:00'
     do_submit, job_name, job_spec, job_params, job_tags, next_frame_sensing_position, finished = \
-        form_job_params(p, 831, 0)
+        form_job_params(p, 831, 0, None, None)

     assert do_submit == False
     assert next_frame_sensing_position == 0
\ No newline at end of file
diff --git a/tools/run_disp_s1_historical_processing.py b/tools/run_disp_s1_historical_processing.py
index 33447a1a..c853a07a 100755
--- a/tools/run_disp_s1_historical_processing.py
+++ b/tools/run_disp_s1_historical_processing.py
@@ -31,7 +31,8 @@
 disp_burst_map, burst_to_frames, day_indices_to_frames = cslc_utils.localize_disp_frame_burst_hist(cslc_utils.DISP_FRAME_BURST_MAP_HIST)


-def proc_once(eu, procs, dryrun = False):
+def proc_once(eu, procs, args):
+    dryrun = args.dry_run
     job_success = True

     for proc in procs:
@@ -73,7 +74,7 @@

         # Compute job parameters, whether to process or not, and if we're finished
         do_submit, job_name, job_spec, job_params, job_tags, next_frame_pos, finished = \
-            form_job_params(p, int(frame_id), last_frame_processed)
+            form_job_params(p, int(frame_id), last_frame_processed, args, eu)

         proc_finished = proc_finished & finished  # All frames must be finished for this batch proc to be finished

@@ -121,7 +122,7 @@
     return job_success


-def form_job_params(p, frame_id, sensing_time_position_zero_based):
+def form_job_params(p, frame_id, sensing_time_position_zero_based, args, eu):

     data_start_date = datetime.strptime(p.data_start_date, ES_DATETIME_FORMAT)
     data_end_date = datetime.strptime(p.data_end_date, ES_DATETIME_FORMAT)
@@ -181,9 +182,9 @@

     #Query GRQ ES for the previous sensing time day index compressed cslc. If this doesn't exist, we can't process
     # this frame sensing time yet. So we will not submit job and increment next_sensing_time_position
-    if compressed_cslc_satisfied(frame_id,
+    if cslc_utils.compressed_cslc_satisfied(frame_id,
                                  disp_burst_map[frame_id].sensing_datetime_days_index[sensing_time_position_zero_based],
-                                 p.k, p.m):
+                                 p.k, p.m, args, disp_burst_map, eu):
         next_sensing_time_position = sensing_time_position_zero_based + p.k
     else:
         do_submit = False
@@ -310,34 +311,6 @@ def generate_initial_frame_states(frames):

     return frame_states

-def compressed_cslc_satisfied(frame_id, day_index, k, m):
-    '''Look for the compressed cslc records needed to process this frame at day index in GRQ ES'''
-
-    #TODO: This code is mostly identical to asf_cslc_download.py lines circa 200. Refactor into one place
-    prev_day_indices = cslc_utils.get_prev_day_indices(day_index, frame_id, disp_burst_map, args, None, None,
-                                                       None)
-
-    #special case for early sensing time series
-    if len(prev_day_indices) < k * (m-1):
-        m = (len(prev_day_indices) // k ) + 1
-
-    _C_CSLC_ES_INDEX_PATTERNS = "grq_1_l2_cslc_s1_compressed*"
-    for mm in range(0, m - 1): # m parameter is inclusive of the current frame at hand
-        for burst_id in disp_burst_map[frame_id].burst_ids:
-            ccslc_m_index = cslc_utils.get_dependent_ccslc_index(prev_day_indices, mm, k, burst_id)
-            ccslcs = eu.query(
-                index=_C_CSLC_ES_INDEX_PATTERNS,
-                body={"query": {"bool": {"must": [
-                    {"term": {"metadata.ccslc_m_index.keyword": ccslc_m_index}}]}}})
-
-            # Should have exactly one compressed cslc per acq cycle per burst
-            if len(ccslcs) != 1:
-                logger.info("Compressed CSLCs for ccslc_m_index: %s was not found in GRQ ES", ccslc_m_index)
-                return False
-
-    logger.info("All Compresseed CSLSs for frame %s at day index %s found in GRQ ES", frame_id, day_index)
-    return True
-
 def convert_datetime(datetime_obj, strformat=DATETIME_FORMAT):
     """
     Converts from a datetime string to a datetime object or vice versa
@@ -377,5 +350,5 @@ def convert_datetime(datetime_obj, strformat=DATETIME_FORMAT):

     while (True):
         batch_procs = eu.query(index=ES_INDEX)  # TODO: query for only enabled docs
-        proc_once(eu, batch_procs, args.dry_run)
+        proc_once(eu, batch_procs, args)
         time.sleep(int(args.sleep_secs))
\ No newline at end of file