From 10ecfc735a12b956fe01c4fbe24c3c72cc4e1c7e Mon Sep 17 00:00:00 2001
From: ninsbl
Date: Fri, 28 Jul 2023 08:50:15 +0200
Subject: [PATCH] handle token timeout, linting

---
 src/temporal/t.import.hrsi/t.import.hrsi.py | 79 +++++++++++++++++----
 1 file changed, 66 insertions(+), 13 deletions(-)

diff --git a/src/temporal/t.import.hrsi/t.import.hrsi.py b/src/temporal/t.import.hrsi/t.import.hrsi.py
index db956932..6ccdd8e0 100644
--- a/src/temporal/t.import.hrsi/t.import.hrsi.py
+++ b/src/temporal/t.import.hrsi/t.import.hrsi.py
@@ -146,6 +146,16 @@
 # % guisection: Filter
 # %end
 
+# %option
+# % key: batch_size
+# % label: Number of files to download per batch
+# % type: integer
+# % required: no
+# % multiple: no
+# % answer: 500
+# % guisection: Filter
+# %end
+
 # %option G_OPT_F_INPUT
 # % key: credits_file
 # % type: string
@@ -194,8 +204,11 @@
 from datetime import datetime
 from http.client import IncompleteRead
 from io import BytesIO
-from multiprocessing import Pool
+
+# from multiprocessing import Pool
+from concurrent import futures
 from pathlib import Path
+from subprocess import PIPE
 from urllib import parse
 from urllib import request
 
@@ -224,6 +237,7 @@ def __init__(
         credits_file=None,
         import_module=None,
         category_module=None,
+        batch_size="500",
         cores=1,
     ):
         #: Number of cores to use for parallel download, extraction and import (if relevant)
@@ -248,6 +262,8 @@ def __init__(
         #: Attribute containing credits for cryo.land.copernicus.eu as tuple (username, password)
         self.user_credits = (None, None)
         self.__get_cryo_land_credits(credits_file)
+        #: Attribute containing the search URL
+        self.search_url = None
         #: Attribute containing initial search results
         self.initial_search_result = None
         #: Attribute containing a dictionary describing the requested HRSI product
@@ -276,6 +292,7 @@ def __init__(
         Path(self.vrt_dir).mkdir(parents=True, exist_ok=True)
         #: Attribute defining if existing data should be overwritten
         self.recreate = gs.overwrite()
+        self.batch_size = batch_size
 
     def __parse_api_description(self, api_format="application/json"):
         """Extracts supported search parameters from API description XML
@@ -389,13 +406,32 @@ def fetch_data(self, query_params, product_metadata):
         """Wrapper method to execute download in batches"""
         check_permissions(self.output_directory, "Download")
         # Minimize paging
-        query_params["maxRecords"] = "2000"
+        query_params["maxRecords"] = self.batch_size
+        batches_n = int(
+            np.ceil(
+                self.initial_search_result["properties"]["totalResults"]
+                / float(self.batch_size)
+            )
+        )
+        batch_number = 0
+        gs.verbose(
+            _(
+                "Downloading a total of {files} files in {batches} batches"
+            ).format(
+                files=self.initial_search_result["properties"]["totalResults"],
+                batches=batches_n,
+            )
+        )
         self.__construct_search_url(query_params)
-        self.__get_token()
         self.requested_hrsi_product = product_metadata
         next_batch = True
         url = self.search_url
         while next_batch:
+            batch_number += 1
+            self.__get_token()  # tokens can time out, so refresh before every batch
+            gs.verbose(
+                _("Downloading batch {n}: {url}").format(n=batch_number, url=url)
+            )
             with request.urlopen(url) as req:
                 resp = req.read()
             resp_dict = json.loads(resp)
@@ -409,7 +445,8 @@ def fetch_data(self, query_params, product_metadata):
             if active_cores == 1:
                 result_dicts = [self._download_and_import_data(download_urls)]
             elif active_cores > 1:
-                with Pool(active_cores) as pool:
+                # with Pool(active_cores) as pool:
+                with futures.ThreadPoolExecutor(active_cores) as pool:
                     result_dicts = pool.map(
                         self._download_and_import_data,
                         [
@@ -441,6 +478,7 @@ def fetch_data(self, query_params, product_metadata):
                 ]
             )
         )
+        gs.percent(batch_number, batches_n, 1)
 
     def __get_cryo_land_credits(self, credits_file=None):
         """
@@ -491,7 +529,7 @@ def _download_and_import_data(self, download_urls):
         failed_downloads = []
         # Download dict content
         for download_url in download_urls:
-            gs.verbose(_("Downloading {}...").format(download_url))
+            # gs.verbose(_("Downloading {}...").format(download_url))
             # Initialize request
             hrsi_request = request.Request(download_url)
             # Add token authorization
@@ -531,13 +569,13 @@ def _download_and_import_data(self, download_urls):
                 # temporal extent is not consistently represented in the
                 # metadata, so this part of the code is deactivated
                 if not self.requested_hrsi_product["time_pattern"]:
-                    m = etree.fromstring(zip_data)
-                    mdd = MD_Metadata(m)
+                    metadata_xml = etree.fromstring(zip_data)
+                    meta_data = MD_Metadata(metadata_xml)
 
-                    # title = mdd.identification[0].title
-                    # description = mdd.identification[0].abstract
-                    start = mdd.identification[0].temporalextent_start
-                    end = mdd.identification[0].temporalextent_end
+                    # title = meta_data.identification[0].title
+                    # description = meta_data.identification[0].abstract
+                    start = meta_data.identification[0].temporalextent_start
+                    end = meta_data.identification[0].temporalextent_end
                     if start == end:
                         end = None
 
@@ -587,6 +625,7 @@ def _download_and_import_data(self, download_urls):
                     ],
                     recreate=self.recreate,
                 )
+                input_path = Path(input_path)
             except Exception:
                 gdal_dataset = None
                 continue
@@ -605,10 +644,13 @@ def _download_and_import_data(self, download_urls):
                     import_mod.run()
                 except Exception:
                     gs.warning(
-                        _("Could not import map {}").format(str(hrsi_file_path))
+                        _("Could not import raster map {}").format(
+                            str(hrsi_file_path)
+                        )
                     )
                     hrsi_file_path.unlink()
                     continue
+
                 # Add categories if relevant
                 if self.requested_hrsi_product[sub_product]["categories"]:
                     Module(
@@ -783,7 +825,7 @@ def create_vrt(
         band.SetMetadataItem("STATISTICS_MAXIMUM", str(data_range[1]))
 
     vrt = None
-    return vrt_path
+    return str(vrt_path)
 
 
 def align_bbox(bbox, resolution_x, resolution_y):
@@ -1450,6 +1492,10 @@ def main():
         },
     }
 
+    # Check batch_size input
+    if not 0 < int(options["batch_size"]) <= 2000:
+        gs.fatal(_("Invalid input for batch_size. Valid range is 1-2000."))
+
     # Get GRASS GIS environment
     gisenv = dict(gs.gisenv())
 
@@ -1474,6 +1520,8 @@ def main():
     if flags["l"] or flags["f"]:
         import_module = Module(
             "r.external",
+            stdout_=PIPE,
+            stderr_=PIPE,
             quiet=True,
             overwrite=gs.overwrite(),
             run_=False,
@@ -1482,6 +1530,8 @@ def main():
     else:
         import_module = Module(
             "r.in.gdal",
+            stdout_=PIPE,
+            stderr_=PIPE,
             quiet=True,
             overwrite=gs.overwrite(),
             run_=False,
@@ -1531,12 +1581,15 @@ def main():
     # Make sure the query result does not change during download
     query_params["publishedBefore"] = run_time.isoformat()
 
+    query_params["maxRecords"] = options["batch_size"]
+
     # Create download object
     clc_downloader = CLCCryoDownloader(
         output_directory=options["output_directory"],
         credits_file=options["credits_file"],
         import_module=import_module,
         category_module=category_module,
+        batch_size=options["batch_size"],
         cores=int(options["nprocs"]),
     )
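
Note on the pattern above: the token-timeout fix boils down to re-authenticating at the top of every
batch instead of once before the paging loop, and the linting change swaps multiprocessing.Pool for
concurrent.futures.ThreadPoolExecutor. Below is a minimal standalone sketch of that pattern; it is
illustrative only: get_token() and download_one() are hypothetical stand-ins for the module's token
request and download/import steps, and the JSON shape (features, services/download/url, links with
rel="next") is an assumption about the OpenSearch-style responses the service returns.

import json
import math
from concurrent import futures
from urllib import request


def get_token():
    # Hypothetical stand-in: the real module fetches a fresh bearer token
    # from the service's auth endpoint. Tokens expire, hence one call per batch.
    return "dummy-token"


def download_one(url, token):
    # Hypothetical stand-in for the per-file download-and-import step.
    req = request.Request(url, headers={"Authorization": "Bearer " + token})
    with request.urlopen(req) as resp:
        return resp.read()


def fetch_in_batches(search_url, total_results, batch_size=500, workers=2):
    """Page through search results, refreshing the token once per batch."""
    batches_n = math.ceil(total_results / batch_size)
    url = search_url
    for batch_number in range(1, batches_n + 1):
        token = get_token()  # refreshed per batch so it cannot expire mid-run
        with request.urlopen(url) as resp:
            resp_dict = json.loads(resp.read())
        urls = [
            feature["properties"]["services"]["download"]["url"]
            for feature in resp_dict["features"]
        ]
        # Threads suit this I/O-bound work; unlike multiprocessing.Pool they
        # also accept lambdas and bound methods without pickling.
        with futures.ThreadPoolExecutor(workers) as pool:
            list(pool.map(lambda u: download_one(u, token), urls))
        # Assumed pagination shape: follow the API's "next" link if present.
        next_links = [
            link["href"]
            for link in resp_dict.get("properties", {}).get("links", [])
            if link.get("rel") == "next"
        ]
        if not next_links:
            break
        url = next_links[0]

The switch to ThreadPoolExecutor is the safer choice here: the per-file work is network-bound, and
the arguments handed to multiprocessing.Pool would have to be picklable, which bound methods and
Module objects are not guaranteed to be on spawn-based platforms.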