diff --git a/data_subscriber/asf_cslc_download.py b/data_subscriber/asf_cslc_download.py
index 5c4495e7..8974b789 100644
--- a/data_subscriber/asf_cslc_download.py
+++ b/data_subscriber/asf_cslc_download.py
@@ -288,19 +288,30 @@ def get_downloads(self, args, es_conn):
 
         all_downloads = []
 
-        # Download CSLC granules
-        downloads = es_conn.get_download_granule_revision(batch_ids[-1])
-        logger.info(f"Got {len(downloads)=} cslc granules downloads for batch_id={batch_ids[-1]}")
-        assert len(downloads) > 0, f"No downloads found for batch_id={batch_ids[-1]}!"
-        all_downloads.extend(downloads)
-
-        # Download K-CSLC granules
-        for batch_id in batch_ids[:-1]:
-            downloads = k_es_conn.get_download_granule_revision(batch_id)
-            logger.info(f"Got {len(downloads)=} k cslc downloads for {batch_id=}")
-            assert len(downloads) > 0, f"No downloads found for batch_id={batch_id}!"
+        # Historical mode stores all granules in normal cslc_catalog
+        if "proc_mode" in args and args.proc_mode == "historical":
+            logger.info("Downloading cslc files for historical mode")
+            for batch_id in batch_ids:
+                downloads = es_conn.get_download_granule_revision(batch_id)
+                logger.info(f"Got {len(downloads)=} cslc downloads for {batch_id=}")
+                assert len(downloads) > 0, f"No downloads found for batch_id={batch_id}!"
+                all_downloads.extend(downloads)
+
+        # Forward and reprocessing modes store all granules in k_cslc_catalog
+        else:
+            logger.info("Downloading cslc files for forward/reprocessing mode")
+            downloads = es_conn.get_download_granule_revision(batch_ids[-1])
+            logger.info(f"Got {len(downloads)=} cslc granules downloads for batch_id={batch_ids[-1]}")
+            assert len(downloads) > 0, f"No downloads found for batch_id={batch_ids[-1]}!"
             all_downloads.extend(downloads)
 
+            # Download K-CSLC granules
+            for batch_id in batch_ids[:-1]:
+                downloads = k_es_conn.get_download_granule_revision(batch_id)
+                logger.info(f"Got {len(downloads)=} k cslc downloads for {batch_id=}")
+                assert len(downloads) > 0, f"No downloads found for batch_id={batch_id}!"
+                all_downloads.extend(downloads)
+
         return all_downloads
 
     def query_cslc_static_files_for_cslc_batch(self, cslc_files, args, token, job_id, settings):
diff --git a/data_subscriber/cslc/cslc_query.py b/data_subscriber/cslc/cslc_query.py
index c31e4e29..7292c3e9 100644
--- a/data_subscriber/cslc/cslc_query.py
+++ b/data_subscriber/cslc/cslc_query.py
@@ -437,7 +437,6 @@ def query_cmr(self, args, token, cmr, settings, timerange, now):
 
     def create_download_job_params(self, query_timerange, chunk_batch_ids):
         '''Same as base class except inject batch_ids for k granules'''
-        assert len(chunk_batch_ids) == 1
         chunk_batch_ids.extend(list(self.k_batch_ids[chunk_batch_ids[0]]))
         return super().create_download_job_params(query_timerange, chunk_batch_ids)
 
@@ -478,16 +477,25 @@ def get_download_chunks(self, batch_id_to_urls_map):
         '''For CSLC chunks we must group them by the batch_id that were determined at the time of triggering'''
 
         chunk_map = defaultdict(list)
+        if len(list(batch_id_to_urls_map)) == 0:
+            return chunk_map.values()
+
+        frame_id, _ = split_download_batch_id(list(batch_id_to_urls_map)[0])
+
         for batch_chunk in batch_id_to_urls_map.items():
-            chunk_map[batch_chunk[0]].append(batch_chunk) # We don't actually care about the URLs, we only care about the batch_id
-
-            '''indices = self.download_batch_ids[batch_chunk[0]]
-            for index in indices:
-                chunk_map[index].append(batch_chunk)
-                if (len(chunk_map[index]) > self.args.k):
-                    logger.error([chunk for chunk, data in chunk_map[index]])
-                    err_str = f"Number of download batches {len(chunk_map[index])} for frame {index} is greater than K {self.args.k}."
-                    raise AssertionError(err_str)'''
+
+            # Chunking is done differently between historical and forward/reprocessing
+            if self.proc_mode == "historical":
+                chunk_map[frame_id].append(batch_chunk)
+            else:
+                chunk_map[batch_chunk[0]].append(
+                    batch_chunk)  # We don't actually care about the URLs, we only care about the batch_id
+
+        if self.proc_mode == "historical":
+            if (len(chunk_map[frame_id]) != self.args.k):
+                logger.error([chunk for chunk, data in chunk_map[frame_id]])
+                err_str = f"Number of download batches {len(chunk_map[frame_id])} for frame {frame_id} does not equal K {self.args.k}."
+                raise AssertionError(err_str)
 
         return chunk_map.values()