Skip to content

Commit

Permalink
Add better logging
Browse files Browse the repository at this point in the history
  • Loading branch information
jonaraphael committed Nov 9, 2023
1 parent b2dfbec commit b149f4e
Showing 1 changed file with 27 additions and 34 deletions.
61 changes: 27 additions & 34 deletions cerulean_cloud/cloud_run_orchestrator/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,24 +273,24 @@ async def _orchestrate(
):
# Orchestrate inference
start_time = datetime.now()
print(f"Orchestrating for sceneid {payload.sceneid}")
print(f"Start time: {start_time}")
print(f"{start_time}: Orchestrating for sceneid {payload.sceneid}")

async with DatabaseClient(db_engine) as db_client:
async with db_client.session.begin():
model = await db_client.get_model(os.getenv("MODEL"))
zoom = payload.zoom or model.zoom_level
scale = payload.scale or model.scale
print(f"zoom: {zoom}")
print(f"scale: {scale}")
print(f"{start_time}: zoom: {zoom}")
print(f"{start_time}: scale: {scale}")

if model.zoom_level != zoom:
print(
f"WARNING: Model was trained on zoom level {model.zoom_level} but is being run on {zoom}"
f"{start_time}: WARNING: Model was trained on zoom level {model.zoom_level} but is being run on {zoom}"
)
if model.tile_width_px != scale * 256:
print(
f"WARNING: Model was trained on image tile of resolution {model.tile_width_px} but is being run on {scale*256}"
f"{start_time}: WARNING: Model was trained on image tile of resolution {model.tile_width_px} but is being run on {scale*256}"
)

