diff --git a/cerulean_cloud/cloud_run_orchestrator/clients.py b/cerulean_cloud/cloud_run_orchestrator/clients.py index f37cd42a..4a897e35 100644 --- a/cerulean_cloud/cloud_run_orchestrator/clients.py +++ b/cerulean_cloud/cloud_run_orchestrator/clients.py @@ -6,6 +6,7 @@ import logging import os import sys +import traceback import zipfile from base64 import b64encode from datetime import datetime @@ -285,6 +286,7 @@ async def run_parallel_inference(self, tileset): severity="ERROR", scene_id=self.sceneid, exception=str(e), + traceback=traceback.format_exc(), ) ) inferences = None diff --git a/cerulean_cloud/cloud_run_orchestrator/handler.py b/cerulean_cloud/cloud_run_orchestrator/handler.py index 2e4fac3c..f3634ef3 100644 --- a/cerulean_cloud/cloud_run_orchestrator/handler.py +++ b/cerulean_cloud/cloud_run_orchestrator/handler.py @@ -271,6 +271,7 @@ async def _orchestrate( severity="ERROR", scene_id=payload.sceneid, exception=str(e), + traceback=traceback.format_exc(), ) ) return OrchestratorResult(status=str(e)) @@ -284,6 +285,7 @@ async def _orchestrate( severity="ERROR", scene_id=payload.sceneid, exception=str(e), + traceback=traceback.format_exc(), ) ) return OrchestratorResult(status=str(e)) @@ -439,7 +441,15 @@ async def _orchestrate( ) orchestrator_run_id = orchestrator_run.id except Exception as e: - logging.error("Failed to write to DB") + logger.error( + structured_log( + "Failed to write to DB", + severity="ERROR", + scene_id=payload.sceneid, + exception=str(e), + traceback=traceback.format_exc(), + ) + ) return OrchestratorResult(status=str(e)) success = True diff --git a/cerulean_cloud/database_client.py b/cerulean_cloud/database_client.py index a9423c64..7825f2e5 100644 --- a/cerulean_cloud/database_client.py +++ b/cerulean_cloud/database_client.py @@ -207,7 +207,10 @@ async def get_sentinel1_grd(self, sceneid: str, scene_info: dict, titiler_url: s except Exception as e: self.logger.error( structured_log( - "Failed to get S1 record", severity="ERROR", exception=str(e) + "Failed to get S1 record", + severity="ERROR", + exception=str(e), + traceback=traceback.format_exc(), ) ) raise @@ -450,6 +453,7 @@ async def deactivate_stale_slicks_from_scene_id(self, scene_id): "Failed to deactivate stale slicks", severity="ERROR", exception=str(e), + traceback=traceback.format_exc(), ) ) raise diff --git a/cerulean_cloud/models.py b/cerulean_cloud/models.py index 36f47d1a..000c92f5 100644 --- a/cerulean_cloud/models.py +++ b/cerulean_cloud/models.py @@ -10,6 +10,7 @@ import logging import os import sys +import traceback from base64 import b64decode, b64encode from io import BytesIO from typing import List, Union @@ -32,6 +33,7 @@ InferenceResult, InferenceResultStack, ) +from cerulean_cloud.cloud_run_orchestrator.utils import structured_log logger = logging.getLogger("model") handler = logging.StreamHandler(sys.stdout) @@ -75,7 +77,14 @@ def load(self): self.model = torch.jit.load(self.model_path_local, map_location="cpu") self.model.eval() except Exception as e: - logger.error("Error loading model: %s", e, exc_info=True) + logger.error( + structured_log( + "Error loading model", + severity="ERROR", + exception=str(e), + traceback=traceback.format_exc(), + ) + ) raise def predict(self, inf_stack: List[InferenceInput]) -> InferenceResultStack: @@ -85,7 +94,9 @@ def predict(self, inf_stack: List[InferenceInput]) -> InferenceResultStack: Args: inf_stack: The input data stack for inference. """ - logger.info(f"Stack has {len(inf_stack)} images") + logger.info( + structured_log(f"Stack has {len(inf_stack)} images", severity="INFO") + ) self.load() # Load model into memory preprocessed_tensors = self.preprocess_tiles(inf_stack) @@ -247,7 +258,12 @@ def preprocess_tiles(self, inf_stack: List[InferenceInput]): b64_image_to_array(record.image, tensor=True, to_float=True) for record in inf_stack ] - logger.info(f"Images have shape {stack_tensors[0].shape}") + logger.info( + structured_log( + f"Images have shape {stack_tensors[0].shape}", + severity="INFO", + ) + ) return stack_tensors def process_tiles(self, stack_tensors): @@ -331,11 +347,26 @@ def postprocess_tileset( geojson.FeatureCollection: A geojson feature collection representing the processed and combined geographical data. """ - logger.info("Reducing feature count on tiles") + logger.info( + structured_log( + "Reducing feature count on tiles", + severity="INFO", + ) + ) scene_polys = self.reduce_tile_features(tileset_results, tileset_bounds) - logger.info("Stitching tiles into scene") + logger.info( + structured_log( + "Stitching tiles into scene", + severity="INFO", + ) + ) feature_collection = self.stitch(scene_polys) - logger.info("Reducing feature count on scene") + logger.info( + structured_log( + "Reducing feature count on scene", + severity="INFO", + ) + ) reduced_feature_collection = self.nms_feature_reduction(feature_collection) return reduced_feature_collection @@ -732,10 +763,22 @@ def preprocess_tiles(self, inf_stack: List[InferenceInput]): for record in inf_stack ] batch_tensor = torch.cat(stack_tensors, dim=0).to(self.device) - logger.info(f"Batch tensor shape: {batch_tensor.shape}") + logger.info( + structured_log( + f"Batch tensor shape: {batch_tensor.shape}", + severity="INFO", + ) + ) return batch_tensor # Only the tensor batch is needed for the model except Exception as e: - logger.error("Error in preprocessing: %s", e, exc_info=True) + logger.error( + structured_log( + "Error in preprocessing", + severity="ERROR", + exception=str(e), + traceback=traceback.format_exc(), + ) + ) raise def process_tiles(self, preprocessed_tensors): @@ -809,11 +852,11 @@ def postprocess_tileset( Args: tileset_results: The list of InferenceResultStacks to stitch together. """ - logger.info("Stitching tiles into scene") + logger.info(structured_log("Stitching tiles into scene", severity="INFO")) scene_array_probs, transform = self.stitch(tileset_results, tileset_bounds) - logger.info("Finding instances in scene") + logger.info(structured_log("Finding instances in scene", severity="INFO")) feature_collection = self.instantiate(scene_array_probs, transform) - logger.info("Reducing feature count on scene") + logger.info(structured_log("Reducing feature count on scene", severity="INFO")) reduced_feature_collection = self.nms_feature_reduction(feature_collection) return reduced_feature_collection @@ -1062,7 +1105,7 @@ def get_model( An instance of the appropriate model class. """ model_type = model_dict["type"] - logger.info(f"Model type is {model_type}") + logger.info(structured_log(f"Model type is {model_type}", severity="INFO")) if model_type == "MASKRCNN": return MASKRCNNModel(model_dict, model_path_local) @@ -1127,7 +1170,14 @@ def b64_image_to_array(image: str, tensor: bool = False, to_float=False): np_img = dtype_to_float(np_img) return torch.tensor(np_img) if tensor else np_img except Exception as e: - logger.error(f"Failed to convert base64 image to array: {e}") + logger.error( + structured_log( + "Failed to convert base64 image to array", + severity="ERROR", + exception=str(e), + traceback=traceback.format_exc(), + ) + ) raise diff --git a/cerulean_cloud/titiler_client.py b/cerulean_cloud/titiler_client.py index a50cb5a7..591f7a35 100644 --- a/cerulean_cloud/titiler_client.py +++ b/cerulean_cloud/titiler_client.py @@ -4,6 +4,7 @@ import os import sys import time +import traceback import urllib.parse as urlib from typing import Dict, List, Optional, Tuple @@ -225,13 +226,15 @@ async def get_offset_tile( np_img = reshape_as_image(dataset.read()) return np_img - except Exception: + except Exception as e: if attempt == retries: self.logger.error( structured_log( f"Failed to retrieve {url}", severity="ERROR", scene_id=sceneid, + exception=str(e), + traceback=traceback.format_exc(), ) ) raise