Skip to content

Commit

Permalink
fix logging, update progress bar,simplify retry
Browse files Browse the repository at this point in the history
  • Loading branch information
2320sharon committed Nov 15, 2023
1 parent f332ac0 commit b90732e
Showing 1 changed file with 72 additions and 50 deletions.
122 changes: 72 additions & 50 deletions src/coastsat/SDS_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,44 +84,42 @@ class TooManyRequests(Exception):
pass


def retry_deprecated(func):
@wraps(func)
def wrapper(*args, **kwargs):
max_tries = 3
# Get image_id from kwargs or use 'Unknown'
image_id = kwargs.get("image_id", "Unknown image id")
logger = kwargs.get("logger", None)
delay = 1
backoff = 2
tries, delay_multiplier = max_tries, 1
# attempt to download the image up to max_tries times
for tries in range(max_tries):
try:
print(f"calling {func.__name__}")
return func(*args, **kwargs)
except Exception as e:
print(print("CLASS", type(e)))
if logger:
logger.warning(
f"Retry {tries + 1}/{max_tries} for function {func.__name__} with image_id {kwargs.get('image_id', 'N/A')} due to {e}"
)
if tries + 1 < max_tries:
# wait with exponential backoff
print(
f"Retry {tries + 1}/{max_tries} for function {func.__name__} with image_id {kwargs.get('image_id', 'N/A')} due to {type(e).__name__} error."
)
time.sleep(delay)
# delay_multiplier *= backoff
else:
# Re-raise the last exception if max retries have been exceeded )(i.e. no more retries)
print(
f"Max retries {tries + 1}/{max_tries} exceeded for {func.__name__} due to {type(e).__name__}"
)
raise TooManyRequests(
f"Failed to process {image_id} after {max_tries} attempts due to: {e}"
)

return wrapper
# def retry_deprecated(func):
# @wraps(func)
# def wrapper(*args, **kwargs):
# max_tries = 3
# # Get image_id from kwargs or use 'Unknown'
# image_id = kwargs.get("image_id", "Unknown image id")
# logger = kwargs.get("logger", None)
# delay = 1
# # attempt to download the image up to max_tries times
# for tries in range(max_tries):
# try:
# print(f"calling {func.__name__}")
# return func(*args, **kwargs)
# except Exception as e:
# print(print("CLASS", type(e)))
# if logger:
# logger.warning(
# f"Retry {tries + 1}/{max_tries} for function {func.__name__} with image_id {kwargs.get('image_id', 'N/A')} due to {e}"
# )
# if tries + 1 < max_tries:
# # wait with exponential backoff
# print(
# f"Retry {tries + 1}/{max_tries} for function {func.__name__} with image_id {kwargs.get('image_id', 'N/A')} due to {type(e).__name__} error."
# )
# time.sleep(delay)
# # delay_multiplier *= backoff
# else:
# # Re-raise the last exception if max retries have been exceeded )(i.e. no more retries)
# print(
# f"Max retries {tries + 1}/{max_tries} exceeded for {func.__name__} due to {type(e).__name__}"
# )
# raise TooManyRequests(
# f"Failed to process {image_id} after {max_tries} attempts due to: {e}"
# )

# return wrapper


import functools # retry v2
Expand All @@ -130,17 +128,28 @@ def wrapper(*args, **kwargs):
def retry(func):
@functools.wraps(func)
def wrapper_retry(*args, **kwargs):
# Get image_id from kwargs or use 'Unknown'
image_id = kwargs.get("image_id", "Unknown image id")
logger = kwargs.get("logger", None)
max_attempts = 3
for attempt in range(max_attempts):
try:
# first attempt does not wait
if attempt > 0:
time.sleep(1)
return func(*args, **kwargs)
except Exception as e:
print(
f"Attempt {attempt+1}/{max_attempts} failed with error: {type(e).__name__}"
)
if logger:
logger.warning(
f"Retry {attempt + 1}/{max_attempts} for function {func.__name__} with image_id {kwargs.get('image_id', 'N/A')} due to {e}"
)
if attempt == max_attempts - 1:
print(
f"Max retries {attempt+1}/{max_attempts} exceeded for {func.__name__} due to {type(e).__name__}"
)
raise

