From b149f4e895c3eb664735a3488e28d25af8977f36 Mon Sep 17 00:00:00 2001 From: Jona Date: Wed, 8 Nov 2023 21:48:36 -0500 Subject: [PATCH] Add better logging --- .../cloud_run_orchestrator/handler.py | 61 ++++++++----------- 1 file changed, 27 insertions(+), 34 deletions(-) diff --git a/cerulean_cloud/cloud_run_orchestrator/handler.py b/cerulean_cloud/cloud_run_orchestrator/handler.py index 7f919d7a..eeeb24ef 100644 --- a/cerulean_cloud/cloud_run_orchestrator/handler.py +++ b/cerulean_cloud/cloud_run_orchestrator/handler.py @@ -273,24 +273,24 @@ async def _orchestrate( ): # Orchestrate inference start_time = datetime.now() - print(f"Orchestrating for sceneid {payload.sceneid}") print(f"Start time: {start_time}") + print(f"{start_time}: Orchestrating for sceneid {payload.sceneid}") async with DatabaseClient(db_engine) as db_client: async with db_client.session.begin(): model = await db_client.get_model(os.getenv("MODEL")) zoom = payload.zoom or model.zoom_level scale = payload.scale or model.scale - print(f"zoom: {zoom}") - print(f"scale: {scale}") + print(f"{start_time}: zoom: {zoom}") + print(f"{start_time}: scale: {scale}") if model.zoom_level != zoom: print( - f"WARNING: Model was trained on zoom level {model.zoom_level} but is being run on {zoom}" + f"{start_time}: WARNING: Model was trained on zoom level {model.zoom_level} but is being run on {zoom}" ) if model.tile_width_px != scale * 256: print( - f"WARNING: Model was trained on image tile of resolution {model.tile_width_px} but is being run on {scale*256}" + f"{start_time}: WARNING: Model was trained on image tile of resolution {model.tile_width_px} but is being run on {scale*256}" ) # WARNING: until this is resolved https://github.com/cogeotiff/rio-tiler-pds/issues/77 @@ -299,35 +299,27 @@ async def _orchestrate( scene_bounds = await titiler_client.get_bounds(payload.sceneid) scene_stats = await titiler_client.get_statistics(payload.sceneid, band="vv") scene_info = await roda_sentinelhub_client.get_product_info(payload.sceneid) - print(f"scene_bounds: {scene_bounds}") - print(f"scene_stats: {scene_stats}") - print(f"scene_info: {scene_info}") + print(f"{start_time}: scene_bounds: {scene_bounds}") + print(f"{start_time}: scene_stats: {scene_stats}") + print(f"{start_time}: scene_info: {scene_info}") base_tiles = list(tiler.tiles(*scene_bounds, [zoom], truncate=False)) - base_tiles_bounds = [tiler.bounds(t) for t in base_tiles] - base_group_bounds = group_bounds_from_list_of_bounds(base_tiles_bounds) - print(f"base_group_bounds: {base_group_bounds}") + # base_tiles_bounds = [tiler.bounds(t) for t in base_tiles] + # base_group_bounds = group_bounds_from_list_of_bounds(base_tiles_bounds) # tiling.py was updated to allow for offset_amount to be declared by offset_bounds_from_base_tiles(), see tiling.py line 61. offset_tiles_bounds = offset_bounds_from_base_tiles(base_tiles, offset_amount=0.33) offset_group_shape = offset_group_shape_from_base_tiles(base_tiles, scale=scale) offset_group_bounds = group_bounds_from_list_of_bounds(offset_tiles_bounds) - print(f"Offset 1 offset_tiles_bounds {offset_tiles_bounds}") - print(f"Offset 1 image shape is {offset_group_shape}") - print(f"Offset 1 offset_group_bounds: {offset_group_bounds}") - print("START OF OFFSET #2") offset_2_tiles_bounds = offset_bounds_from_base_tiles( base_tiles, offset_amount=0.66 ) - offset_2_group_shape = offset_group_shape_from_base_tiles(base_tiles, scale=scale) - offset_2_group_bounds = group_bounds_from_list_of_bounds(offset_2_tiles_bounds) - print(f"Offset 2 offset_tiles_bounds {offset_2_tiles_bounds}") - print(f"Offset 2 image shape is {offset_2_group_shape}") - print(f"Offset 2 offset_group_bounds: {offset_2_group_bounds}") + # offset_2_group_shape = offset_group_shape_from_base_tiles(base_tiles, scale=scale) # XXXC figure out where these should be used + # offset_2_group_bounds = group_bounds_from_list_of_bounds(offset_2_tiles_bounds) # XXXC figure out where these should be used print( - f"Original tiles are {len(base_tiles)}, {len(offset_tiles_bounds)}, {len(offset_2_tiles_bounds)}" + f"{start_time}: Original tiles are {len(base_tiles)}, {len(offset_tiles_bounds)}, {len(offset_2_tiles_bounds)}" ) # Filter out land tiles @@ -340,8 +332,8 @@ async def _orchestrate( ntiles = len(base_tiles) noffsettiles = len(offset_tiles_bounds) - print(f"Preparing {ntiles} base tiles (no land).") - print(f"Preparing {noffsettiles} offset tiles (no land).") + print(f"{start_time}: Preparing {ntiles} base tiles (no land).") + print(f"{start_time}: Preparing {noffsettiles} offset tiles (no land).") # write to DB async with DatabaseClient(db_engine) as db_client: @@ -363,7 +355,7 @@ async def _orchestrate( ) ) print( - f"Deactivating {stale_slick_count} slicks from stale runs on {payload.sceneid}." + f"{start_time}: Deactivating {stale_slick_count} slicks from stale runs on {payload.sceneid}." ) orchestrator_run = await db_client.add_orchestrator( start_time, @@ -394,7 +386,7 @@ async def _orchestrate( if not payload.dry_run: success = True try: - print("Instantiating inference client.") + print(f"{start_time}: Instantiating inference client.") cloud_run_inference = CloudRunInferenceClient( url=os.getenv("INFERENCE_URL"), titiler_client=titiler_client, @@ -497,9 +489,9 @@ async def _orchestrate( # XXXBUG ValueError: Cannot determine common CRS for concatenation inputs, got ['WGS 84 / UTM zone 28N', 'WGS 84 / UTM zone 29N']. Use `to_crs()` to transform geometries to the same CRS before merging." # Example: S1A_IW_GRDH_1SDV_20230727T185101_20230727T185126_049613_05F744_1E56 - print("XXXDEBUG out_fc", out_fc) - print("XXXDEBUG out_fc_offset", out_fc_offset) - print("XXXCDEBUG out_fc_offset2", out_fc_offset_2) + print(f"{start_time}: XXXDEBUG out_fc: {out_fc}") + print(f"{start_time}: XXXDEBUG out_fc_offset: {out_fc_offset}") + print(f"{start_time}: XXXDEBUG out_fc_offset_2: {out_fc_offset_2}") merged_inferences = merge_inferences( feature_collections=[out_fc, out_fc_offset, out_fc_offset_2], @@ -512,7 +504,7 @@ async def _orchestrate( async with db_client.session.begin(): LAND_MASK_BUFFER_M = 1000 print( - f"Removing all slicks within {LAND_MASK_BUFFER_M}m of land" + f"{start_time}: Removing all slicks within {LAND_MASK_BUFFER_M}m of land" ) for feat in merged_inferences.get("features"): buffered_gdf = gpd.GeoDataFrame( @@ -538,20 +530,21 @@ async def _orchestrate( feat.get("properties").get("inf_idx"), feat.get("properties").get("machine_confidence"), ) - print(f"Added slick: {slick}") + print(f"{start_time}: Added slick: {slick}") - print("Queueing up Automatic AIS Analysis") + print(f"{start_time}: Queueing up Automatic AIS Analysis") add_to_aaa_queue(sentinel1_grd.scene_id) except Exception as e: success = False exc = e + print(f"{start_time}: {e}") async with db_client.session.begin(): end_time = datetime.now() orchestrator_run.success = success orchestrator_run.inference_end_time = end_time - print(f"End time: {end_time}") - print("Returning results!") + print(f"{start_time}: End time: {end_time}") + print(f"{start_time}: Orchestration succes: {success}") if success is False: raise exc @@ -560,5 +553,5 @@ async def _orchestrate( del out_fc_offset del out_fc_offset_2 else: - print("WARNING: Operating as a DRY RUN!!") + print(f"{start_time}: WARNING: Operating as a DRY RUN!!") return "Success"