
handle token timeout, linting
ninsbl committed Jul 28, 2023
1 parent 06d05ea commit 10ecfc7
Showing 1 changed file with 66 additions and 13 deletions.
src/temporal/t.import.hrsi/t.import.hrsi.py (66 additions & 13 deletions)
@@ -146,6 +146,16 @@
# % guisection: Filter
# %end

# %option
# % key: batch_size
# % label: Number of files to download per batch
# % type: integer
# % required: no
# % multiple: no
# % answer: 500
# % guisection: Filter
# %end

# %option G_OPT_F_INPUT
# % key: credits_file
# % type: string
@@ -194,8 +204,11 @@
from datetime import datetime
from http.client import IncompleteRead
from io import BytesIO
from multiprocessing import Pool

# from multiprocessing import Pool
from concurrent import futures
from pathlib import Path
from subprocess import PIPE
from urllib import parse
from urllib import request

@@ -224,6 +237,7 @@ def __init__(
credits_file=None,
import_module=None,
category_module=None,
batch_size="1000",
cores=1,
):
#: Number of cores to use for parallel download, extraction and import (if relevant)
@@ -248,6 +262,8 @@
#: Attribute containing credits for cryo.land.copernicus.eu as a tuple (username, password)
self.user_credits = (None, None)
self.__get_cryo_land_credits(credits_file)
#: Attribute containing the search URL
self.search_url = None
#: Attribute containing initial search results
self.initial_search_result = None
#: Attribute containing a dictionary describing the requested HRSI product
@@ -276,6 +292,7 @@ def __init__(
Path(self.vrt_dir).mkdir(parents=True, exist_ok=True)
#: Attribute defining if existing data should be overwritten
self.recreate = gs.overwrite()
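#: Attribute defining the number of files to download and import per batch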
self.batch_size = batch_size

def __parse_api_description(self, api_format="application/json"):
"""Extracts supported search parameters from API description XML
@@ -389,13 +406,32 @@ def fetch_data(self, query_params, product_metadata):
"""Wrapper method to execute download in batches"""
check_permissions(self.output_directory, "Download")
# Minimize paging
query_params["maxRecords"] = "2000"
query_params["maxRecords"] = self.batch_size
batches_n = int(
np.ceil(
self.initial_search_result["properties"]["totalResults"]
/ float(self.batch_size)
)
)
batch_number = 0
gs.verbose(
_(
"Downloading a total of {files} files in {batches} batches of data"
).format(
files=self.initial_search_result["properties"]["totalResults"],
batches=batches_n,
)
)
self.__construct_search_url(query_params)
self.__get_token()
self.requested_hrsi_product = product_metadata
next_batch = True
url = self.search_url
while next_batch:
batch_number += 1
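# Re-fetch the authentication token for each batch so it cannot time out while earlier batches are processed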
self.__get_token()
gs.verbose(
_("Downloading batch {n}: {url}").format(n=batch_number, url=url)
)
with request.urlopen(url) as req:
resp = req.read()
resp_dict = json.loads(resp)
@@ -409,7 +445,8 @@ def fetch_data(self, query_params, product_metadata):
if active_cores == 1:
result_dicts = [self._download_and_import_data(download_urls)]
elif active_cores > 1:
with Pool(active_cores) as pool:
# with Pool(active_cores) as pool:
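# A thread pool keeps all workers in one process, so they share this instance's state (e.g. the current token)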
with futures.ThreadPoolExecutor(active_cores) as pool:
result_dicts = pool.map(
self._download_and_import_data,
[
@@ -441,6 +478,7 @@ def fetch_data(self, query_params, product_metadata):
]
)
)
gs.percent(batch_number, batches_n, 1)

def __get_cryo_land_credits(self, credits_file=None):
"""
@@ -491,7 +529,7 @@ def _download_and_import_data(self, download_urls):
failed_downloads = []
# Download dict content
for download_url in download_urls:
gs.verbose(_("Downloading {}...").format(download_url))
# gs.verbose(_("Downloading {}...").format(download_url))
# Initialize request
hrsi_request = request.Request(download_url)
# Add token authorization
@@ -531,13 +569,13 @@
# temporal extent is not consistently represented in the
# metadata, so this part of the code is deactivated
if not self.requested_hrsi_product["time_pattern"]:
m = etree.fromstring(zip_data)
mdd = MD_Metadata(m)
metadata_xml = etree.fromstring(zip_data)
meta_data = MD_Metadata(metadata_xml)

# title = mdd.identification[0].title
# description = mdd.identification[0].abstract
start = mdd.identification[0].temporalextent_start
end = mdd.identification[0].temporalextent_end
# title = meta_data.identification[0].title
# description = meta_data.identification[0].abstract
start = meta_data.identification[0].temporalextent_start
end = meta_data.identification[0].temporalextent_end
if start == end:
end = None

@@ -587,6 +625,7 @@ def _download_and_import_data(self, download_urls):
],
recreate=self.recreate,
)
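# the call above now returns a plain string; convert it back to a Path for further handling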
input_path = Path(input_path)
except Exception:
gdal_dataset = None
continue
@@ -605,10 +644,13 @@
import_mod.run()
except Exception:
gs.warning(
_("Could not import map {}").format(str(hrsi_file_path))
_("Could not import raster map {}").format(
str(hrsi_file_path)
)
)
hrsi_file_path.unlink()
continue

# Add categories if relevant
if self.requested_hrsi_product[sub_product]["categories"]:
Module(
@@ -783,7 +825,7 @@ def create_vrt(
band.SetMetadataItem("STATISTICS_MAXIMUM", str(data_range[1]))
vrt = None

return vrt_path
return str(vrt_path)


def align_bbox(bbox, resolution_x, resolution_y):
@@ -1450,6 +1492,10 @@ def main():
},
}

# Check batch_size input
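# The upper limit of 2000 corresponds to the maxRecords value that was previously hardcoded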
if not 0 < int(options["batch_size"]) <= 2000:
gs.fatal(_("Invalid input for batch_size. Valid range is 1-2000."))

# Get GRASS GIS environment
gisenv = dict(gs.gisenv())

@@ -1474,6 +1520,8 @@
if flags["l"] or flags["f"]:
import_module = Module(
"r.external",
stdout_=PIPE,
stderr_=PIPE,
quiet=True,
overwrite=gs.overwrite(),
run_=False,
@@ -1482,6 +1530,8 @@
else:
import_module = Module(
"r.in.gdal",
stdout_=PIPE,
stderr_=PIPE,
quiet=True,
overwrite=gs.overwrite(),
run_=False,
@@ -1531,12 +1581,15 @@
# Make sure the query result does not change during download
query_params["publishedBefore"] = run_time.isoformat()

query_params["maxRecords"] = options["batch_size"]

# Create download object
clc_downloader = CLCCryoDownloader(
output_directory=options["output_directory"],
credits_file=options["credits_file"],
import_module=import_module,
category_module=category_module,
batch_size=options["batch_size"],
cores=int(options["nprocs"]),
)

