Add some debugging code
jonaraphael committed Oct 27, 2023
1 parent a64c811 commit 927ebb7
Showing 3 changed files with 69 additions and 35 deletions.
cerulean_cloud/cloud_function_ais_analysis/main.py (3 additions, 0 deletions)
@@ -64,6 +64,7 @@ async def handle_aaa_request(request):
     if len(slicks_without_sources) > 0:
         ais_constructor = AISConstructor(s1)
         ais_constructor.retrieve_ais()
+        # ais_constructor.add_infra()
         if (
             ais_constructor.ais_gdf is not None
             and not ais_constructor.ais_gdf.empty
@@ -96,6 +97,8 @@ async def handle_aaa_request(request):
                 st_name=traj["st_name"]
             )
             if source is None:
+                # if type 1: [column list]
+                # if type 2: [different column list]
                 source = await db_client.insert_source(
                     st_name=traj["st_name"],
                     source_type=1,  # XXX This will need to be dynamic for SSS
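
The new comments in this hunk sketch a per-type column dispatch, and the XXX note flags that source_type is hard-coded to 1. A minimal sketch of what that dynamic dispatch might look like; the type codes, the field name, and the helper are illustrative assumptions, not code from this repository:

    # Hypothetical type codes: 1 = AIS vessel, 2 = other source (illustrative only)
    def infer_source_type(traj: dict) -> int:
        """Pick a source_type from trajectory properties (assumed field name)."""
        return 2 if traj.get("is_infrastructure") else 1

    # source = await db_client.insert_source(
    #     st_name=traj["st_name"], source_type=infer_source_type(traj), ...
    # )
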
cerulean_cloud/cloud_run_orchestrator/handler.py (65 additions, 34 deletions)
@@ -9,6 +9,7 @@
 - INFERENCE_URL
 """
 import asyncio
+import logging
 import os
 import urllib.parse as urlparse
 from base64 import b64decode  # , b64encode
@@ -229,25 +230,27 @@ def flatten_feature_list(
 async def _orchestrate(
     payload, tiler, titiler_client, roda_sentinelhub_client, db_engine
 ):
+    logging.basicConfig(level=logging.INFO)
+
     # Orchestrate inference
     start_time = datetime.now()
-    print(f"Orchestrating for sceneid {payload.sceneid}")
-    print(f"Start time: {start_time}")
+    logging.info(f"Orchestrating for sceneid {payload.sceneid}")
+    logging.info(f"Start time: {start_time}")
 
     async with DatabaseClient(db_engine) as db_client:
         async with db_client.session.begin():
             model = await db_client.get_model(os.getenv("MODEL"))
     zoom = payload.zoom or model.zoom_level
     scale = payload.scale or model.scale
-    print(f"zoom: {zoom}")
-    print(f"scale: {scale}")
+    logging.info(f"zoom: {zoom}")
+    logging.info(f"scale: {scale}")
 
     if model.zoom_level != zoom:
-        print(
+        logging.info(
             f"WARNING: Model was trained on zoom level {model.zoom_level} but is being run on {zoom}"
         )
     if model.tile_width_px != scale * 256:
-        print(
+        logging.info(
             f"WARNING: Model was trained on image tile of resolution {model.tile_width_px} but is being run on {scale*256}"
         )

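One note on the print-to-logging swap in this commit: logging.basicConfig only has an effect the first time it runs, because it does nothing once the root logger already has handlers, so calling it inside _orchestrate is safe but redundant after the first request. A small sketch of the conventional module-level alternative (the logger name is an assumption):

    import logging

    logging.basicConfig(level=logging.INFO)  # configure once, at import time
    logger = logging.getLogger("cloud_run_orchestrator")  # assumed name

    # %-style arguments defer formatting until a record is actually emitted,
    # unlike the eager f-strings used throughout this diff.
    logger.info("Orchestrating for sceneid %s", "S1A_EXAMPLE")
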
@@ -257,22 +260,22 @@ async def _orchestrate(
     scene_bounds = await titiler_client.get_bounds(payload.sceneid)
     scene_stats = await titiler_client.get_statistics(payload.sceneid, band="vv")
     scene_info = await roda_sentinelhub_client.get_product_info(payload.sceneid)
-    print(f"scene_bounds: {scene_bounds}")
-    print(f"scene_stats: {scene_stats}")
-    print(f"scene_info: {scene_info}")
+    logging.info(f"scene_bounds: {scene_bounds}")
+    logging.info(f"scene_stats: {scene_stats}")
+    logging.info(f"scene_info: {scene_info}")
 
     base_tiles = list(tiler.tiles(*scene_bounds, [zoom], truncate=False))
     base_tiles_bounds = [tiler.bounds(t) for t in base_tiles]
     base_group_bounds = group_bounds_from_list_of_bounds(base_tiles_bounds)
-    print(f"base_group_bounds: {base_group_bounds}")
+    logging.info(f"base_group_bounds: {base_group_bounds}")
 
     offset_tiles_bounds = offset_bounds_from_base_tiles(base_tiles)
     offset_group_shape = offset_group_shape_from_base_tiles(base_tiles, scale=scale)
     offset_group_bounds = group_bounds_from_list_of_bounds(offset_tiles_bounds)
-    print(f"Offset image shape is {offset_group_shape}")
-    print(f"offset_group_bounds: {offset_group_bounds}")
+    logging.info(f"Offset image shape is {offset_group_shape}")
+    logging.info(f"offset_group_bounds: {offset_group_bounds}")
 
-    print(f"Original tiles are {len(base_tiles)}, {len(offset_tiles_bounds)}")
+    logging.info(f"Original tiles are {len(base_tiles)}, {len(offset_tiles_bounds)}")
 
     # Filter out land tiles
     # XXXBUG is_tile_over_water throws ValueError if the scene crosses or is close to the antimeridian. Example: S1A_IW_GRDH_1SDV_20230726T183302_20230726T183327_049598_05F6CA_31E7
@@ -282,8 +285,8 @@
 
     ntiles = len(base_tiles)
     noffsettiles = len(offset_tiles_bounds)
-    print(f"Preparing {ntiles} base tiles (no land).")
-    print(f"Preparing {noffsettiles} offset tiles (no land).")
+    logging.info(f"Preparing {ntiles} base tiles (no land).")
+    logging.info(f"Preparing {noffsettiles} offset tiles (no land).")
 
     # write to DB
     async with DatabaseClient(db_engine) as db_client:
@@ -301,7 +304,7 @@
             )
             orchestrator_run = await db_client.add_orchestrator(
                 start_time,
-                start_time,
+                start_time,  # XXX This should be updated at end of orchestrator run
                 ntiles,
                 noffsettiles,
                 os.getenv("GIT_HASH"),
@@ -326,7 +329,7 @@
     }
 
     if not payload.dry_run:
-        print("Instantiating inference client.")
+        logging.info("Instantiating inference client.")
         cloud_run_inference = CloudRunInferenceClient(
             url=os.getenv("INFERENCE_URL"),
             titiler_client=titiler_client,
@@ -338,7 +341,7 @@
             inference_parms=inference_parms,
         )
 
-        print("Inference on base tiles!")
+        logging.info("Inference on base tiles!")
         base_tile_semaphore = asyncio.Semaphore(value=20)
         base_tiles_inference = await asyncio.gather(
             *[
@@ -352,7 +355,7 @@
             return_exceptions=True,
         )
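
The gather above and the one just below share a pattern: an asyncio.Semaphore caps in-flight inference requests at 20, and return_exceptions=True makes a failed tile come back as an exception object in the results list instead of cancelling the batch. A self-contained sketch of that pattern, with a sleep standing in for the real inference call:

    import asyncio

    async def infer_tile(sem: asyncio.Semaphore, tile: int) -> int:
        async with sem:  # at most 20 coroutines run this body at once
            await asyncio.sleep(0.01)  # stand-in for the HTTP inference request
            return tile

    async def main() -> None:
        sem = asyncio.Semaphore(20)
        results = await asyncio.gather(
            *[infer_tile(sem, t) for t in range(100)],
            return_exceptions=True,  # failures are returned, not raised
        )
        failures = [r for r in results if isinstance(r, BaseException)]
        print(f"{len(results)} results, {len(failures)} failures")

    asyncio.run(main())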

print("Inference on offset tiles!")
logging.info("Inference on offset tiles!")
offset_tile_semaphore = asyncio.Semaphore(value=20)
offset_tiles_inference = await asyncio.gather(
*[
@@ -367,7 +370,7 @@
         )
 
         if base_tiles_inference[0].stack[0].dict().get("classes"):
-            print("Loading all tiles into memory for merge!")
+            logging.info("Loading all tiles into memory for merge!")
             ds_base_tiles = []
             for base_tile_inference in base_tiles_inference:
                 ds_base_tiles.append(
@@ -386,7 +389,7 @@
                 ]
             )
 
-            print("Merging base tiles!")
+            logging.info("Merging base tiles!")
             base_tile_inference_file = MemoryFile()
             ar, transform = merge(ds_base_tiles)
             with base_tile_inference_file.open(
@@ -402,7 +405,7 @@
 
             out_fc = get_fc_from_raster(base_tile_inference_file)
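
Here, rasterio's merge mosaics the per-tile inference rasters into a single array, which is then written to an in-memory GeoTIFF so get_fc_from_raster can vectorize it without touching disk. A self-contained sketch of that merge-into-MemoryFile pattern, assuming the open datasets share one CRS:

    from rasterio.io import MemoryFile
    from rasterio.merge import merge

    def merge_to_memfile(datasets):
        ar, transform = merge(datasets)  # (bands, height, width) mosaic plus affine
        memfile = MemoryFile()
        with memfile.open(
            driver="GTiff",
            count=ar.shape[0],
            height=ar.shape[1],
            width=ar.shape[2],
            dtype=ar.dtype,
            transform=transform,
            crs=datasets[0].crs,  # assumes a common CRS across tiles
        ) as dst:
            dst.write(ar)
        return memfile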

print("Merging offset tiles!")
logging.info("Merging offset tiles!")
offset_tile_inference_file = MemoryFile()
ar, transform = merge(ds_offset_tiles)
with offset_tile_inference_file.open(
@@ -419,17 +422,30 @@
             out_fc_offset = get_fc_from_raster(offset_tile_inference_file)
 
         else:
-            out_fc = geojson.FeatureCollection(
-                features=flatten_feature_list(base_tiles_inference)
-            )
-            out_fc_offset = geojson.FeatureCollection(
-                features=flatten_feature_list(offset_tiles_inference)
-            )
+            try:
+                out_fc = geojson.FeatureCollection(
+                    features=flatten_feature_list(base_tiles_inference)
+                )
+                out_fc_offset = geojson.FeatureCollection(
+                    features=flatten_feature_list(offset_tiles_inference)
+                )
+            except AttributeError as e:
+                logging.info(f"YYY error details: {e}")
+                logging.info(f"YYY base_tiles_inference: {base_tiles_inference}")
+                logging.info(
+                    f"YYY offset_tiles_inference: {offset_tiles_inference}"
+                )
+                logging.info(
+                    f"YYY [r for r in base_tiles_inference]: {[r for r in base_tiles_inference]}"
+                )
+                logging.info(
+                    f"YYY [r for r in offset_tiles_inference]: {[r for r in offset_tiles_inference]}"
+                )
 
         # XXXBUG ValueError: Cannot determine common CRS for concatenation inputs, got ['WGS 84 / UTM zone 28N', 'WGS 84 / UTM zone 29N']. Use `to_crs()` to transform geometries to the same CRS before merging."
         # Example: S1A_IW_GRDH_1SDV_20230727T185101_20230727T185126_049613_05F744_1E56
-        print("XXXDEBUG out_fc", out_fc)
-        print("XXXDEBUG out_fc_offset", out_fc_offset)
+        logging.info(f"XXXDEBUG out_fc: {out_fc}")
+        logging.info(f"XXXDEBUG out_fc_offset: {out_fc_offset}")
         merged_inferences = merge_inferences(
             out_fc,
             out_fc_offset,
@@ -440,21 +456,36 @@
         )
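
The XXXBUG note above records a ValueError raised when a scene straddles two UTM zones and its GeoDataFrames are concatenated without a shared CRS. A sketch of the usual fix, reprojecting every frame before concatenation; the helper name and target CRS are assumptions:

    import geopandas as gpd
    import pandas as pd

    def concat_single_crs(gdfs, crs="EPSG:4326"):
        # Mixed CRSs (e.g. UTM zone 28N vs 29N) make concatenation fail with
        # the error quoted above, so align every frame to one CRS first.
        aligned = [g.to_crs(crs) for g in gdfs]
        return gpd.GeoDataFrame(pd.concat(aligned, ignore_index=True), crs=crs)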

         for feat in merged_inferences.get("features"):
+            try:
+                logging.info(f"XXX CHRISTIAN type(feat): {type(feat)}")
+                logging.info(
+                    f"XXX CHRISTIAN type(feat['geometry']): {type(feat['geometry'])}"
+                )
+                logging.info(f"XXX CHRISTIAN feat['geometry']: {feat['geometry']}")
+                logging.info(
+                    f"XXX CHRISTIAN geojson.dumps(feat['geometry']): {geojson.dumps(feat['geometry'])}"
+                )
+            except:  # noqa
+                pass
             async with db_client.session.begin():
+                # mini_gdf = gpd.GeoDataframe(feat)
+                # if mini_gdf.intersects(land):
+                # feat.set("properties").set("inf_idx") = "model.background" (most often 0)
                 slick = await db_client.add_slick(
                     orchestrator_run,
                     sentinel1_grd.start_time,
                     feat.get("geometry"),
                     feat.get("properties").get("inf_idx"),
                     feat.get("properties").get("machine_confidence"),
                 )
-                print(f"Added slick {slick}")
+                logging.info(f"Added slick: {slick}")

         if merged_inferences.get("features"):
             add_to_aaa_queue(sentinel1_grd.scene_id)
 
         end_time = datetime.now()
-        print(f"End time: {end_time}")
-        print("Returning results!")
+        logging.info(f"End time: {end_time}")
+        logging.info("Returning results!")
 
         async with db_client.session.begin():
             orchestrator_run.success = True
@@ -468,7 +499,7 @@
             noffsettiles=noffsettiles,
         )
     else:
-        print("DRY RUN!!")
+        logging.info("DRY RUN!!")
         orchestrator_result = OrchestratorResult(
             classification_base=geojson.FeatureCollection(features=[]),
             classification_offset=geojson.FeatureCollection(features=[]),
stack/cloud_run_offset_tile.py (1 addition, 1 deletion)
@@ -26,7 +26,7 @@
                     value=pulumi.Config("cerulean-cloud").require("apikey"),
                 ),
             ],
-            resources=dict(limits=dict(memory="6Gi", cpu="2000m")),
+            resources=dict(limits=dict(memory="8Gi", cpu="2000m")),
         ),
     ],
     container_concurrency=3,
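
The only stack change raises the inference container's memory limit from 6Gi to 8Gi, leaving the CPU limit at 2000m; with container_concurrency=3, up to three concurrent requests share that allocation on a single instance.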