return wrapper_retry
Expand Down Expand Up @@ -468,7 +477,7 @@ def retrieve_images(
# first delete dimensions key from dictionary
# otherwise the entire image is extracted (don't know why)
pbar.set_description_str(
desc=f"{satname}: Loading bands for {i} image ", refresh=True
desc=f"{satname}: Loading bands for {i}th image ", refresh=True
)
im_bands = remove_dimensions_from_bands(
image_ee, image_id=im_meta["id"], logger=logger
Expand All @@ -488,13 +497,16 @@ def retrieve_images(
]
# adjust polygon to match image coordinates so that there is no resampling
proj = image_ee.select("B1").projection()

pbar.set_description_str(
desc=f"{satname}: adjusting polygon {i}th image ", refresh=True
)
ee_region = adjust_polygon(
inputs["polygon"], proj, image_id=im_meta["id"], logger=logger
)
# download .tif from EE (one file with ms bands and one file with QA band)
pbar.set_description_str(
desc=f"{satname}: Downloading tif for {i} image ", refresh=True
desc=f"{satname}: Downloading tif for {i}th image ",
refresh=True,
)
fn_ms, fn_QA = download_tif(
image_ee,
Expand Down Expand Up @@ -535,6 +547,9 @@ def retrieve_images(
fn_in = fn_ms
fn_target = fn_ms
fn_out = os.path.join(fp_ms, im_fn["ms"])
pbar.set_description_str(
desc=f"{satname}: Transforming {i}th image ", refresh=True
)
warp_image_to_target(
fn_in,
fn_out,
Expand Down Expand Up @@ -599,7 +614,7 @@ def retrieve_images(
proj_ms = image_ee.select("B1").projection()
proj_pan = image_ee.select("B8").projection()
pbar.set_description_str(
desc=f"{satname}: adjusting polygon {i} image ", refresh=True
desc=f"{satname}: adjusting polygon {i}th image ", refresh=True
)
ee_region_ms = adjust_polygon(
inputs["polygon"],
Expand Down Expand Up @@ -649,7 +664,7 @@ def retrieve_images(
)
# if multiple images taken at the same date add 'dupX' to the name (duplicate number X)
pbar.set_description_str(
desc=f"{satname}: remove duplicates for {i} image ",
desc=f"{satname}: remove duplicates for {i}th image ",
refresh=True,
)
im_fn = handle_duplicate_image_names(
Expand All @@ -670,7 +685,7 @@ def retrieve_images(
fn_target = fn_pan
fn_out = os.path.join(fp_ms, im_fn["ms"])
pbar.set_description_str(
desc=f"{satname}: transforming {i} image ", refresh=True
desc=f"{satname}: Transforming {i}th image ", refresh=True
)
warp_image_to_target(
fn_in,
Expand Down Expand Up @@ -734,6 +749,9 @@ def retrieve_images(
proj_ms = image_ee.select("B1").projection()
proj_swir = image_ee.select("B11").projection()
proj_mask = image_ee.select("QA60").projection()
pbar.set_description_str(
desc=f"{satname}: adjusting polygon {i}th image ", refresh=True
)
ee_region_ms = adjust_polygon(
inputs["polygon"],
proj_ms,
Expand All @@ -753,6 +771,9 @@ def retrieve_images(
logger=logger,
)
# download the ms, swir and QA bands from EE
pbar.set_description_str(
desc=f"{satname}: Downloading tif for {i} image ", refresh=True
)
fn_ms = download_tif(
image_ee,
ee_region_ms,
Expand Down Expand Up @@ -794,6 +815,10 @@ def retrieve_images(
+ suffix
)
# if multiple images taken at the same date add 'dupX' to the name (duplicate)
pbar.set_description_str(
desc=f"{satname}: remove duplicates for {i}th image ",
refresh=True,
)
im_fn = handle_duplicate_image_names(
all_names,
bands,
Expand All @@ -810,6 +835,9 @@ def retrieve_images(
fn_in = fn_swir
fn_target = fn_ms
fn_out = os.path.join(fp_swir, im_fn["swir"])
pbar.set_description_str(
desc=f"{satname}: Transforming {i}th image ", refresh=True
)
warp_image_to_target(
fn_in,
fn_out,
Expand Down Expand Up @@ -1351,8 +1379,6 @@ def adjust_polygon(polygon, proj, **kwargs):
# adjust polygon to match image coordinates so that there is no resampling
polygon_ee = ee.Geometry.Polygon(polygon)
# convert polygon to image coordinates
print("making the list")

polygon_coords = np.array(
ee.List(polygon_ee.transform(proj, 1).coordinates().get(0)).getInfo()
)
Expand Down Expand Up @@ -1412,10 +1438,7 @@ def download_tif(
)
# for the newer versions of ee
else:
print("downloading id")
# time.sleep(3)
# crop and download

download_id = ee.data.getDownloadId(
{
"image": image,
Expand All @@ -1425,7 +1448,6 @@ def download_tif(
"name": "image",
}
)
print(f"done downloading id")
response = requests.get(
ee.data.makeDownloadUrl(download_id),
timeout=(30, 30), # 30 seconds to connect, 30 seconds to read
Expand Down

0 comments on commit b90732e

Please sign in to comment.