Skip to content

Commit

Permalink
Development: Add Sentry reporting for the pipelines (#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hialus authored Jul 3, 2024
1 parent 929e359 commit 80c183f
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 10 deletions.
14 changes: 9 additions & 5 deletions app/pipeline/chat/exercise_chat_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,12 @@ def __call__(self, dto: ExerciseChatPipelineExecutionDTO):
exc_info=e,
)
traceback.print_exc()
self.callback.error("Generating interaction suggestions failed.")
self.callback.error(
"Generating interaction suggestions failed.", exception=e
)
except Exception as e:
traceback.print_exc()
self.callback.error(f"Failed to generate response: {e}")
self.callback.error(f"Failed to generate response: {e}", exception=e)

def _run_exercise_chat_pipeline(
self,
Expand Down Expand Up @@ -210,7 +212,7 @@ def _run_exercise_chat_pipeline(
)
except Exception as e:
self.callback.error(
f"Failed to look up files in the repository: {e}"
f"Failed to look up files in the repository: {e}", exception=e
)
return

Expand All @@ -223,7 +225,9 @@ def _run_exercise_chat_pipeline(
self.retrieved_lecture_chunks
)
except Exception as e:
self.callback.error(f"Failed to retrieve lecture chunks: {e}")
self.callback.error(
f"Failed to retrieve lecture chunks: {e}", exception=e
)
return

self.callback.done()
Expand Down Expand Up @@ -270,7 +274,7 @@ def _run_exercise_chat_pipeline(
print("Response is rewritten.")
self.exercise_chat_response = guide_response
except Exception as e:
self.callback.error(f"Failed to create response: {e}")
self.callback.error(f"Failed to create response: {e}", exception=e)
# print stack trace
traceback.print_exc()
return "Failed to generate response"
Expand Down
6 changes: 4 additions & 2 deletions app/pipeline/lecture_ingestion_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ def __call__(self) -> bool:
return True
except Exception as e:
logger.error(f"Error updating lecture unit: {e}")
self.callback.error(f"Failed to ingest lectures into the database: {e}")
self.callback.error(
f"Failed to ingest lectures into the database: {e}", exception=e
)
return False

def batch_update(self, chunks):
Expand All @@ -168,7 +170,7 @@ def batch_update(self, chunks):
except Exception as e:
logger.error(f"Error updating lecture unit: {e}")
self.callback.error(
f"Failed to ingest lectures into the database: {e}"
f"Failed to ingest lectures into the database: {e}", exception=e
)

def chunk_data(
Expand Down
8 changes: 6 additions & 2 deletions app/web/routers/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import traceback
from threading import Thread

from sentry_sdk import capture_exception

from fastapi import APIRouter, status, Response, Depends

from app.domain import (
Expand Down Expand Up @@ -31,14 +33,15 @@ def run_exercise_chat_pipeline_worker(dto: ExerciseChatPipelineExecutionDTO):
except Exception as e:
logger.error(f"Error preparing exercise chat pipeline: {e}")
logger.error(traceback.format_exc())
capture_exception(e)
return

try:
pipeline(dto=dto)
except Exception as e:
logger.error(f"Error running exercise chat pipeline: {e}")
logger.error(traceback.format_exc())
callback.error("Fatal error.")
callback.error("Fatal error.", exception=e)


@router.post(
Expand All @@ -62,14 +65,15 @@ def run_course_chat_pipeline_worker(dto, variant):
except Exception as e:
logger.error(f"Error preparing exercise chat pipeline: {e}")
logger.error(traceback.format_exc())
capture_exception(e)
return

try:
pipeline(dto=dto)
except Exception as e:
logger.error(f"Error running exercise chat pipeline: {e}")
logger.error(traceback.format_exc())
callback.error("Fatal error.")
callback.error("Fatal error.", exception=e)


@router.post(
Expand Down
3 changes: 3 additions & 0 deletions app/web/routers/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from asyncio.log import logger
from threading import Thread, Semaphore

from sentry_sdk import capture_exception

from fastapi import APIRouter, status, Depends
from app.dependencies import TokenValidator
from app.domain.ingestion.ingestion_pipeline_execution_dto import (
Expand Down Expand Up @@ -37,6 +39,7 @@ def run_lecture_update_pipeline_worker(dto: IngestionPipelineExecutionDto):
except Exception as e:
logger.error(f"Error Ingestion pipeline: {e}")
logger.error(traceback.format_exc())
capture_exception(e)
finally:
semaphore.release()

Expand Down
11 changes: 10 additions & 1 deletion app/web/status/status_update.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Optional, List

from sentry_sdk import capture_exception, capture_message

import requests
from abc import ABC

Expand Down Expand Up @@ -57,6 +59,7 @@ def on_status_update(self):
).raise_for_status()
except requests.exceptions.RequestException as e:
logger.error(f"Error sending status update: {e}")
capture_exception(e)

def get_next_stage(self):
"""Return the next stage in the status, or None if there are no more stages."""
Expand Down Expand Up @@ -117,7 +120,7 @@ def done(
"Invalid state transition to done. current state is ", self.stage.state
)

def error(self, message: str):
def error(self, message: str, exception=None):
"""
Transition the current stage to ERROR and update the status.
Set all later stages to SKIPPED if an error occurs.
Expand All @@ -140,6 +143,12 @@ def error(self, message: str):
logger.error(
f"Error occurred in job {self.run_id} in stage {self.stage.name}: {message}"
)
if exception:
capture_exception(exception)
else:
capture_message(
f"Error occurred in job {self.run_id} in stage {self.stage.name}: {message}"
)

def skip(self, message: Optional[str] = None, start_next_stage: bool = True):
"""
Expand Down

0 comments on commit 80c183f

Please sign in to comment.