fix(sat-etl): Reduce EUMDAC throttling messages (#131)
* fix(sat-etl): Reduce EUMDAC throttling messages

* fix(sat-etl): More descriptive logging
devsjc authored Sep 17, 2024
1 parent ba8b310 commit 17187c9
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions containers/sat/download_process_sat.py
@@ -60,15 +60,15 @@
 for logger in [
     "cfgrib",
     "charset_normalizer",
-    "eumdac",
+    "eumdac",  # If you want to know about throttling, set this to WARNING
     "native_msg",
     "pyorbital",
     "pyresample",
     "requests",
     "satpy",
     "urllib3",
 ]:
-    logging.getLogger(logger).setLevel(logging.WARNING)
+    logging.getLogger(logger).setLevel(logging.ERROR)

 log = logging.getLogger("sat-etl")
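
Quieting chatty third-party loggers like this is the standard Python logging pattern: each named logger gets its own level, independent of the root logger. A minimal sketch of the idea (the logger list here is an illustrative subset, not the full list above):

import logging

logging.basicConfig(level=logging.DEBUG)  # our own logs stay verbose

for name in ["eumdac", "urllib3"]:  # illustrative subset
    # ERROR also suppresses the WARNING-level throttling messages eumdac emits
    logging.getLogger(name).setLevel(logging.ERROR)

log = logging.getLogger("sat-etl")
log.debug("Still visible; eumdac throttling warnings are not.")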

@@ -197,13 +197,15 @@ def download_scans(
         log.error(f"Error finding products: {e}")
         return []

-    log.debug(f"Found {len(products)} products for {scan_time}")
+    if len(products) == 0:
+        log.warning(f"No products found for {scan_time}.")

     for product in products:
         for entry in list(filter(lambda p: p.endswith(".nat"), product.entries)):
             filepath: pathlib.Path = folder / entry
             # Prevent downloading existing files
             if filepath.exists():
+                log.debug(f"Skipping existing file: {filepath}")
                 files.append(filepath)
                 continue
             # Try download a few times
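
The "Try download a few times" context line above refers to a retry loop that the page truncates here. A rough sketch of that pattern, using the surrounding names from the diff, and assuming eumdac's product.open(entry=...) file-like accessor and an attempt count of 3 (both assumptions, not the commit's exact values):

import shutil
import time

ATTEMPTS = 3  # assumed retry count
for attempt in range(1, ATTEMPTS + 1):
    try:
        # Stream the entry to disk via eumdac's file-like accessor
        with product.open(entry=entry) as fsrc, filepath.open("wb") as fdst:
            shutil.copyfileobj(fsrc, fdst)
        files.append(filepath)
        break
    except Exception as e:
        log.warning(f"Download attempt {attempt}/{ATTEMPTS} failed for {entry}: {e}")
        time.sleep(attempt)  # simple linear backoff between attempts
else:
    log.error(f"Failed to download {entry} after {ATTEMPTS} attempts")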
@@ -264,9 +266,14 @@ def process_scans(
     f: pathlib.Path
     for i, f in enumerate(native_files):
         try:
+            # TODO: This method of passing the zarr times to the open function leaves a lot to be desired.
+            # Firstly, if the times are not passed in sorted order then the created 12-dataset chunks
+            # may have missed times in them. Secondly, determining the time still requires opening and
+            # converting the file, which is probably slow. Better to skip searching for files whose times
+            # are already in the Zarr store in the first place and bypass the entire pipeline.
             dataset: xr.Dataset | None = _open_and_scale_data(zarr_times, f.as_posix(), dstype)
         except Exception as e:
-            log.error(f"Exception: {e}")
+            log.error(f"Error opening/scaling data for file {f}: {e}")
             continue
         if dataset is not None:
             dataset = _preprocess_function(dataset)
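
The new TODO argues for dropping files whose scan times are already in the Zarr store before opening them at all. A rough sketch of that idea, assuming scan times can be recovered from filenames (_parse_time_from_filename is a hypothetical helper, not part of the commit):

import datetime as dt
import pathlib

def _filter_unseen_files(
    native_files: list[pathlib.Path],
    zarr_times: list[dt.datetime],
) -> list[pathlib.Path]:
    """Drop files whose scan time is already present in the Zarr store."""
    known: set[dt.datetime] = set(zarr_times)
    return [
        f for f in native_files
        if _parse_time_from_filename(f.name) not in known  # hypothetical helper
    ]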
@@ -442,8 +449,11 @@ def _open_and_scale_data(
"""
# The reader is the same for each satellite as the sensor is the same
# * Hence "seviri" in all cases
scene = Scene(filenames={"seviri_l1b_native": [f]})
scene.load([c.variable for c in CHANNELS[dstype]])
try:
scene = Scene(filenames={"seviri_l1b_native": [f]})
scene.load([c.variable for c in CHANNELS[dstype]])
except Exception as e:
raise OSError(f"Error loading scene from file {f}: {e}") from e

try:
da: xr.DataArray = _convert_scene_to_dataarray(
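
Re-raising with "from e" chains the original satpy exception onto the new OSError as __cause__, so a caller can log both the high-level failure and the underlying cause. For example, a caller inside the processing loop might do (a sketch, not the commit's code):

try:
    dataset = _open_and_scale_data(zarr_times, f.as_posix(), dstype)
except OSError as e:
    # The original satpy exception is preserved on the chain
    log.error(f"Skipping unreadable file {f}: {e} (cause: {e.__cause__!r})")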
@@ -653,7 +663,7 @@ def run(args: argparse.Namespace) -> None:
     # Process the HRV and non-HRV data concurrently if possible
     completed_types: list[str] = []
     for t in ["hrv", "nonhrv"]:
-        log.info("Processing {t} data.")
+        log.info(f"Processing {t} data.")
         completed_type = process_scans(sat_config, folder, start, end, t)
         completed_types.append(completed_type)
     for completed_type in completed_types:
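
The one-character fix in this hunk is easy to miss: without the f prefix the braces are not interpolated, and the literal text is logged. A quick illustration:

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("sat-etl")

t = "hrv"
log.info("Processing {t} data.")   # logs: Processing {t} data.
log.info(f"Processing {t} data.")  # logs: Processing hrv data.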
