fix(gfs-etl): Use mounted directory for work
devsjc committed Jun 12, 2024
1 parent 7780b94 commit 151a375
Showing 1 changed file with 21 additions and 18 deletions.
39 changes: 21 additions & 18 deletions containers/gfs/download_combine_gfs.py
@@ -212,32 +212,35 @@ def run(path: str, config: Config, date: dt.date) -> str:

     run_datasets: list[str] = []
     for hour in ["00", "06", "12", "18"]:
 
         dataset_paths: list[str] = []
-        with tempfile.TemporaryDirectory() as tmpdir:
-            for file in list(glob(f"{path}/{date:%Y%m%d}/{hour}/*{hour}.*.grib2")):
-                log.debug(f"Converting {file}")
-                ds_path = convert_file(file=file, outfolder=tmpdir)
-                if ds_path is not None:
-                    dataset_paths.append(ds_path)
-            log.debug(f"Converted {len(dataset_paths)} files for {date}:{hour}")
-
-            hour_ds_path: str = functools.reduce(_combine_datasets, dataset_paths)
-            log.debug(f"Combined {len(dataset_paths)} datasets for {date}:{hour}")
-            run_datasets.append(hour_ds_path)
+        for file in list(glob(f"{path}/{date:%Y%m%d}/{hour}/*{hour}.*.grib2")):
+            log.debug(f"Converting {file}")
+            ds_path = convert_file(file=file, outfolder=path + "/.work")
+            if ds_path is not None:
+                dataset_paths.append(ds_path)
+        log.debug(f"Converted {len(dataset_paths)} files for {date}:{hour}")
+
+        hour_ds_path: str = functools.reduce(_combine_datasets, dataset_paths)
+        log.debug(f"Combined {len(dataset_paths)} datasets for {date}:{hour}")
+        run_datasets.append(hour_ds_path)

log.info("Combining run datasets and applying compression")
day_ds_path: str = functools.reduce(_combine_datasets, run_datasets)
day_ds: xr.Dataset = xr.open_zarr(day_ds_path)
encoding = {var: {"compressor": Blosc2("zstd", clevel=9)} for var in day_ds.data_vars}
encoding["init_time"] = {"units": "nanoseconds since 1970-01-01"}
outpath = f"{path}/{date:%Y%m%d}/{date:%Y%m%d}.zarr.zip"
-    with zarr.ZipStore(path=outpath, mode="w") as store:
-        day_ds.to_zarr(
-            store,
-            encoding=encoding,
-            compute=True,
-        )
+    try:
+        with zarr.ZipStore(path=outpath, mode="w") as store:
+            day_ds.to_zarr(
+                store,
+                encoding=encoding,
+                compute=True,
+            )
+        shutil.rmtree(path + "/.work", ignore_errors=True)
+    except Exception as e:
+        log.error(f"Error saving dataset for {date:%Y%m%d}: {e}")
+        return None

log.info(f"Saved dataset for {date:%Y%m%d} to {outpath}")
shutil.rmtree(day_ds_path, ignore_errors=True)
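The per-hour loop folds the converted files together with functools.reduce(_combine_datasets, dataset_paths). The reducer itself is not shown in this hunk; below is a minimal sketch of what such a pairwise combiner could look like, assuming it merges two on-disk zarr stores and returns the surviving path. The merge semantics here are an assumption for illustration, not the repository's actual implementation.

import xarray as xr

def _combine_datasets(left_path: str, right_path: str) -> str:
    # Hypothetical sketch only: merge the zarr store at right_path into
    # the one at left_path and return left_path, so that functools.reduce()
    # folds a list of store paths down to a single path.
    left = xr.open_zarr(left_path)
    right = xr.open_zarr(right_path)
    combined = xr.merge([left, right], combine_attrs="drop_conflicts").compute()
    left.close()
    right.close()
    combined.to_zarr(left_path, mode="w")  # overwrite the left store in place
    return left_path

Under this assumption, reduce() would fold ["a.zarr", "b.zarr", "c.zarr"] into a single merged store at "a.zarr".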
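For reference, the archive this script writes can be read back through the same zarr v2 ZipStore API used above. A minimal readback sketch, with an illustrative filename:

import xarray as xr
import zarr

# Open the zipped zarr archive produced by run() (path is illustrative).
with zarr.ZipStore("20240612.zarr.zip", mode="r") as store:
    ds = xr.open_zarr(store).load()  # load eagerly; the store closes with the block
print(ds)

One nuance of the new error handling: run() is annotated -> str, but the added except branch returns None, so callers should treat the result as str | None.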
