From 9f8c320d91faaad0805fb19ae3036f58ba9e8ca9 Mon Sep 17 00:00:00 2001
From: devsjc <47188100+devsjc@users.noreply.github.com>
Date: Tue, 1 Oct 2024 09:29:27 +0100
Subject: [PATCH] fix(sat-etl): Only process native files in date range

---
 containers/sat/download_process_sat.py | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/containers/sat/download_process_sat.py b/containers/sat/download_process_sat.py
index d4ba525..d618b60 100644
--- a/containers/sat/download_process_sat.py
+++ b/containers/sat/download_process_sat.py
@@ -228,6 +228,14 @@ def download_scans(
 
     return files
 
+def _fname_to_scantime(fname: str) -> dt.datetime:
+    """Converts a filename to a datetime.
+
+    Files are of the form:
+    `MSG2-SEVI-MSG15-0100-NA-20230910221240.874000000Z-NA.nat`
+    So determine the time from the first element split by '.'.
+    """
+    return dt.datetime.strptime(fname.split(".")[0][-14:], "%Y%m%d%H%M%S")
 
 def process_scans(
     sat_config: Config,
@@ -256,15 +264,16 @@
 
     # Get native files in order
     native_files: list[pathlib.Path] = list(folder.glob("*.nat"))
-    log.info(f"Found {len(native_files)} native files at {folder.as_posix()}")
     native_files.sort()
+    wanted_files = [f for f in native_files if start <= _fname_to_scantime(f.name).date() < end]
+    log.info(f"Found {len(wanted_files)} native files within date range at {folder.as_posix()}")
 
     # Convert native files to xarray datasets
     # * Append to the monthly zarr in hourly chunks
     datasets: list[xr.Dataset] = []
     i: int
     f: pathlib.Path
-    for i, f in enumerate(native_files):
+    for i, f in enumerate(wanted_files):
         try:
             # TODO: This method of passing the zarr times to the open function leaves a lot to be desired
             # Firstly, if the times are not passed in sorted order then the created 12-dataset chunks
@@ -301,7 +310,7 @@
                 )
                 datasets = []
 
-        log.info(f"Process loop [{dstype}]: {i+1}/{len(native_files)}")
+        log.info(f"Process loop [{dstype}]: {i+1}/{len(wanted_files)}")
 
     # Consolidate zarr metadata
     if pathlib.Path(zarr_path).exists():
@@ -635,7 +644,10 @@ def run(args: argparse.Namespace) -> None:
     # Estimate average runtime
     secs_per_scan: int = 90
     expected_runtime = pd.Timedelta(secs_per_scan * len(scan_times), "seconds")
-    log.info(f"Downloading {len(scan_times)} scans. Expected runtime: {expected_runtime!s}")
+    log.info(
+        f"Downloading {len(scan_times)} scans ({start} - {end}). "
+        f"Expected runtime: {expected_runtime!s}"
+    )
 
     # Download data
     # We only parallelize if we have a number of files larger than the cpu count
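
Reviewer note (not part of the patch): a minimal, self-contained sketch of what the new
_fname_to_scantime helper yields for the example filename given in its docstring, assuming
the EUMETSAT native filenames always end their first dot-separated element with a
YYYYMMDDHHMMSS timestamp, as that docstring states:

    import datetime as dt

    def _fname_to_scantime(fname: str) -> dt.datetime:
        # Take the portion before the first '.', then its last 14 characters (YYYYMMDDHHMMSS).
        return dt.datetime.strptime(fname.split(".")[0][-14:], "%Y%m%d%H%M%S")

    scan_time = _fname_to_scantime("MSG2-SEVI-MSG15-0100-NA-20230910221240.874000000Z-NA.nat")
    print(scan_time)         # 2023-09-10 22:12:40
    print(scan_time.date())  # 2023-09-10, the value compared against start/end in process_scans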