From 17187c904bf888bad3ee87958df79ddad9ffd354 Mon Sep 17 00:00:00 2001
From: devsjc <47188100+devsjc@users.noreply.github.com>
Date: Tue, 17 Sep 2024 11:00:44 +0100
Subject: [PATCH] fix(sat-etl): Reduce EUMDAC throttling messages (#131)

* fix(sat-etl): Reduce EUMDAC throttling messages

* fix(sat-etl): More descriptive logging

---
 containers/sat/download_process_sat.py | 24 +++++++++++++++++-------
 1 file changed, 17 insertions(+), 7 deletions(-)

diff --git a/containers/sat/download_process_sat.py b/containers/sat/download_process_sat.py
index 439c8b4..d4ba525 100644
--- a/containers/sat/download_process_sat.py
+++ b/containers/sat/download_process_sat.py
@@ -60,7 +60,7 @@ for logger in [