Satellite archive script #257

Draft · wants to merge 8 commits into base: main
fix(archive): Cache last processed time
devsjc committed Apr 25, 2024
commit f90b2d28989f23ac69212e80d61c2c798993ef6d
88 changes: 49 additions & 39 deletions scripts/archival_pipeline.py
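The crux of this commit: persist the timestamp of the last successfully archived scan in an on-disk cache, so that subsequent runs can resume where the previous one stopped without an explicit --start_date. A minimal sketch of that resume pattern, assuming diskcache's standard get/set API; the cache path and fallback date below are illustrative, while the `latest_time` key is the one the diff uses:

```python
import datetime as dt
import diskcache as dc

# One cache directory per satellite keeps resume points independent.
cache = dc.Cache("/tmp/sat-archive-cache")  # illustrative path

# On startup: fall back to the last archived date when no start date is given.
latest = cache.get("latest_time")  # None if the script has never completed
start_date = dt.date.fromisoformat(latest) if latest else dt.date(2024, 1, 1)

# On successful completion: record how far the archive got.
cache.set("latest_time", dt.date.today().isoformat())
```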
@@ -103,30 +103,6 @@ class Channel:
     ],
 }
 
-parser = argparse.ArgumentParser(
-    prog="EUMETSTAT Pipeline",
-    description="Downloads and processes data from EUMETSTAT",
-)
-parser.add_argument(
-    'sat',
-    help="Which satellite to download data from",
-    type=str,
-    choices=list(CONFIGS.keys()),
-)
-parser.add_argument(
-    "--start_date",
-    help="Date to download from (YYYY-MM-DD)",
-    type=dt.date.fromisoformat,
-    required=True,
-)
-parser.add_argument(
-    "--end_date",
-    help="Date to download to (YYYY-MM-DD)",
-    type=dt.date.fromisoformat,
-    required=False,
-    default=str(dt.datetime.utcnow().date()),
-)
-
 def download_scans(
     sat_config: Config,
     scan_times: list[pd.Timestamp],
@@ -365,17 +341,48 @@ def _rewrite_zarr_times(output_name):
         json.dump(data, f)
 
 
+parser = argparse.ArgumentParser(
+    prog="EUMETSTAT Pipeline",
+    description="Downloads and processes data from EUMETSTAT",
+)
+parser.add_argument(
+    'sat',
+    help="Which satellite to download data from",
+    type=str,
+    choices=list(CONFIGS.keys()),
+)
+parser.add_argument(
+    "--start_date",
+    help="Date to download from (YYYY-MM-DD)",
+    type=dt.date.fromisoformat,
+    required=False,
+)
+parser.add_argument(
+    "--end_date",
+    help="Date to download to (YYYY-MM-DD)",
+    type=dt.date.fromisoformat,
+    required=False,
+    default=str(dt.datetime.utcnow().date()),
+)
+
 if __name__ == "__main__":
     # Prevent logs interfering with progressbar
     logging_redirect_tqdm(loggers=[log])
 
-    # Get running args
-    args = parser.parse_args()
     prog_start = dt.datetime.utcnow()
-    log.info(f"{str(prog_start)}: Running with args: {args}")
 
+    # Parse running args
+    args = parser.parse_args()
     # Create a reusable cache
-    cache = dc.Cache('tmp')
+    cache = dc.Cache(f'/mnt/disks/sat/.cache/{args.sat}')
+
+    if args.start_date is None:
+        # Try to get the start date from the last cached datetime
+        try:
+            args.start_date = dt.date.fromisoformat(cache.get('latest_time'))
+        except Exception as e:
+            raise Exception("Can't get last runtime from cache. Pass start_date in manually.")
+
+    log.info(f"{str(prog_start)}: Running with args: {args}")
 
     # Get config for desired satellite
     sat_config = CONFIGS[args.sat]
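Two things are worth noting in the hunk above. First, `--start_date` is now `required=False`, with the cached `latest_time` filling the gap; the bare `raise Exception(...)` drops the original traceback, so `raise ... from e` could be a later refinement. Second, passing `dt.date.fromisoformat` as the argparse `type` validates and converts the CLI string at parse time. A standalone demonstration (the parser here is a hypothetical stand-in, not the script's):

```python
import argparse
import datetime as dt

p = argparse.ArgumentParser()
p.add_argument("--start_date", type=dt.date.fromisoformat, required=False)

args = p.parse_args(["--start_date", "2024-04-25"])
assert args.start_date == dt.date(2024, 4, 25)  # already a date, not a str
```

Since argparse also applies `type` to string defaults, the `--end_date` default of `str(dt.datetime.utcnow().date())` arrives as a `dt.date` as well.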
@@ -386,7 +393,7 @@ def _rewrite_zarr_times(output_name):
     scan_times: list[pd.Timestamp] = pd.date_range(start=start, end=end, freq=sat_config.cadence).tolist()
 
     # Get average runtime from cache
-    secs_per_scan = cache.get('secs_per_scan', default=65)
+    secs_per_scan = cache.get('secs_per_scan', default=90)
     expected_runtime = pd.Timedelta(secs_per_scan * len(scan_times), 'seconds')
     log.info(f"Downloading {len(scan_times)} scans. Expected runtime: {str(expected_runtime)}")

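The default estimate rises from 65 to 90 seconds per scan, and the expected runtime scales linearly with the number of scan times. A worked example with assumed numbers (one day of scans at a hypothetical 15-minute cadence):

```python
import pandas as pd

secs_per_scan = 90  # new default from this commit
n_scans = 96        # 24 hours at an assumed 15-minute cadence

expected_runtime = pd.Timedelta(secs_per_scan * n_scans, "seconds")
print(expected_runtime)  # 0 days 02:24:00
```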
@@ -407,14 +414,7 @@ def _rewrite_zarr_times(output_name):
     for result in results:
         log.info(f"Completed download with {len(result)} failed scan times.")
 
-
-    # Calculate the new average time per timestamp
-    runtime: dt.timedelta = dt.datetime.utcnow() - prog_start
-    new_average_secs_per_scan: int = int((secs_per_scan + (runtime.total_seconds() / len(scan_times))) / 2)
-    cache.set('secs_per_scan', new_average_secs_per_scan)
-    log.info(f"Completed download for args: {args} in {str(runtime)} (avg {new_average_secs_per_scan} secs per scan)")
-
-    log.info("Converting raw data to HRV and non-HRV Zarr Stores")
+    log.info("Converting raw data to HRV and non-HRV Zarr Stores.")
 
     # Process the HRV and non-HRV data
     pool = multiprocessing.Pool()
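The HRV and non-HRV conversions fan out over a `multiprocessing.Pool`, as the next hunk shows. A generic sketch of that fan-out pattern, with a placeholder worker standing in for the script's actual Zarr conversion:

```python
import multiprocessing

def process(kind: str) -> str:
    # Placeholder for the real per-dataset conversion work.
    return kind

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        results = pool.map(process, ["hrv", "nonhrv"])
    for result in results:
        print(f"Processed {result} data.")
```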
@@ -428,4 +428,14 @@ def _rewrite_zarr_times(output_name):
     pool.close()
     pool.join()
     for result in results:
-        log.info(f"Processed {result} data")
+        log.info(f"Processed {result} data.")
+
+    # Save the last processed time to cache
+    cache.set('latest_time', end.isoformat())
+
+    # Calculate the new average time per timestamp
+    runtime: dt.timedelta = dt.datetime.utcnow() - prog_start
+    new_average_secs_per_scan: int = int((secs_per_scan + (runtime.total_seconds() / len(scan_times))) / 2)
+    cache.set('secs_per_scan', new_average_secs_per_scan)
+    log.info(f"Completed archive for args: {args}. ({new_average_secs_per_scan} seconds per scan).")
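With this commit, the bookkeeping that previously ran between download and conversion now runs at the very end, after `latest_time` has been saved. The `secs_per_scan` update blends the cached figure with this run's observed per-scan time in equal parts, i.e. an exponential moving average with alpha = 0.5. Isolated for clarity, with illustrative numbers:

```python
def update_secs_per_scan(cached_avg: int, runtime_secs: float, n_scans: int) -> int:
    """Equal-weight blend of the stored average and this run's per-scan time."""
    return int((cached_avg + runtime_secs / n_scans) / 2)

# A run taking 2h24m (8640 s) over 96 scans observed 90 s/scan,
# so a cached average of 90 is unchanged.
assert update_secs_per_scan(90, 8640.0, 96) == 90
```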
