Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sat-etl): Only process native files in date range #136

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions containers/sat/download_process_sat.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,14 @@ def download_scans(

return files

def _fname_to_scantime(fname: str) -> dt.datetime:
"""Converts a filename to a datetime.

Files are of the form:
`MSG2-SEVI-MSG15-0100-NA-20230910221240.874000000Z-NA.nat`
So determine the time from the first element split by '.'.
"""
return dt.datetime.strptime(fname.split(".")[0][-14:], "%Y%m%d%H%M%S")

def process_scans(
sat_config: Config,
Expand Down Expand Up @@ -256,15 +264,16 @@ def process_scans(

# Get native files in order
native_files: list[pathlib.Path] = list(folder.glob("*.nat"))
log.info(f"Found {len(native_files)} native files at {folder.as_posix()}")
native_files.sort()
wanted_files = [f for f in native_files if start <= _fname_to_scantime(f.name).date() < end]
log.info(f"Found {len(wanted_files)} native files within date range at {folder.as_posix()}")

# Convert native files to xarray datasets
# * Append to the monthly zarr in hourly chunks
datasets: list[xr.Dataset] = []
i: int
f: pathlib.Path
for i, f in enumerate(native_files):
for i, f in enumerate(wanted_files):
try:
# TODO: This method of passing the zarr times to the open function leaves a lot to be desired
# Firstly, if the times are not passed in sorted order then the created 12-dataset chunks
Expand Down Expand Up @@ -301,7 +310,7 @@ def process_scans(
)
datasets = []

log.info(f"Process loop [{dstype}]: {i+1}/{len(native_files)}")
log.info(f"Process loop [{dstype}]: {i+1}/{len(wanted_files)}")

# Consolidate zarr metadata
if pathlib.Path(zarr_path).exists():
Expand Down Expand Up @@ -635,7 +644,10 @@ def run(args: argparse.Namespace) -> None:
# Estimate average runtime
secs_per_scan: int = 90
expected_runtime = pd.Timedelta(secs_per_scan * len(scan_times), "seconds")
log.info(f"Downloading {len(scan_times)} scans. Expected runtime: {expected_runtime!s}")
log.info(
f"Downloading {len(scan_times)} scans ({start} - {end}). "
f"Expected runtime: {expected_runtime!s}"
)

# Download data
# We only parallelize if we have a number of files larger than the cpu count
Expand Down
Loading