Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Development: Add Sentry reporting for the pipelines #135

Merged
2 commits merged on Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions app/pipeline/chat/exercise_chat_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,12 @@ def __call__(self, dto: ExerciseChatPipelineExecutionDTO):
exc_info=e,
)
traceback.print_exc()
self.callback.error("Generating interaction suggestions failed.")
self.callback.error(
"Generating interaction suggestions failed.", exception=e
)
except Exception as e:
traceback.print_exc()
self.callback.error(f"Failed to generate response: {e}")
self.callback.error(f"Failed to generate response: {e}", exception=e)

def _run_exercise_chat_pipeline(
self,
Expand Down Expand Up @@ -210,7 +212,7 @@ def _run_exercise_chat_pipeline(
)
except Exception as e:
self.callback.error(
f"Failed to look up files in the repository: {e}"
f"Failed to look up files in the repository: {e}", exception=e
)
return

Expand All @@ -223,7 +225,9 @@ def _run_exercise_chat_pipeline(
self.retrieved_lecture_chunks
)
except Exception as e:
self.callback.error(f"Failed to retrieve lecture chunks: {e}")
self.callback.error(
f"Failed to retrieve lecture chunks: {e}", exception=e
)
return

self.callback.done()
Expand Down Expand Up @@ -270,7 +274,7 @@ def _run_exercise_chat_pipeline(
print("Response is rewritten.")
self.exercise_chat_response = guide_response
except Exception as e:
self.callback.error(f"Failed to create response: {e}")
self.callback.error(f"Failed to create response: {e}", exception=e)
# print stack trace
traceback.print_exc()
return "Failed to generate response"
Expand Down
6 changes: 4 additions & 2 deletions app/pipeline/lecture_ingestion_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ def __call__(self) -> bool:
return True
except Exception as e:
logger.error(f"Error updating lecture unit: {e}")
self.callback.error(f"Failed to ingest lectures into the database: {e}")
self.callback.error(
f"Failed to ingest lectures into the database: {e}", exception=e
)
return False

def batch_update(self, chunks):
Expand All @@ -168,7 +170,7 @@ def batch_update(self, chunks):
except Exception as e:
logger.error(f"Error updating lecture unit: {e}")
self.callback.error(
f"Failed to ingest lectures into the database: {e}"
f"Failed to ingest lectures into the database: {e}", exception=e
)

def chunk_data(
Expand Down
8 changes: 6 additions & 2 deletions app/web/routers/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import traceback
from threading import Thread

from sentry_sdk import capture_exception

from fastapi import APIRouter, status, Response, Depends

from app.domain import (
Expand Down Expand Up @@ -31,14 +33,15 @@ def run_exercise_chat_pipeline_worker(dto: ExerciseChatPipelineExecutionDTO):
except Exception as e:
logger.error(f"Error preparing exercise chat pipeline: {e}")
logger.error(traceback.format_exc())
capture_exception(e)
return

try:
pipeline(dto=dto)
except Exception as e:
logger.error(f"Error running exercise chat pipeline: {e}")
logger.error(traceback.format_exc())
callback.error("Fatal error.")
callback.error("Fatal error.", exception=e)


@router.post(
Expand All @@ -62,14 +65,15 @@ def run_course_chat_pipeline_worker(dto, variant):
except Exception as e:
logger.error(f"Error preparing exercise chat pipeline: {e}")
logger.error(traceback.format_exc())
capture_exception(e)
return

try:
pipeline(dto=dto)
except Exception as e:
logger.error(f"Error running exercise chat pipeline: {e}")
logger.error(traceback.format_exc())
callback.error("Fatal error.")
callback.error("Fatal error.", exception=e)


@router.post(
Expand Down
3 changes: 3 additions & 0 deletions app/web/routers/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from asyncio.log import logger
from threading import Thread, Semaphore

from sentry_sdk import capture_exception

from fastapi import APIRouter, status, Depends
from app.dependencies import TokenValidator
from app.domain.ingestion.ingestion_pipeline_execution_dto import (
Expand Down Expand Up @@ -37,6 +39,7 @@ def run_lecture_update_pipeline_worker(dto: IngestionPipelineExecutionDto):
except Exception as e:
logger.error(f"Error Ingestion pipeline: {e}")
logger.error(traceback.format_exc())
capture_exception(e)
finally:
semaphore.release()

Expand Down
11 changes: 10 additions & 1 deletion app/web/status/status_update.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Optional, List

from sentry_sdk import capture_exception, capture_message

import requests
from abc import ABC

Expand Down Expand Up @@ -57,6 +59,7 @@ def on_status_update(self):
).raise_for_status()
except requests.exceptions.RequestException as e:
logger.error(f"Error sending status update: {e}")
capture_exception(e)

def get_next_stage(self):
"""Return the next stage in the status, or None if there are no more stages."""
Expand Down Expand Up @@ -117,7 +120,7 @@ def done(
"Invalid state transition to done. current state is ", self.stage.state
)

def error(self, message: str):
def error(self, message: str, exception=None):
"""
Transition the current stage to ERROR and update the status.
Set all later stages to SKIPPED if an error occurs.
Expand All @@ -140,6 +143,12 @@ def error(self, message: str):
logger.error(
f"Error occurred in job {self.run_id} in stage {self.stage.name}: {message}"
)
if exception:
capture_exception(exception)
else:
capture_message(
f"Error occurred in job {self.run_id} in stage {self.stage.name}: {message}"
)

def skip(self, message: Optional[str] = None, start_next_stage: bool = True):
"""
Expand Down
Loading