diff --git a/cerulean_cloud/cloud_function_ais_analysis/main.py b/cerulean_cloud/cloud_function_ais_analysis/main.py
index 7fb8e1e7..cf916ba1 100644
--- a/cerulean_cloud/cloud_function_ais_analysis/main.py
+++ b/cerulean_cloud/cloud_function_ais_analysis/main.py
@@ -64,6 +64,7 @@ async def handle_aaa_request(request):
     if len(slicks_without_sources) > 0:
         ais_constructor = AISConstructor(s1)
         ais_constructor.retrieve_ais()
+        # ais_constructor.add_infra()
         if (
             ais_constructor.ais_gdf is not None
             and not ais_constructor.ais_gdf.empty
@@ -96,6 +97,8 @@ async def handle_aaa_request(request):
                     st_name=traj["st_name"]
                 )
                 if source is None:
+                    # if type 1: [column list]
+                    # if type 2: [different column list]
                     source = await db_client.insert_source(
                         st_name=traj["st_name"],
                         source_type=1,  # XXX This will need to be dynamic for SSS
diff --git a/cerulean_cloud/cloud_run_orchestrator/handler.py b/cerulean_cloud/cloud_run_orchestrator/handler.py
index a8b1e35a..028877a4 100644
--- a/cerulean_cloud/cloud_run_orchestrator/handler.py
+++ b/cerulean_cloud/cloud_run_orchestrator/handler.py
@@ -9,6 +9,7 @@
 - INFERENCE_URL
 """
 import asyncio
+import logging
 import os
 import urllib.parse as urlparse
 from base64 import b64decode  # , b64encode
@@ -229,25 +230,27 @@ def flatten_feature_list(
 async def _orchestrate(
     payload, tiler, titiler_client, roda_sentinelhub_client, db_engine
 ):
+    logging.basicConfig(level=logging.INFO)
+
     # Orchestrate inference
     start_time = datetime.now()
-    print(f"Orchestrating for sceneid {payload.sceneid}")
-    print(f"Start time: {start_time}")
+    logging.info(f"Orchestrating for sceneid {payload.sceneid}")
+    logging.info(f"Start time: {start_time}")
 
     async with DatabaseClient(db_engine) as db_client:
         async with db_client.session.begin():
             model = await db_client.get_model(os.getenv("MODEL"))
 
     zoom = payload.zoom or model.zoom_level
     scale = payload.scale or model.scale
-    print(f"zoom: {zoom}")
-    print(f"scale: {scale}")
+    logging.info(f"zoom: {zoom}")
+    logging.info(f"scale: {scale}")
 
     if model.zoom_level != zoom:
-        print(
+        logging.info(
             f"WARNING: Model was trained on zoom level {model.zoom_level} but is being run on {zoom}"
         )
     if model.tile_width_px != scale * 256:
-        print(
+        logging.info(
             f"WARNING: Model was trained on image tile of resolution {model.tile_width_px} but is being run on {scale*256}"
         )
@@ -257,22 +260,22 @@ async def _orchestrate(
     scene_bounds = await titiler_client.get_bounds(payload.sceneid)
     scene_stats = await titiler_client.get_statistics(payload.sceneid, band="vv")
     scene_info = await roda_sentinelhub_client.get_product_info(payload.sceneid)
-    print(f"scene_bounds: {scene_bounds}")
-    print(f"scene_stats: {scene_stats}")
-    print(f"scene_info: {scene_info}")
+    logging.info(f"scene_bounds: {scene_bounds}")
+    logging.info(f"scene_stats: {scene_stats}")
+    logging.info(f"scene_info: {scene_info}")
 
     base_tiles = list(tiler.tiles(*scene_bounds, [zoom], truncate=False))
     base_tiles_bounds = [tiler.bounds(t) for t in base_tiles]
     base_group_bounds = group_bounds_from_list_of_bounds(base_tiles_bounds)
-    print(f"base_group_bounds: {base_group_bounds}")
+    logging.info(f"base_group_bounds: {base_group_bounds}")
 
     offset_tiles_bounds = offset_bounds_from_base_tiles(base_tiles)
     offset_group_shape = offset_group_shape_from_base_tiles(base_tiles, scale=scale)
     offset_group_bounds = group_bounds_from_list_of_bounds(offset_tiles_bounds)
-    print(f"Offset image shape is {offset_group_shape}")
-    print(f"offset_group_bounds: {offset_group_bounds}")
+    logging.info(f"Offset image shape is {offset_group_shape}")
+    logging.info(f"offset_group_bounds: {offset_group_bounds}")
 
-    print(f"Original tiles are {len(base_tiles)}, {len(offset_tiles_bounds)}")
+    logging.info(f"Original tiles are {len(base_tiles)}, {len(offset_tiles_bounds)}")
 
     # Filter out land tiles
     # XXXBUG is_tile_over_water throws ValueError if the scene crosses or is close to the antimeridian. Example: S1A_IW_GRDH_1SDV_20230726T183302_20230726T183327_049598_05F6CA_31E7
@@ -282,8 +285,8 @@ async def _orchestrate(
     ntiles = len(base_tiles)
     noffsettiles = len(offset_tiles_bounds)
 
-    print(f"Preparing {ntiles} base tiles (no land).")
-    print(f"Preparing {noffsettiles} offset tiles (no land).")
+    logging.info(f"Preparing {ntiles} base tiles (no land).")
+    logging.info(f"Preparing {noffsettiles} offset tiles (no land).")
 
     # write to DB
     async with DatabaseClient(db_engine) as db_client:
@@ -301,7 +304,7 @@ async def _orchestrate(
             )
             orchestrator_run = await db_client.add_orchestrator(
                 start_time,
-                start_time,
+                start_time,  # XXX This should be updated at end of orchestrator run
                 ntiles,
                 noffsettiles,
                 os.getenv("GIT_HASH"),
@@ -326,7 +329,7 @@ async def _orchestrate(
         }
 
         if not payload.dry_run:
-            print("Instantiating inference client.")
+            logging.info("Instantiating inference client.")
            cloud_run_inference = CloudRunInferenceClient(
                url=os.getenv("INFERENCE_URL"),
                titiler_client=titiler_client,
@@ -338,7 +341,7 @@ async def _orchestrate(
                 inference_parms=inference_parms,
             )
 
-            print("Inference on base tiles!")
+            logging.info("Inference on base tiles!")
             base_tile_semaphore = asyncio.Semaphore(value=20)
             base_tiles_inference = await asyncio.gather(
                 *[
@@ -352,7 +355,7 @@ async def _orchestrate(
                 return_exceptions=True,
             )
 
-            print("Inference on offset tiles!")
+            logging.info("Inference on offset tiles!")
             offset_tile_semaphore = asyncio.Semaphore(value=20)
             offset_tiles_inference = await asyncio.gather(
                 *[
@@ -367,7 +370,7 @@ async def _orchestrate(
             )
 
             if base_tiles_inference[0].stack[0].dict().get("classes"):
-                print("Loading all tiles into memory for merge!")
+                logging.info("Loading all tiles into memory for merge!")
                 ds_base_tiles = []
                 for base_tile_inference in base_tiles_inference:
                     ds_base_tiles.append(
@@ -386,7 +389,7 @@ async def _orchestrate(
                     ]
                 )
 
-                print("Merging base tiles!")
+                logging.info("Merging base tiles!")
                 base_tile_inference_file = MemoryFile()
                 ar, transform = merge(ds_base_tiles)
                 with base_tile_inference_file.open(
@@ -402,7 +405,7 @@ async def _orchestrate(
 
                 out_fc = get_fc_from_raster(base_tile_inference_file)
 
-                print("Merging offset tiles!")
+                logging.info("Merging offset tiles!")
                 offset_tile_inference_file = MemoryFile()
                 ar, transform = merge(ds_offset_tiles)
                 with offset_tile_inference_file.open(
@@ -419,17 +422,30 @@ async def _orchestrate(
                 out_fc_offset = get_fc_from_raster(offset_tile_inference_file)
 
             else:
-                out_fc = geojson.FeatureCollection(
-                    features=flatten_feature_list(base_tiles_inference)
-                )
-                out_fc_offset = geojson.FeatureCollection(
-                    features=flatten_feature_list(offset_tiles_inference)
-                )
+                try:
+                    out_fc = geojson.FeatureCollection(
+                        features=flatten_feature_list(base_tiles_inference)
+                    )
+                    out_fc_offset = geojson.FeatureCollection(
+                        features=flatten_feature_list(offset_tiles_inference)
+                    )
+                except AttributeError as e:
+                    logging.info(f"YYY error details: {e}")
+                    logging.info(f"YYY base_tiles_inference: {base_tiles_inference}")
+                    logging.info(
+                        f"YYY offset_tiles_inference: {offset_tiles_inference}"
+                    )
+                    logging.info(
+                        f"YYY [r for r in base_tiles_inference]: {[r for r in base_tiles_inference]}"
+                    )
+                    logging.info(
+                        f"YYY [r for r in offset_tiles_inference]: {[r for r in offset_tiles_inference]}"
+                    )
 
             # XXXBUG ValueError: Cannot determine common CRS for concatenation inputs, got ['WGS 84 / UTM zone 28N', 'WGS 84 / UTM zone 29N']. Use `to_crs()` to transform geometries to the same CRS before merging."
             # Example: S1A_IW_GRDH_1SDV_20230727T185101_20230727T185126_049613_05F744_1E56
-            print("XXXDEBUG out_fc", out_fc)
-            print("XXXDEBUG out_fc_offset", out_fc_offset)
+            logging.info(f"XXXDEBUG out_fc: {out_fc}")
+            logging.info(f"XXXDEBUG out_fc_offset: {out_fc_offset}")
             merged_inferences = merge_inferences(
                 out_fc,
                 out_fc_offset,
@@ -440,7 +456,21 @@ async def _orchestrate(
             )
 
             for feat in merged_inferences.get("features"):
+                try:
+                    logging.info(f"XXX CHRISTIAN type(feat): {type(feat)}")
+                    logging.info(
+                        f"XXX CHRISTIAN type(feat['geometry']): {type(feat['geometry'])}"
+                    )
+                    logging.info(f"XXX CHRISTIAN feat['geometry']: {feat['geometry']}")
+                    logging.info(
+                        f"XXX CHRISTIAN geojson.dumps(feat['geometry']): {geojson.dumps(feat['geometry'])}"
+                    )
+                except:  # noqa
+                    pass
                 async with db_client.session.begin():
+                    # mini_gdf = gpd.GeoDataframe(feat)
+                    # if mini_gdf.intersects(land):
+                    #     feat.set("properties").set("inf_idx") = "model.background" (most often 0)
                     slick = await db_client.add_slick(
                         orchestrator_run,
                         sentinel1_grd.start_time,
@@ -448,13 +478,14 @@ async def _orchestrate(
                         feat.get("properties").get("inf_idx"),
                         feat.get("properties").get("machine_confidence"),
                     )
-                    print(f"Added slick {slick}")
+                    logging.info(f"Added slick: {slick}")
+
             if merged_inferences.get("features"):
                 add_to_aaa_queue(sentinel1_grd.scene_id)
 
             end_time = datetime.now()
-            print(f"End time: {end_time}")
-            print("Returning results!")
+            logging.info(f"End time: {end_time}")
+            logging.info("Returning results!")
 
             async with db_client.session.begin():
                 orchestrator_run.success = True
@@ -468,7 +499,7 @@ async def _orchestrate(
                 noffsettiles=noffsettiles,
             )
         else:
-            print("DRY RUN!!")
+            logging.info("DRY RUN!!")
             orchestrator_result = OrchestratorResult(
                 classification_base=geojson.FeatureCollection(features=[]),
                 classification_offset=geojson.FeatureCollection(features=[]),
diff --git a/stack/cloud_run_offset_tile.py b/stack/cloud_run_offset_tile.py
index 8e721d40..26a75152 100644
--- a/stack/cloud_run_offset_tile.py
+++ b/stack/cloud_run_offset_tile.py
@@ -26,7 +26,7 @@
                     value=pulumi.Config("cerulean-cloud").require("apikey"),
                 ),
             ],
-            resources=dict(limits=dict(memory="6Gi", cpu="2000m")),
+            resources=dict(limits=dict(memory="8Gi", cpu="2000m")),
         ),
     ],
     container_concurrency=3,
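
A note on the `# if type 1: [column list]` placeholders and the `source_type=1  # XXX This will need to be dynamic for SSS` marker in cloud_function_ais_analysis/main.py: one way the branching could eventually look is a per-type column mapping. The sketch below is hypothetical; the type ids mirror the diff's `source_type=1`, but `SOURCE_TYPE_COLUMNS`, `build_source_kwargs`, and every column name are invented for illustration, not the real schema.

# Hypothetical sketch: SOURCE_TYPE_COLUMNS, build_source_kwargs, and the
# column names are placeholders, not the actual cerulean-cloud schema.
SOURCE_TYPE_COLUMNS = {
    1: ["st_name", "ext_id", "geometry"],        # e.g. AIS vessel sources
    2: ["st_name", "structure_id", "geometry"],  # e.g. fixed infrastructure (SSS)
}


def build_source_kwargs(traj, source_type):
    """Keep only the columns that are valid for this source type."""
    columns = SOURCE_TYPE_COLUMNS[source_type]
    kwargs = {col: traj[col] for col in columns if col in traj}
    kwargs["source_type"] = source_type
    return kwargs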
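
On the print-to-logging migration in handler.py: `logging.basicConfig(level=logging.INFO)` configures the root logger and does nothing on later calls once handlers exist, so invoking it inside `_orchestrate` is safe across runs. A minimal sketch of the common module-level alternative follows; the scene id string is a placeholder.

import logging

# Configure once at import time; subsequent basicConfig calls are no-ops.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)  # named per-module logger

# Lazy %-formatting defers string interpolation until the record is emitted.
logger.info("Orchestrating for sceneid %s", "S1A_EXAMPLE_SCENE_ID")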
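
On the `# XXXBUG ValueError: Cannot determine common CRS for concatenation inputs` comment: the error text itself points at `to_crs()`. A minimal sketch of that idea, assuming the feature collections can be loaded as GeoDataFrames; the helper name and the EPSG:4326 target are assumptions, not the patch's fix.

import geopandas as gpd
import pandas as pd


def concat_with_common_crs(gdfs, target_crs="EPSG:4326"):
    """Reproject every GeoDataFrame to one CRS, then concatenate safely."""
    reprojected = [gdf.to_crs(target_crs) for gdf in gdfs]
    return gpd.GeoDataFrame(pd.concat(reprojected, ignore_index=True), crs=target_crs)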
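
On the commented-out land-masking block above `add_slick` (`# mini_gdf = gpd.GeoDataframe(feat)` ...): a minimal sketch of the same idea using shapely directly. It assumes `land` is a (multi)polygon geometry and that the background class index is 0, per the comment's "(most often 0)"; `mask_land_features` is an invented name.

from shapely.geometry import shape


def mask_land_features(features, land, background_idx=0):
    """Reclassify any feature whose geometry intersects land as background."""
    for feat in features:
        if shape(feat["geometry"]).intersects(land):
            feat["properties"]["inf_idx"] = background_idx
    return features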