diff --git a/data_subscriber/asf_download.py b/data_subscriber/asf_download.py deleted file mode 100644 index 569c86ca..00000000 --- a/data_subscriber/asf_download.py +++ /dev/null @@ -1,143 +0,0 @@ -import json -import logging -import os -from datetime import datetime -from pathlib import PurePath, Path - -import requests.utils - -from data_subscriber.download import DaacDownload -from data_subscriber.url import _has_url, _to_urls, _to_https_urls, _slc_url_to_chunk_id, form_batch_id - -logger = logging.getLogger(__name__) - -class DaacDownloadAsf(DaacDownload): - - def __init__(self, provider): - super().__init__(provider) - self.daac_s3_cred_settings_key = "SLC_DOWNLOAD" - - """This is practically an abstract class. You should never instantiate this.""" - def perform_download(self, - session: requests.Session, - es_conn, - downloads: list[dict], - args, - token, - job_id - ): - - for download in downloads: - if not _has_url(download): - continue - - if args.transfer_protocol == "https": - product_url = _to_https_urls(download) - else: - product_url = _to_urls(download) - - logger.info(f"Processing {product_url=}") - product_id = _slc_url_to_chunk_id(product_url, str(download['revision_id'])) - - product_download_dir = self.downloads_dir / product_id - product_download_dir.mkdir(exist_ok=True) - - # download product - if args.dry_run: - logger.info(f"{args.dry_run=}. Skipping download.") - continue - - if product_url.startswith("s3"): - product = product_filepath = self.download_product_using_s3( - product_url, - token, - target_dirpath=product_download_dir.resolve(), - args=args - ) - else: - product = product_filepath = self.download_asf_product( - product_url, token, product_download_dir - ) - - logger.info(f"{product_filepath=}") - - logger.info(f"Marking as downloaded. {download['id']=}") - es_conn.mark_product_as_downloaded(download['id'], job_id) - - logger.info(f"product_url_downloaded={product_url}") - - additional_metadata = {} - try: - additional_metadata['processing_mode'] = download['processing_mode'] - except: - logger.warning("processing_mode not found in the slc_catalog ES index") - - if download.get("intersects_north_america"): - logger.info("adding additional dataset metadata (intersects_north_america)") - additional_metadata["intersects_north_america"] = True - - dataset_dir = self.extract_one_to_one(product, self.cfg, working_dir=Path.cwd(), - extra_metadata=additional_metadata, - name_postscript='-r'+str(download['revision_id'])) - - self.update_pending_dataset_with_index_name(dataset_dir, '-r'+str(download['revision_id'])) - - # Rename the dataset_dir to match the pattern w revision_id - new_dataset_dir = dataset_dir.parent / form_batch_id(dataset_dir.name, str(download['revision_id'])) - logger.info(f"{new_dataset_dir}") - os.rename(str(dataset_dir), str(new_dataset_dir)) - - self.download_orbit_file(new_dataset_dir, product_filepath, additional_metadata) - - if (additional_metadata['processing_mode'] in ("historical", "reprocessing")): - logger.info( - f"Processing mode is {additional_metadata['processing_mode']}. " - f"Attempting to download ionosphere correction file." 
- ) - self.download_ionosphere_file(new_dataset_dir, product_filepath) - - logger.info(f"Removing {product_filepath}") - product_filepath.unlink(missing_ok=True) - - def download_orbit_file(self, dataset_dir, product_filepath, additional_metadata): - pass - - def download_ionosphere_file(self, dataset_dir, product_filepath): - pass - - def download_asf_product(self, product_url, token: str, target_dirpath: Path): - logger.info(f"Requesting from {product_url}") - - asf_response = self._handle_url_redirect(product_url, token) - asf_response.raise_for_status() - - product_filename = PurePath(product_url).name - product_download_path = target_dirpath / product_filename - with open(product_download_path, "wb") as file: - file.write(asf_response.content) - return product_download_path.resolve() - - def update_pending_dataset_metadata_with_ionosphere_metadata(self, dataset_dir: PurePath, ionosphere_metadata: dict): - pass - - def update_pending_dataset_with_index_name(self, dataset_dir: PurePath, postscript): - logger.info("Updating dataset's dataset.json with index name") - - with Path(dataset_dir / f"{dataset_dir.name}{postscript}.dataset.json").open("r") as fp: - dataset_json: dict = json.load(fp) - - with Path(dataset_dir / f"{dataset_dir.name}{postscript}.met.json").open("r") as fp: - met_dict: dict = json.load(fp) - - dataset_json.update({ - "index": { - "suffix": ("{version}_{dataset}-{date}".format( - version=dataset_json["version"], - dataset=met_dict["ProductType"], - date=datetime.utcnow().strftime("%Y.%m") - )).lower() # suffix index name with `-YYYY.MM - } - }) - - with Path(dataset_dir / f"{dataset_dir.name}.dataset.json").open("w") as fp: - json.dump(dataset_json, fp) \ No newline at end of file diff --git a/data_subscriber/asf_slc_download.py b/data_subscriber/asf_slc_download.py index ffe7415e..1c008926 100644 --- a/data_subscriber/asf_slc_download.py +++ b/data_subscriber/asf_slc_download.py @@ -1,25 +1,155 @@ + +import glob import json import netrc +import os from datetime import datetime, timedelta from pathlib import PurePath, Path +from os.path import abspath, getsize, join + +import requests from data_subscriber import ionosphere_download -from data_subscriber.asf_download import DaacDownloadAsf +from data_subscriber.download import DaacDownload +from data_subscriber.url import ( + _has_url, _to_urls, _to_https_urls, _slc_url_to_chunk_id, form_batch_id +) from tools import stage_orbit_file from tools.stage_ionosphere_file import IonosphereFileNotFoundException from tools.stage_orbit_file import (parse_orbit_time_range_from_safe, - NoQueryResultsException, - NoSuitableOrbitFileException, T_ORBIT, ORBIT_PAD) +from util.dataspace_util import (NoQueryResultsException, + NoSuitableOrbitFileException, + DEFAULT_DATASPACE_ENDPOINT) + + +class AsfDaacSlcDownload(DaacDownload): + + def __init__(self, provider): + super().__init__(provider) + self.daac_s3_cred_settings_key = "SLC_DOWNLOAD" + + def perform_download(self, session: requests.Session, es_conn, downloads: list[dict], args, token, job_id): + for download in downloads: + if not _has_url(download): + continue + + if args.transfer_protocol == "https": + product_url = _to_https_urls(download) + else: + product_url = _to_urls(download) + + self.logger.info("Processing product_url=%s", product_url) + product_id = _slc_url_to_chunk_id(product_url, str(download['revision_id'])) + + product_download_dir = self.downloads_dir / product_id + product_download_dir.mkdir(exist_ok=True) + + if args.dry_run: + 
self.logger.info("args.dry_run=%s. Skipping download.", args.dry_run) + continue + + if product_url.startswith("s3"): + product = product_filepath = self.download_product_using_s3( + product_url, token, target_dirpath=product_download_dir.resolve(), args=args + ) + else: + product = product_filepath = self.download_asf_product( + product_url, token, product_download_dir + ) + + self.logger.info("Marking %s as downloaded.", product_filepath) + self.logger.debug("download['id']=%s", download['id']) + + es_conn.mark_product_as_downloaded(download['id'], job_id) + + self.logger.debug(f"product_url_downloaded={product_url}") + + additional_metadata = {} + + try: + additional_metadata['processing_mode'] = download['processing_mode'] + except KeyError: + self.logger.warning("processing_mode not found in the slc_catalog ES index") + + if download.get("intersects_north_america"): + self.logger.info("Adding intersects_north_america to dataset metadata") + additional_metadata["intersects_north_america"] = True + + dataset_dir = self.extract_one_to_one(product, self.cfg, working_dir=Path.cwd(), + extra_metadata=additional_metadata, + name_postscript='-r'+str(download['revision_id'])) + self.update_pending_dataset_with_index_name(dataset_dir, '-r' + str(download['revision_id'])) -class AsfDaacSlcDownload(DaacDownloadAsf): - def download_orbit_file(self, dataset_dir, product_filepath, additional_metadata): + # Rename the dataset_dir to match the pattern w revision_id + new_dataset_dir = dataset_dir.parent / form_batch_id(dataset_dir.name, str(download['revision_id'])) + self.logger.debug("new_dataset_dir=%s", str(new_dataset_dir)) + + os.rename(str(dataset_dir), str(new_dataset_dir)) + + self.download_orbit_file(new_dataset_dir, product_filepath) + + # We've observed cases where the orbit file download seems to complete + # successfully, but the resulting files are empty, causing the PGE/SAS to crash. + # Check for any empty files now, so we can fail during this download job + # rather than during the SCIFLO job. + self.check_for_empty_orbit_files(new_dataset_dir) + + if additional_metadata['processing_mode'] in ("historical", "reprocessing"): + self.logger.info( + "Processing mode is %s. 
Attempting to download ionosphere correction file.", + additional_metadata['processing_mode'] + ) + self.download_ionosphere_file(new_dataset_dir, product_filepath) + + self.logger.info("Removing %s", product_filepath) + product_filepath.unlink(missing_ok=True) + + def download_asf_product(self, product_url, token: str, target_dirpath: Path): + self.logger.info("Requesting from %s", product_url) + + asf_response = self._handle_url_redirect(product_url, token) + asf_response.raise_for_status() + + product_filename = PurePath(product_url).name + product_download_path = target_dirpath / product_filename + + with open(product_download_path, "wb") as file: + file.write(asf_response.content) + + return product_download_path.resolve() + + def update_pending_dataset_with_index_name(self, dataset_dir: PurePath, postscript): + self.logger.info("Updating dataset's dataset.json with index name") + + with Path(dataset_dir / f"{dataset_dir.name}{postscript}.dataset.json").open("r") as fp: + dataset_json: dict = json.load(fp) + + with Path(dataset_dir / f"{dataset_dir.name}{postscript}.met.json").open("r") as fp: + met_dict: dict = json.load(fp) + + dataset_json.update( + { + "index": { + "suffix": ( + "{version}_{dataset}-{date}".format(version=dataset_json["version"], + dataset=met_dict["ProductType"], + date=datetime.utcnow().strftime("%Y.%m")) + ).lower() # suffix index name with `-YYYY.MM + } + } + ) + + with Path(dataset_dir / f"{dataset_dir.name}.dataset.json").open("w") as fp: + json.dump(dataset_json, fp) + + def download_orbit_file(self, dataset_dir, product_filepath): self.logger.info("Downloading associated orbit file") # Get the PCM username/password for authentication to Copernicus Dataspace - username, _, password = netrc.netrc().authenticators('dataspace.copernicus.eu') + username, _, password = netrc.netrc().authenticators(DEFAULT_DATASPACE_ENDPOINT) (_, safe_start_time, safe_stop_time) = parse_orbit_time_range_from_safe(product_filepath) safe_start_datetime = datetime.strptime(safe_start_time, "%Y%m%dT%H%M%S") @@ -101,6 +231,20 @@ def download_orbit_file(self, dataset_dir, product_filepath, additional_metadata self.logger.info("Added orbit file(s) to dataset") + def check_for_empty_orbit_files(self, dataset_dir): + self.logger.info("Checking for empty orbit file downloads within %s", dataset_dir) + + orbit_file_pattern = join(abspath(dataset_dir), "*.EOF") + + for orbit_file_path in glob.iglob(orbit_file_pattern): + if getsize(orbit_file_path) == 0: + raise RuntimeError( + f"Orbit file {orbit_file_path} was downloaded but empty.\n" + f"This download job will need to be retried once a valid orbit " + f"file is available." 
+ ) + else: + self.logger.info("All downloaded orbit files are non-empty") def download_ionosphere_file(self, dataset_dir, product_filepath): try: output_ionosphere_filepath = ionosphere_download.download_ionosphere_correction_file( @@ -130,4 +274,3 @@ def update_pending_dataset_metadata_with_ionosphere_metadata(self, dataset_dir: with Path(dataset_dir / f"{dataset_dir.name}.met.json").open("w") as fp: json.dump(met_json, fp) - diff --git a/data_subscriber/download.py b/data_subscriber/download.py index c80d579c..d4879068 100644 --- a/data_subscriber/download.py +++ b/data_subscriber/download.py @@ -1,4 +1,4 @@ -import logging + import shutil from datetime import datetime from pathlib import PurePath, Path @@ -17,37 +17,12 @@ from data_subscriber.cmr import Provider, CMR_TIME_FORMAT from data_subscriber.query import DateTimeRange from data_subscriber.url import _to_batch_id, _to_orbit_number -from tools.stage_orbit_file import fatal_code +from util.backoff_util import fatal_code from util.conf_util import SettingsConf - -logger = logging.getLogger(__name__) +from util.edl_util import SessionWithHeaderRedirection AWS_REGION = "us-west-2" -class SessionWithHeaderRedirection(requests.Session): - """ - Borrowed from https://wiki.earthdata.nasa.gov/display/EL/How+To+Access+Data+With+Python - """ - - def __init__(self, username, password, auth_host): - super().__init__() - self.auth = (username, password) - self.auth_host = auth_host - - # Overrides from the library to keep headers when redirected to or from - # the NASA auth host. - def rebuild_auth(self, prepared_request, response): - headers = prepared_request.headers - url = prepared_request.url - - if "Authorization" in headers: - original_parsed = requests.utils.urlparse(response.request.url) - redirect_parsed = requests.utils.urlparse(url) - if (original_parsed.hostname != redirect_parsed.hostname) and \ - redirect_parsed.hostname != self.auth_host and \ - original_parsed.hostname != self.auth_host: - del headers["Authorization"] - class DaacDownload: diff --git a/tests/10TFP.geojson b/tests/10TFP.geojson index fdd42ea1..198eef46 120000 --- a/tests/10TFP.geojson +++ b/tests/10TFP.geojson @@ -1 +1 @@ -../geo/10TFP.geojson \ No newline at end of file +../geo/data/10TFP.geojson \ No newline at end of file diff --git a/tests/california_opera.geojson b/tests/california_opera.geojson index 2ac71083..0585add4 120000 --- a/tests/california_opera.geojson +++ b/tests/california_opera.geojson @@ -1 +1 @@ -../geo/california_opera.geojson \ No newline at end of file +../geo/data/california_opera.geojson \ No newline at end of file diff --git a/tests/calval_test_frame_only.geojson b/tests/calval_test_frame_only.geojson index 2f0ac2f6..b36f776e 120000 --- a/tests/calval_test_frame_only.geojson +++ b/tests/calval_test_frame_only.geojson @@ -1 +1 @@ -../geo/calval_test_frame_only.geojson \ No newline at end of file +../geo/data/calval_test_frame_only.geojson \ No newline at end of file diff --git a/tests/cslc-s1_priority_framebased.geojson b/tests/cslc-s1_priority_framebased.geojson index 02e5a976..47f824e3 120000 --- a/tests/cslc-s1_priority_framebased.geojson +++ b/tests/cslc-s1_priority_framebased.geojson @@ -1 +1 @@ -../geo/cslc-s1_priority_framebased.geojson \ No newline at end of file +../geo/data/cslc-s1_priority_framebased.geojson \ No newline at end of file diff --git a/tests/data_subscriber/test_daac_data_subscriber.py b/tests/data_subscriber/test_daac_data_subscriber.py index b6b9e764..49feeaa4 100644 --- 
a/tests/data_subscriber/test_daac_data_subscriber.py +++ b/tests/data_subscriber/test_daac_data_subscriber.py @@ -5,6 +5,7 @@ import pytest import smart_open +import util.edl_util try: import unittest.mock as umock @@ -64,7 +65,7 @@ async def test_full(monkeypatch): monkeypatch.setattr( download, - download.SessionWithHeaderRedirection.__name__, + util.edl_util.SessionWithHeaderRedirection.__name__, MagicMock() ) @@ -197,7 +198,7 @@ async def test_download(monkeypatch): monkeypatch.setattr( download, - download.SessionWithHeaderRedirection.__name__, + util.edl_util.SessionWithHeaderRedirection.__name__, MagicMock() ) @@ -325,7 +326,7 @@ async def test_download_https(monkeypatch): mock_create_merged_files(monkeypatch) monkeypatch.setattr( download, - download.SessionWithHeaderRedirection.__name__, + util.edl_util.SessionWithHeaderRedirection.__name__, MagicMock() ) diff --git a/tests/dissolved_cslc-s1_priority_framebased.geojson b/tests/dissolved_cslc-s1_priority_framebased.geojson index 1beacf06..cd6d33d0 120000 --- a/tests/dissolved_cslc-s1_priority_framebased.geojson +++ b/tests/dissolved_cslc-s1_priority_framebased.geojson @@ -1 +1 @@ -../geo/dissolved_cslc-s1_priority_framebased.geojson \ No newline at end of file +../geo/data/dissolved_cslc-s1_priority_framebased.geojson \ No newline at end of file diff --git a/tests/nevada_opera.geojson b/tests/nevada_opera.geojson index 1bba4514..92242bc3 120000 --- a/tests/nevada_opera.geojson +++ b/tests/nevada_opera.geojson @@ -1 +1 @@ -../geo/nevada_opera.geojson \ No newline at end of file +../geo/data/nevada_opera.geojson \ No newline at end of file diff --git a/tests/north_america_opera.geojson b/tests/north_america_opera.geojson index a506e36f..39c45b06 120000 --- a/tests/north_america_opera.geojson +++ b/tests/north_america_opera.geojson @@ -1 +1 @@ -../geo/north_america_opera.geojson \ No newline at end of file +../geo/data/north_america_opera.geojson \ No newline at end of file diff --git a/tests/unit/tools/test_stage_orbit_file.py b/tests/unit/tools/test_stage_orbit_file.py index 6079d0ac..a42ac93d 100644 --- a/tests/unit/tools/test_stage_orbit_file.py +++ b/tests/unit/tools/test_stage_orbit_file.py @@ -11,8 +11,8 @@ import tools.stage_orbit_file from tools.stage_orbit_file import (ORBIT_TYPE_POE, - ORBIT_TYPE_RES, - NoSuitableOrbitFileException) + ORBIT_TYPE_RES) +from util.dataspace_util import NoSuitableOrbitFileException class TestStageOrbitFile(unittest.TestCase): diff --git a/tools/dataspace_s1_download.py b/tools/dataspace_s1_download.py index 3fb2f1e1..d571775b 100644 --- a/tools/dataspace_s1_download.py +++ b/tools/dataspace_s1_download.py @@ -13,28 +13,19 @@ import json import os from concurrent.futures import ThreadPoolExecutor, as_completed -from datetime import datetime, timedelta +from datetime import datetime from getpass import getuser from pathlib import Path -from threading import Lock -from typing import Tuple import backoff import requests -from commons.logger import logger, LogLevels - -DEFAULT_QUERY_ENDPOINT = 'https://catalogue.dataspace.copernicus.eu/odata/v1/Products' -"""Default URL endpoint for the Copernicus Data Space Ecosystem (CDSE) query REST service""" - -DEFAULT_AUTH_ENDPOINT = 'https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token' -"""Default URL endpoint for performing user authentication with CDSE""" - -DEFAULT_DOWNLOAD_ENDPOINT = 'https://download.dataspace.copernicus.eu/odata/v1/Products' -"""Default URL endpoint for CDSE download REST service""" - 
-DEFAULT_SESSION_ENDPOINT = 'https://identity.dataspace.copernicus.eu/auth/realms/CDSE/account/sessions' -"""Default URL endpoint to manage authentication sessions with CDSE""" +from commons.logger import logger +from util.backoff_util import fatal_code, backoff_logger +from util.dataspace_util import (DEFAULT_QUERY_ENDPOINT, + DEFAULT_DOWNLOAD_ENDPOINT, + NoQueryResultsException, + DataspaceSession) ISO_TIME = '%Y-%m-%dT%H:%M:%SZ' """Temporal format required by ODATA API: yyyy-mm-ddTHH:MM:SSZ""" @@ -43,122 +34,11 @@ """Max number of files returned per query""" -def fatal_code(err: requests.exceptions.RequestException) -> bool: - """Only retry for common transient errors""" - return err.response.status_code not in [418, 429, 500, 502, 503, 504] - - def to_datetime(value) -> datetime: """Helper function to covert command-line arg to datetime object""" return datetime.strptime(value, ISO_TIME) -class NoQueryResultsException(Exception): - """Custom exception to identify empty results from a query""" - pass - - -class DataspaceSession: - """ - Context manager class to wrap credentialed operations in. - Creates a session and gets its token on entry, and deletes the session on exit. - """ - - def __init__(self, username, password): - self.__lock = Lock() - self.__token, self.__session, self.__refresh_token, self.__expires = self._get_token(username, password) - - @property - def token(self) -> str: - """Get the current access token for the Dataspace session. If it is expired, it will attempt to refresh""" - - with self.__lock: - if datetime.now() > self.__expires: - self.__token, self.__session, self.__refresh_token, self.__expires = self._refresh() - - return self.__token - - @backoff.on_exception(backoff.constant, requests.exceptions.RequestException, max_time=300, giveup=fatal_code, - interval=15) - def _refresh(self) -> Tuple[str, str, str, datetime]: - data = { - 'client_id': 'cdse-public', - 'refresh_token': self.__refresh_token, - 'grant_type': 'refresh_token', - } - - response = requests.post(DEFAULT_AUTH_ENDPOINT, data=data) - - logger.info(f'Refresh Dataspace token request: {response.status_code}') - - response.raise_for_status() - - try: - response_json = response.json() - - access_token = response_json['access_token'] - session_id = response_json['session_state'] - refresh_token = response_json['refresh_token'] - expires_in = response_json['expires_in'] - except KeyError as e: - raise RuntimeError( - f'Failed to parse expected field "{str(e)}" from authentication response.' 
- ) - - logger.info(f'Refreshed Dataspace token for session {self.__session}') - - return access_token, session_id, refresh_token, datetime.now() + timedelta(seconds=expires_in) - - @backoff.on_exception(backoff.constant, requests.exceptions.RequestException, max_time=300, giveup=fatal_code, - interval=15) - def _get_token(self, username: str, password: str) -> Tuple[str, str, str, datetime]: - data = { - 'client_id': 'cdse-public', - 'username': username, - 'password': password, - 'grant_type': 'password' - } - - response = requests.post(DEFAULT_AUTH_ENDPOINT, data=data) - - logger.info(f'Get Dataspace token for {username}: {response.status_code}') - - response.raise_for_status() - - try: - response_json = response.json() - - access_token = response_json['access_token'] - session_id = response_json['session_state'] - refresh_token = response_json['refresh_token'] - expires_in = response_json['expires_in'] - except KeyError as e: - raise RuntimeError( - f'Failed to parse expected field "{str(e)}" from authentication response.' - ) - - logger.info(f'Created Dataspace session {session_id}') - - return access_token, session_id, refresh_token, datetime.now() + timedelta(seconds=expires_in) - - @backoff.on_exception(backoff.constant, requests.exceptions.RequestException, max_time=300, giveup=fatal_code, - interval=15) - def _delete_token(self): - url = f'{DEFAULT_SESSION_ENDPOINT}/{self.__session}' - headers = {'Authorization': f'Bearer {self.token}', 'Content-Type': 'application/json'} - - response = requests.delete(url=url, headers=headers) - - logger.info(f'Delete request {response.url}: {response.status_code}') - response.raise_for_status() - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, exc_traceback): - self._delete_token() - - def get_parser(): """Returns the command line parser for dataspace_s1_download.py""" @@ -233,7 +113,12 @@ def build_query_filter(platform, *args): return {'$filter': filter_string, '$orderby': 'ContentDate/Start asc', '$top': str(QUERY_PAGE_SIZE)} -@backoff.on_exception(backoff.constant, requests.exceptions.RequestException, max_time=300, giveup=fatal_code, interval=15) +@backoff.on_exception(backoff.constant, + requests.exceptions.RequestException, + max_time=300, + giveup=fatal_code, + on_backoff=backoff_logger, + interval=15) def _do_query(url, **kwargs): response = requests.get(url, **kwargs) @@ -279,15 +164,19 @@ def main(): query_results = query(build_query_filter(args.platform, *query_filters)) if len(query_results) == 0: - raise NoQueryResultsException() + raise NoQueryResultsException('No results returned from parsed query results') if args.url_only: urls = {r['Name']: f'{DEFAULT_DOWNLOAD_ENDPOINT}({r["Id"]})/$value' for r in query_results} print(json.dumps(urls, indent=4)) else: with DataspaceSession(args.username, args.password) as dss: - @backoff.on_exception(backoff.constant, requests.exceptions.RequestException, max_time=300, - giveup=fatal_code, interval=15) + @backoff.on_exception(backoff.constant, + requests.exceptions.RequestException, + max_time=300, + giveup=fatal_code, + on_backoff=backoff_logger, + interval=15) def do_download(gid, filename): start_t = datetime.now() diff --git a/tools/stage_ionosphere_file.py b/tools/stage_ionosphere_file.py index f88577f1..ff6dd914 100644 --- a/tools/stage_ionosphere_file.py +++ b/tools/stage_ionosphere_file.py @@ -17,20 +17,16 @@ import re import subprocess import sys - from os.path import abspath, join import requests - -from commons.logger import logger from 
commons.logger import LogLevels +from commons.logger import logger +from util.edl_util import DEFAULT_EDL_ENDPOINT, SessionWithHeaderRedirection DEFAULT_DOWNLOAD_ENDPOINT = "https://cddis.nasa.gov/archive/gnss/products/ionex" """Default URL endpoint for Ionosphere download requests""" -DEFAULT_EDL_ENDPOINT = "urs.earthdata.nasa.gov" -"""Default endpoint for authenticating with EarthData Login""" - IONOSPHERE_TYPE_JPLG = "jplg" IONOSPHERE_TYPE_JPRG = "jprg" VALID_IONOSPHERE_TYPES = [IONOSPHERE_TYPE_JPLG, IONOSPHERE_TYPE_JPRG] @@ -89,40 +85,6 @@ def get_parser(): return parser -# TODO: this should land in a "login" utility module at some point -class SessionWithHeaderRedirection(requests.Session): - """ - Class with an override of the requests.Session.rebuild_auth to maintain - headers when redirected by EarthData Login. - - This code was adapted from the examples available here: - https://urs.earthdata.nasa.gov/documentation/for_users/data_access/python - - """ - AUTH_HOST = DEFAULT_EDL_ENDPOINT - - def __init__(self, username, password): - super().__init__() - self.auth = (username, password) - - # Overrides from the library to keep headers when redirected to or from - # the NASA auth host. - def rebuild_auth(self, prepared_request, response): - headers = prepared_request.headers - url = prepared_request.url - - if 'Authorization' in headers: - original_parsed = requests.utils.urlparse(response.request.url) - redirect_parsed = requests.utils.urlparse(url) - - if (original_parsed.hostname != redirect_parsed.hostname) and \ - redirect_parsed.hostname != self.AUTH_HOST and \ - original_parsed.hostname != self.AUTH_HOST: - del headers['Authorization'] - - return - - def parse_start_date_from_safe(input_safe_file): """ Parses the start date from the name of an input SLC archive. 
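Reviewer note: the `SessionWithHeaderRedirection` class deleted above is functionally identical to the one the new `util/edl_util.py` module provides (see the end of this diff), so `stage_ionosphere_file.py` only needs to swap its import. The sketch below shows the intended usage; it is not part of this change, and the `download_ionex_file` helper, its `username`/`password` arguments, and the `archive_path` value are illustrative placeholders. Only the class itself, `DEFAULT_EDL_ENDPOINT`, and the CDDIS download endpoint are taken from this diff.

```python
from util.edl_util import DEFAULT_EDL_ENDPOINT, SessionWithHeaderRedirection

# Mirrors the module-level constant defined in tools/stage_ionosphere_file.py.
DEFAULT_DOWNLOAD_ENDPOINT = "https://cddis.nasa.gov/archive/gnss/products/ionex"


def download_ionex_file(archive_path, username, password, output_path):
    """Fetch one IONEX archive while keeping EDL credentials across redirects."""
    # rebuild_auth() only drops the Authorization header when a redirect leaves
    # both the original host and urs.earthdata.nasa.gov, so the login survives
    # the EDL redirect chain used by the CDDIS archive.
    session = SessionWithHeaderRedirection(username, password, auth_host=DEFAULT_EDL_ENDPOINT)

    response = session.get(f"{DEFAULT_DOWNLOAD_ENDPOINT}/{archive_path}", stream=True)
    response.raise_for_status()

    with open(output_path, "wb") as outfile:
        for chunk in response.iter_content(chunk_size=8192):
            outfile.write(chunk)

    return output_path
```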
diff --git a/tools/stage_orbit_file.py b/tools/stage_orbit_file.py index 9eb1e426..8783fbdd 100644 --- a/tools/stage_orbit_file.py +++ b/tools/stage_orbit_file.py @@ -13,27 +13,22 @@ import argparse import os import re -import requests - from datetime import datetime, timedelta from os.path import abspath import backoff +import requests -from commons.logger import logger from commons.logger import LogLevels - -DEFAULT_QUERY_ENDPOINT = 'https://catalogue.dataspace.copernicus.eu/odata/v1/Products' -"""Default URL endpoint for the Copernicus Data Space Ecosystem (CDSE) query REST service""" - -DEFAULT_AUTH_ENDPOINT = 'https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token' -"""Default URL endpoint for performing user authentication with CDSE""" - -DEFAULT_DOWNLOAD_ENDPOINT = 'https://zipper.dataspace.copernicus.eu/odata/v1/Products' -"""Default URL endpoint for CDSE download REST service""" - -DEFAULT_SESSION_ENDPOINT = 'https://identity.dataspace.copernicus.eu/auth/realms/CDSE/account/sessions' -"""Default URL endpoint to manage authentication sessions with CDSE""" +from commons.logger import logger +from util.backoff_util import fatal_code, backoff_logger +from util.dataspace_util import (DEFAULT_QUERY_ENDPOINT, + DEFAULT_AUTH_ENDPOINT, + DEFAULT_DOWNLOAD_ENDPOINT, + DEFAULT_SESSION_ENDPOINT, + NoQueryResultsException, + NoSuitableOrbitFileException, + DataspaceSession) ORBIT_TYPE_POE = 'POEORB' """Orbit type identifier for Precise Orbit Ephemeris""" @@ -65,19 +60,6 @@ ascending node crossing is included when choosing the orbit file """ -class NoQueryResultsException(Exception): - """Custom exception to identify empty results from a query""" - pass - - -class NoSuitableOrbitFileException(Exception): - """Custom exception to identify no orbit files meeting overlap criteria""" - - -def to_datetime(value): - """Helper function to covert command-line arg to datetime object""" - return datetime.strptime(value, "%Y%m%dT%H%M%S") - def get_parser(): """Returns the command line parser for stage_orbit_file.py""" @@ -265,17 +247,6 @@ def construct_orbit_file_query(mission_id, orbit_type, search_start_time, search return query -def fatal_code(err: requests.exceptions.RequestException) -> bool: - """Only retry for common transient errors""" - return err.response.status_code not in [401, 429, 500, 502, 503, 504] - -def backoff_logger(details): - """Log details about the current backoff/retry""" - logger.warning( - f"Backing off {details['target']} function for {details['wait']:0.1f} " - f"seconds after {details['tries']} tries." - ) - logger.warning(f"Total time elapsed: {details['elapsed']:0.1f} seconds.") @backoff.on_exception(backoff.constant, requests.exceptions.RequestException, @@ -434,100 +405,6 @@ def select_orbit_file(query_results, req_start_time, req_stop_time): "No suitable orbit file could be found within the results of the query" ) -@backoff.on_exception(backoff.constant, - requests.exceptions.RequestException, - max_time=600, - giveup=fatal_code, - on_backoff=backoff_logger, - interval=15) -def get_access_token(endpoint_url, username, password): - """ - Acquires an access token from the CDSE authentication endpoint using the - credentials provided by the user. - - Parameters - ---------- - endpoint_url : str - URL to the authentication endpoint to provide credentials to. - username : str - Username of the account to authenticate with. - password : str - Password of the account to authenticate with. 
- - Returns - ------- - access_token : str - The access token parsed from a successful authentication request. - This token must be included with download requests for them to be valid. - session_id : str - The ID associated with the authenticated session. Should be used to - request deletion of the session once the desired orbit file(s) is downloaded. - - Raises - ------ - RuntimeError - If the authentication request fails, or an invalid response is returned - from the service. - - """ - # Set up the payload to the authentication service - data = { - "client_id": "cdse-public", - "username": username, - "password": password, - "grant_type": "password", - } - - response = requests.post(endpoint_url, data=data,) - response.raise_for_status() - - # Parse the access token from the response - try: - access_token = response.json()["access_token"] - session_id = response.json()["session_state"] - except KeyError as err: - raise RuntimeError( - f'Failed to parsed expected field "{str(err)}" from authentication response.' - ) - - return access_token, session_id - -@backoff.on_exception(backoff.constant, - requests.exceptions.RequestException, - max_time=300, - giveup=fatal_code, - on_backoff=backoff_logger, - interval=15) -def delete_access_token(endpoint_url, access_token, session_id): - """ - Submits a delete request on the provided endpoint URL for the provided - session ID. This function should always be called after successful authentication - to ensure we don't leave too many active session open (and hit the limit of - the service provider). - - Parameters - ---------- - endpoint_url : str - URL to the session deletion endpoint to request to. - access_token : str - Bearer token acquired from a previous successful authentication. Must - be provided to authenticate the session deletion request. - session_id : str - Identifier for the authenticated session corresponding to the provided - access token. Should have been provided in the same response as the - access token when the valid authentication request was granted by the - service provider. - - """ - request_url = f'{endpoint_url}/{session_id}' - headers = {'Authorization': f"Bearer {access_token}", 'Content-Type': 'application/json'} - - response = requests.delete(url=request_url, headers=headers) - - logger.debug(f'response.url: {response.url}') - logger.debug(f'response.status_code: {response.status_code}') - - response.raise_for_status() @backoff.on_exception(backoff.constant, requests.exceptions.RequestException, @@ -642,25 +519,18 @@ def main(args): # query result to the directory specified by the user else: # Obtain an access token for use with the download request from the provided - # credentials - logger.info("Authenticating to orbit file service provider") - access_token, session_id = get_access_token(args.auth_endpoint, args.username, args.password) - - try: + # credentials. This session will automatically delete itself after exiting + # this context. 
+ with DataspaceSession(args.username, args.password) as dss: logger.info( f"Downloading Orbit file {orbit_file_name} from service endpoint " f"{args.download_endpoint}" ) output_orbit_file_path = download_orbit_file( - download_url, args.output_directory, orbit_file_name, access_token + download_url, args.output_directory, orbit_file_name, dss.token ) logger.info(f"Orbit file downloaded to {output_orbit_file_path}") - finally: - # Make sure we delete the current authentication session to avoid - # hitting the limit of active concurrent sessions - logger.info("Requesting deletion of open authentication session") - delete_access_token(args.session_endpoint, access_token, session_id) if __name__ == '__main__': diff --git a/util/aws_util.py b/util/aws_util.py index 32b6dfbe..aa9b643a 100644 --- a/util/aws_util.py +++ b/util/aws_util.py @@ -1,19 +1,19 @@ """Collection of AWS-related utilities""" + import concurrent.futures import contextlib -import logging -import threading import os +import threading from pathlib import Path from typing import Collection import backoff import boto3 from boto3.exceptions import Boto3Error -from more_itertools import chunked from mypy_boto3_s3 import S3Client -logger = logging.getLogger(__name__) +from commons.logger import logger +from util.backoff_util import giveup_s3_client_upload_file def concurrent_s3_client_try_upload_file(bucket: str, key_prefix: str, files: Collection[Path]): @@ -38,19 +38,6 @@ def concurrent_s3_client_try_upload_file(bucket: str, key_prefix: str, files: Co return s3paths -def giveup_s3_client_upload_file(e): - """ - giveup function for use with @backoff decorator. This only checks for a local-testing condition of running into - an expired AWS CLI/SDK session token. - """ - if isinstance(e, boto3.exceptions.Boto3Error): - if isinstance(e, boto3.exceptions.S3UploadFailedError): - if "ExpiredToken" in e.args[0]: - logger.error("Local testing error. Give up immediately.") - return True - return False - - @backoff.on_exception(backoff.expo, exception=Boto3Error, max_tries=3, jitter=None, giveup=giveup_s3_client_upload_file) def try_s3_client_try_upload_file(s3_client: S3Client = None, sem: threading.Semaphore = None, **kwargs): """ diff --git a/util/backoff_util.py b/util/backoff_util.py new file mode 100644 index 00000000..15fd2c2a --- /dev/null +++ b/util/backoff_util.py @@ -0,0 +1,32 @@ +"""Utility functions used with backoff/retry decorators""" +import boto3 +import requests + +from commons.logger import logger + + +def fatal_code(err: requests.exceptions.RequestException) -> bool: + """Only retry for common transient errors""" + return err.response.status_code not in [401, 418, 429, 500, 502, 503, 504] + + +def backoff_logger(details): + """Log details about the current backoff/retry""" + logger.warning( + f"Backing off {details['target']} function for {details['wait']:0.1f} " + f"seconds after {details['tries']} tries." + ) + logger.warning(f"Total time elapsed: {details['elapsed']:0.1f} seconds.") + + +def giveup_s3_client_upload_file(e): + """ + giveup function for use with @backoff decorator. This only checks for a + local-testing condition of running into an expired AWS CLI/SDK session token. + """ + if isinstance(e, boto3.exceptions.Boto3Error): + if isinstance(e, boto3.exceptions.S3UploadFailedError): + if "ExpiredToken" in e.args[0]: + logger.error("Local testing error. 
Give up immediately.") + return True + return False diff --git a/util/dataspace_util.py b/util/dataspace_util.py new file mode 100644 index 00000000..2209f35c --- /dev/null +++ b/util/dataspace_util.py @@ -0,0 +1,208 @@ +"""Utility functions and classes used to interface with the ESA Dataspace service""" + +from datetime import datetime, timedelta +from threading import Lock +from typing import Tuple + +import backoff +import requests + +from commons.logger import logger +from util.backoff_util import fatal_code, backoff_logger + +DEFAULT_DATASPACE_ENDPOINT = 'dataspace.copernicus.eu' +"""Default endpoint for pulling Dataspace credentials from netrc""" + +DEFAULT_QUERY_ENDPOINT = 'https://catalogue.dataspace.copernicus.eu/odata/v1/Products' +"""Default URL endpoint for the Copernicus Data Space Ecosystem (CDSE) query REST service""" + +DEFAULT_AUTH_ENDPOINT = 'https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token' +"""Default URL endpoint for performing user authentication with CDSE""" + +DEFAULT_DOWNLOAD_ENDPOINT = 'https://zipper.dataspace.copernicus.eu/odata/v1/Products' +"""Default URL endpoint for CDSE download REST service""" + +DEFAULT_SESSION_ENDPOINT = 'https://identity.dataspace.copernicus.eu/auth/realms/CDSE/account/sessions' +"""Default URL endpoint to manage authentication sessions with CDSE""" + + +class NoQueryResultsException(Exception): + """Custom exception to identify empty results from a query""" + pass + + +class NoSuitableOrbitFileException(Exception): + """Custom exception to identify no orbit files meeting overlap criteria""" + + +class DataspaceSession: + """ + Context manager class to wrap credentialed operations in. + Creates a session and gets its token on entry, and deletes the session on exit. + """ + + def __init__(self, username, password): + self.__lock = Lock() + self.__token, self.__session, self.__refresh_token, self.__expires = self._get_token(username, password) + + @property + def token(self) -> str: + """ + Get the current access token for the Dataspace session. + If it is expired, it will attempt to refresh. + """ + + with self.__lock: + if datetime.now() > self.__expires: + self.__token, self.__session, self.__refresh_token, self.__expires = self._refresh() + + return self.__token + + @backoff.on_exception(backoff.constant, + requests.exceptions.RequestException, + max_time=300, + giveup=fatal_code, + on_backoff=backoff_logger, + interval=15) + def _refresh(self) -> Tuple[str, str, str, datetime]: + """ + Performs an access token refresh request using the refresh token acquired + during initial authentication. + + Returns + ------- + access_token : str + The refreshed access token. + session_id : str + The ID associated with the authenticated session. Should be used to + request deletion of the session once the desired orbit file(s) is downloaded. + refresh_token : str + Token used to refresh the access token prior to its next expiration. + expiration_time : datetime.datetime + The time when the access token is next expected to expire. 
+ + """ + data = { + 'client_id': 'cdse-public', + 'refresh_token': self.__refresh_token, + 'grant_type': 'refresh_token', + } + + response = requests.post(DEFAULT_AUTH_ENDPOINT, data=data) + + logger.info(f'Refresh Dataspace token request: {response.status_code}') + + response.raise_for_status() + + try: + response_json = response.json() + + access_token = response_json['access_token'] + session_id = response_json['session_state'] + refresh_token = response_json['refresh_token'] + expires_in = response_json['expires_in'] + except KeyError as e: + raise RuntimeError( + f'Failed to parse expected field "{str(e)}" from authentication response.' + ) + + logger.info(f'Refreshed Dataspace token for session {self.__session}') + + return access_token, session_id, refresh_token, datetime.now() + timedelta(seconds=expires_in) + + @backoff.on_exception(backoff.constant, + requests.exceptions.RequestException, + max_time=600, + giveup=fatal_code, + on_backoff=backoff_logger, + interval=15) + def _get_token(self, username: str, password: str) -> Tuple[str, str, str, datetime]: + """ + Acquires an access token from the CDSE authentication endpoint using the + credentials provided by the user. + + Parameters + ---------- + username : str + Username of the account to authenticate with. + password : str + Password of the account to authenticate with. + + Returns + ------- + access_token : str + The access token parsed from a successful authentication request. + This token must be included with download requests for them to be valid. + session_id : str + The ID associated with the authenticated session. Should be used to + request deletion of the session once the desired orbit file(s) is downloaded. + refresh_token : str + Token used to refresh the access token prior to its expiration. + expiration_time : datetime.datetime + The time when the access token is expected to expire. + + Raises + ------ + RuntimeError + If the authentication request fails, or an invalid response is returned + from the service. + + """ + data = { + 'client_id': 'cdse-public', + 'username': username, + 'password': password, + 'grant_type': 'password' + } + + response = requests.post(DEFAULT_AUTH_ENDPOINT, data=data) + + logger.info(f'Get Dataspace token for {username}: {response.status_code}') + + response.raise_for_status() + + try: + response_json = response.json() + + access_token = response_json['access_token'] + session_id = response_json['session_state'] + refresh_token = response_json['refresh_token'] + expires_in = response_json['expires_in'] + except KeyError as e: + raise RuntimeError( + f'Failed to parse expected field "{str(e)}" from authentication response.' + ) + + logger.info(f'Created Dataspace session {session_id}') + + return access_token, session_id, refresh_token, datetime.now() + timedelta(seconds=expires_in) + + @backoff.on_exception(backoff.constant, + requests.exceptions.RequestException, + max_time=300, + giveup=fatal_code, + on_backoff=backoff_logger, + interval=15) + def _delete_token(self): + """ + Submits a delete request on the provided endpoint URL for the provided + session ID. This function should always be called after successful authentication + to ensure we don't leave too many active session open (and hit the limit of + the service provider). 
+ """ + logger.info("Requesting deletion of open authentication session") + + url = f'{DEFAULT_SESSION_ENDPOINT}/{self.__session}' + headers = {'Authorization': f'Bearer {self.token}', 'Content-Type': 'application/json'} + + response = requests.delete(url=url, headers=headers) + + logger.info(f'Delete request {response.url}: {response.status_code}') + + response.raise_for_status() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + self._delete_token() diff --git a/util/edl_util.py b/util/edl_util.py new file mode 100644 index 00000000..0b474819 --- /dev/null +++ b/util/edl_util.py @@ -0,0 +1,35 @@ +"""Utility functions and classes used to interface with the Earthdata Login service""" + +import requests + +DEFAULT_EDL_ENDPOINT = "urs.earthdata.nasa.gov" +"""Default endpoint for authenticating with EarthData Login""" + + +class SessionWithHeaderRedirection(requests.Session): + """ + Class with an override of the requests.Session.rebuild_auth to maintain + headers when redirected by EarthData Login. + + This code was adapted from the examples available here: + https://urs.earthdata.nasa.gov/documentation/for_users/data_access/python + """ + + def __init__(self, username, password, auth_host=DEFAULT_EDL_ENDPOINT): + super().__init__() + self.auth = (username, password) + self.auth_host = auth_host + + # Overrides from the library to keep headers when redirected to or from + # the NASA auth host. + def rebuild_auth(self, prepared_request, response): + headers = prepared_request.headers + url = prepared_request.url + + if "Authorization" in headers: + original_parsed = requests.utils.urlparse(response.request.url) + redirect_parsed = requests.utils.urlparse(url) + if (original_parsed.hostname != redirect_parsed.hostname) and \ + redirect_parsed.hostname != self.auth_host and \ + original_parsed.hostname != self.auth_host: + del headers["Authorization"]