Skip to content

Commit

Permalink
More fixes for IODC downloading (#9)
Browse files Browse the repository at this point in the history
* Update filtering

* Set to specific assets

* More updates to configuration
  • Loading branch information
jacobbieker authored Sep 12, 2023
1 parent 41c0611 commit c32c28b
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
12 changes: 10 additions & 2 deletions sat/assets/eumetsat/common.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""
EO:EUM:DAT:MSG:HRSEVIRI-IODC
"""
from satip.eumetsat import DownloadManager
from satip.eumetsat import DownloadManager, eumetsat_filename_to_datetime
from satip.utils import filter_dataset_ids_on_current_files
import pandas as pd
import os

from dagster import Config

Expand All @@ -22,12 +23,19 @@ def download_product_range(api_key: str, api_secret: str, data_dir: str, product
date_range = pd.date_range(start=start_str,
end=end_str,
freq="30min")
filenames_downloaded = []
for filename in os.listdir(data_dir):
filenames_downloaded.append(filename.split("/")[-1])
for date in date_range:
start_date = pd.Timestamp(date) - pd.Timedelta("1min")
end_date = pd.Timestamp(date) + pd.Timedelta("1min")
datasets = download_manager.identify_available_datasets(
start_date=start_date.tz_localize(None).strftime("%Y-%m-%d-%H-%M-%S"),
end_date=end_date.tz_localize(None).strftime("%Y-%m-%d-%H-%M-%S"),
)
datasets = filter_dataset_ids_on_current_files(datasets, data_dir)
filtered_datasets = []
for dataset in datasets:
if dataset["id"] not in filenames_downloaded:
filtered_datasets.append(dataset)
datasets = filtered_datasets
download_manager.download_datasets(datasets, product_id=product_id)
17 changes: 10 additions & 7 deletions sat/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@
from dagster import AssetSelection, define_asset_job, EnvVar

from sat.assets.eumetsat.common import EumetsatConfig
from sat.assets import download_eumetsat_iodc_data

base_path = "/mnt/storage_c/IODC/"


config = EumetsatConfig(api_key=EnvVar("EUMETSAT_API_KEY"),
api_secret=EnvVar("EUMETSAT_API_SECRET"),
data_dir=base_path,
start_date="2017-02-01",
end_date=pd.Timestamp().utcnow().strftime('%Y-%m-%d'))

asset_jobs = []
asset_job = define_asset_job(f"download_iodc_raw_files", AssetSelection.all(),
asset_job = define_asset_job(f"download_iodc_raw_files", AssetSelection.assets(download_eumetsat_iodc_data),
config={
'ops': {"download_eumetsat_iodc_data": {"config": EumetsatConfig(api_key=EnvVar("EUMETSAT_API_KEY"),
api_secret=EnvVar("EUMETSAT_API_SECRET"),
data_dir=base_path,
start_date="2017-02-01",
end_date=str(pd.Timestamp().utcnow())).to_fields_dict()},},})
'ops': {"download_eumetsat_iodc_data": {
"config": {"api_key": config.api_key, "api_secret": config.api_secret, "data_dir": config.data_dir,
"start_date": config.start_date, "end_date": config.end_date}}, }, })

asset_jobs.append(asset_job)

0 comments on commit c32c28b

Please sign in to comment.