# WARNING: until this is resolved https://github.com/cogeotiff/rio-tiler-pds/issues/77
Expand All @@ -299,35 +299,27 @@ async def _orchestrate(
scene_bounds = await titiler_client.get_bounds(payload.sceneid)
scene_stats = await titiler_client.get_statistics(payload.sceneid, band="vv")
scene_info = await roda_sentinelhub_client.get_product_info(payload.sceneid)
print(f"scene_bounds: {scene_bounds}")
print(f"scene_stats: {scene_stats}")
print(f"scene_info: {scene_info}")
print(f"{start_time}: scene_bounds: {scene_bounds}")
print(f"{start_time}: scene_stats: {scene_stats}")
print(f"{start_time}: scene_info: {scene_info}")

base_tiles = list(tiler.tiles(*scene_bounds, [zoom], truncate=False))
base_tiles_bounds = [tiler.bounds(t) for t in base_tiles]
base_group_bounds = group_bounds_from_list_of_bounds(base_tiles_bounds)
print(f"base_group_bounds: {base_group_bounds}")
# base_tiles_bounds = [tiler.bounds(t) for t in base_tiles]
# base_group_bounds = group_bounds_from_list_of_bounds(base_tiles_bounds)

# tiling.py was updated to allow for offset_amount to be declared by offset_bounds_from_base_tiles(), see tiling.py line 61.
offset_tiles_bounds = offset_bounds_from_base_tiles(base_tiles, offset_amount=0.33)
offset_group_shape = offset_group_shape_from_base_tiles(base_tiles, scale=scale)
offset_group_bounds = group_bounds_from_list_of_bounds(offset_tiles_bounds)
print(f"Offset 1 offset_tiles_bounds {offset_tiles_bounds}")
print(f"Offset 1 image shape is {offset_group_shape}")
print(f"Offset 1 offset_group_bounds: {offset_group_bounds}")

print("START OF OFFSET #2")
offset_2_tiles_bounds = offset_bounds_from_base_tiles(
base_tiles, offset_amount=0.66
)
offset_2_group_shape = offset_group_shape_from_base_tiles(base_tiles, scale=scale)
offset_2_group_bounds = group_bounds_from_list_of_bounds(offset_2_tiles_bounds)
print(f"Offset 2 offset_tiles_bounds {offset_2_tiles_bounds}")
print(f"Offset 2 image shape is {offset_2_group_shape}")
print(f"Offset 2 offset_group_bounds: {offset_2_group_bounds}")
# offset_2_group_shape = offset_group_shape_from_base_tiles(base_tiles, scale=scale) # XXXC figure out where these should be used
# offset_2_group_bounds = group_bounds_from_list_of_bounds(offset_2_tiles_bounds) # XXXC figure out where these should be used

print(
f"Original tiles are {len(base_tiles)}, {len(offset_tiles_bounds)}, {len(offset_2_tiles_bounds)}"
f"{start_time}: Original tiles are {len(base_tiles)}, {len(offset_tiles_bounds)}, {len(offset_2_tiles_bounds)}"
)

# Filter out land tiles
Expand All @@ -340,8 +332,8 @@ async def _orchestrate(

ntiles = len(base_tiles)
noffsettiles = len(offset_tiles_bounds)
print(f"Preparing {ntiles} base tiles (no land).")
print(f"Preparing {noffsettiles} offset tiles (no land).")
print(f"{start_time}: Preparing {ntiles} base tiles (no land).")
print(f"{start_time}: Preparing {noffsettiles} offset tiles (no land).")

# write to DB
async with DatabaseClient(db_engine) as db_client:
Expand All @@ -363,7 +355,7 @@ async def _orchestrate(
)
)
print(
f"Deactivating {stale_slick_count} slicks from stale runs on {payload.sceneid}."
f"{start_time}: Deactivating {stale_slick_count} slicks from stale runs on {payload.sceneid}."
)
orchestrator_run = await db_client.add_orchestrator(
start_time,
Expand Down Expand Up @@ -394,7 +386,7 @@ async def _orchestrate(
if not payload.dry_run:
success = True
try:
print("Instantiating inference client.")
print(f"{start_time}: Instantiating inference client.")
cloud_run_inference = CloudRunInferenceClient(
url=os.getenv("INFERENCE_URL"),
titiler_client=titiler_client,
Expand Down Expand Up @@ -497,9 +489,9 @@ async def _orchestrate(

# XXXBUG ValueError: Cannot determine common CRS for concatenation inputs, got ['WGS 84 / UTM zone 28N', 'WGS 84 / UTM zone 29N']. Use `to_crs()` to transform geometries to the same CRS before merging."
# Example: S1A_IW_GRDH_1SDV_20230727T185101_20230727T185126_049613_05F744_1E56
print("XXXDEBUG out_fc", out_fc)
print("XXXDEBUG out_fc_offset", out_fc_offset)
print("XXXCDEBUG out_fc_offset2", out_fc_offset_2)
print(f"{start_time}: XXXDEBUG out_fc: {out_fc}")
print(f"{start_time}: XXXDEBUG out_fc_offset: {out_fc_offset}")
print(f"{start_time}: XXXDEBUG out_fc_offset_2: {out_fc_offset_2}")

merged_inferences = merge_inferences(
feature_collections=[out_fc, out_fc_offset, out_fc_offset_2],
Expand All @@ -512,7 +504,7 @@ async def _orchestrate(
async with db_client.session.begin():
LAND_MASK_BUFFER_M = 1000
print(
f"Removing all slicks within {LAND_MASK_BUFFER_M}m of land"
f"{start_time}: Removing all slicks within {LAND_MASK_BUFFER_M}m of land"
)
for feat in merged_inferences.get("features"):
buffered_gdf = gpd.GeoDataFrame(
Expand All @@ -538,20 +530,21 @@ async def _orchestrate(
feat.get("properties").get("inf_idx"),
feat.get("properties").get("machine_confidence"),
)
print(f"Added slick: {slick}")
print(f"{start_time}: Added slick: {slick}")

print("Queueing up Automatic AIS Analysis")
print(f"{start_time}: Queueing up Automatic AIS Analysis")
add_to_aaa_queue(sentinel1_grd.scene_id)

except Exception as e:
success = False
exc = e
print(f"{start_time}: {e}")
async with db_client.session.begin():
end_time = datetime.now()
orchestrator_run.success = success
orchestrator_run.inference_end_time = end_time
print(f"End time: {end_time}")
print("Returning results!")
print(f"{start_time}: End time: {end_time}")
print(f"{start_time}: Orchestration succes: {success}")
if success is False:
raise exc

Expand All @@ -560,5 +553,5 @@ async def _orchestrate(
del out_fc_offset
del out_fc_offset_2
else:
print("WARNING: Operating as a DRY RUN!!")
print(f"{start_time}: WARNING: Operating as a DRY RUN!!")
return "Success"

0 comments on commit b149f4e

Please sign in to comment.