fix(gfs-etl): Enable run selection
devsjc committed Jun 12, 2024
1 parent 151a375 commit 0e8cf86
Showing 1 changed file with 58 additions and 58 deletions.
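This change removes the script's internal loop over the four GFS cycles and instead selects a single run per invocation: run() gains a run parameter, the per-day combine of all four cycles is dropped, and a new --run flag (choices 00, 06, 12, 18; default 00) picks the cycle on the command line. The output filename also gains the run suffix, so a 2024-06-12 invocation with --run 06 would be written to {path}/20240612/2024061206.zarr.zip rather than the old per-day 20240612.zarr.zip. A short caller sketch for processing a whole day with the new signature follows the diff.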
116 changes: 58 additions & 58 deletions containers/gfs/download_combine_gfs.py
@@ -7,7 +7,6 @@
import pathlib
import shutil
import sys
import tempfile
import uuid
from glob import glob
from multiprocessing import Pool, cpu_count
@@ -169,81 +168,75 @@ def _combine_datasets(dsp1: str, dsp2: str) -> str:
shutil.rmtree(dsp2, ignore_errors=True)
return cdsp.as_posix()

def run(path: str, config: Config, date: dt.date) -> str:
"""Download GFS data, combine, and save for a single date."""
def run(path: str, config: Config, date: dt.date, run: str) -> str:
"""Download GFS data, combine, and save for a single run."""
# Download files first
for hour in ["00", "06", "12", "18"]:
if not pathlib.Path(f"{path}/{date:%Y%m%d}/{hour}/").exists():
pathlib.Path(f"{path}/{date:%Y%m%d}/{hour}/").mkdir(parents=True, exist_ok=True)
results: list[str] = []
not_done = True
while not_done:
try:
urls = find_file_names(
it=dt.datetime.combine(date, dt.time(int(hour))),
config=config,
)

# Only parallelize if there are more files than cpus
if len(urls) > cpu_count():
pool = Pool(cpu_count())
results = pool.starmap(
download_url,
[(url, f"{path}/{date:%Y%m%d}/{hour}/") for url in urls],
)
pool.close()
pool.join()
else:
results: list[str] = []
for url in urls:
result = download_url(url, f"{path}/{date:%Y%m%d}/{hour}/")
if result is not None:
results.append(result)

not_done = False
except Exception as e:
log.error(e)
continue
if not pathlib.Path(f"{path}/{date:%Y%m%d}/{run}/").exists():
pathlib.Path(f"{path}/{date:%Y%m%d}/{run}/").mkdir(parents=True, exist_ok=True)
results: list[str] = []
not_done = True
while not_done:
try:
urls = find_file_names(
it=dt.datetime.combine(date, dt.time(int(run))),
config=config,
)

log.info(f"Downloaded {len(results)} files for {date}:{hour}")
# Only parallelize if there are more files than cpus
if len(urls) > cpu_count():
pool = Pool(cpu_count())
results = pool.starmap(
download_url,
[(url, f"{path}/{date:%Y%m%d}/{run}/") for url in urls],
)
pool.close()
pool.join()
else:
results: list[str] = []
for url in urls:
result = download_url(url, f"{path}/{date:%Y%m%d}/{run}/")
if result is not None:
results.append(result)

not_done = False
except Exception as e:
log.error(e)
continue

log.info(f"Downloaded {len(results)} files for {date}:{run}")

# Write files to zarr
log.info("Converting files")

run_datasets: list[str] = []
for hour in ["00", "06", "12", "18"]:
dataset_paths: list[str] = []
for file in list(glob(f"{path}/{date:%Y%m%d}/{hour}/*{hour}.*.grib2")):
log.debug(f"Converting {file}")
ds_path = convert_file(file=file, outfolder=path + "/.work")
if ds_path is not None:
dataset_paths.append(ds_path)
log.debug(f"Converted {len(dataset_paths)} files for {date}:{hour}")
run_files: list[str] = list(glob(f"{path}/{date:%Y%m%d}/{run}/*{run}.*.grib2"))
dataset_paths: list[str] = []
for file in run_files:
log.debug(f"Converting {file}")
ds_path = convert_file(file=file, outfolder=path + "/.work")
if ds_path is not None:
dataset_paths.append(ds_path)
log.debug(f"Converted {len(dataset_paths)} files for {date}:{run}")

hour_ds_path: str = functools.reduce(_combine_datasets, dataset_paths)
log.debug(f"Combined {len(dataset_paths)} datasets for {date}:{hour}")
run_datasets.append(hour_ds_path)
run_ds_path: str = functools.reduce(_combine_datasets, dataset_paths)
log.debug(f"Combined {len(dataset_paths)} datasets for {date}:{run}")

log.info("Combining run datasets and applying compression")
day_ds_path: str = functools.reduce(_combine_datasets, run_datasets)
day_ds: xr.Dataset = xr.open_zarr(day_ds_path)
encoding = {var: {"compressor": Blosc2("zstd", clevel=9)} for var in day_ds.data_vars}
run_ds: xr.Dataset = xr.open_zarr(run_ds_path)
encoding = {var: {"compressor": Blosc2("zstd", clevel=9)} for var in run_ds.data_vars}
encoding["init_time"] = {"units": "nanoseconds since 1970-01-01"}
outpath = f"{path}/{date:%Y%m%d}/{date:%Y%m%d}.zarr.zip"
outpath = f"{path}/{date:%Y%m%d}/{date:%Y%m%d}{run}.zarr.zip"
try:
with zarr.ZipStore(path=outpath, mode="w") as store:
day_ds.to_zarr(
run_ds.to_zarr(
store,
encoding=encoding,
compute=True,
)
log.info(f"Saved dataset for {date:%Y%m%d}{run} to {outpath}")
shutil.rmtree(path + "/.work", ignore_errors=True)
except Exception as e:
log.error(f"Error saving dataset for {date:%Y%m%d}: {e}")
return None
log.error(f"Error saving dataset for {date:%Y%m%d}{run}: {e}")

log.info(f"Saved dataset for {date:%Y%m%d} to {outpath}")
shutil.rmtree(day_ds_path, ignore_errors=True)
return outpath

if __name__ == "__main__":
@@ -265,10 +258,17 @@ def run(path: str, config: Config, date: dt.date) -> str:
default=str(dt.datetime.now(tz=dt.UTC).date()),
help="Date to download data for (YYYY-MM-DD)",
)
parser.add_argument(
"--run",
type=str,
default="00",
choices=["00", "06", "12", "18"],
help="Run to download data for (HH)",
)

args = parser.parse_args()
log.info(f"{prog_start!s}: Running with args: {args}")
out = run(path=args.path, config=DEFAULT_CONFIG, date=args.date)
out = run(path=args.path, config=DEFAULT_CONFIG, date=args.date, run=args.run)
ds = xr.open_zarr(out)
prog_end = dt.datetime.now(tz=dt.UTC)

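With the cycle loop removed from run(), a caller that still wants every run for a day now iterates itself. A minimal sketch, assuming download_combine_gfs is importable as a module and exposes run() and DEFAULT_CONFIG as shown in the diff; the /tmp/gfs path and the example date are placeholders:

import datetime as dt

# Hypothetical day-level wrapper; the module import path is an assumption.
from download_combine_gfs import DEFAULT_CONFIG, run

date = dt.date(2024, 6, 12)
for cycle in ["00", "06", "12", "18"]:  # the four cycles offered by the new --run choices
    # Each call downloads one run, converts the gribs, combines them, and writes {date}{run}.zarr.zip.
    outpath = run(path="/tmp/gfs", config=DEFAULT_CONFIG, date=date, run=cycle)
    print(f"Run {cycle} written to {outpath}")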
