Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingestion status callback update #142

Merged
merged 19 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions app/domain/data/lecture_unit_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@


class LectureUnitDTO(BaseModel):
to_update: bool = Field(alias="toUpdate")
base_url: str = Field(alias="artemisBaseUrl")
pdf_file_base64: str = Field(default="", alias="pdfFile")
lecture_unit_id: int = Field(alias="lectureUnitId")
lecture_unit_name: str = Field(default="", alias="lectureUnitName")
Expand Down
15 changes: 15 additions & 0 deletions app/domain/ingestion/deletionPipelineExecutionDto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import List, Optional

from pydantic import Field

from app.domain import PipelineExecutionDTO, PipelineExecutionSettingsDTO
from app.domain.data.lecture_unit_dto import LectureUnitDTO
from app.domain.status.stage_dto import StageDTO


class LecturesDeletionExecutionDto(PipelineExecutionDTO):
lecture_units: List[LectureUnitDTO] = Field(..., alias="pyrisLectureUnits")
settings: Optional[PipelineExecutionSettingsDTO]
initial_stages: Optional[List[StageDTO]] = Field(
default=None, alias="initialStages"
)
4 changes: 1 addition & 3 deletions app/domain/ingestion/ingestion_pipeline_execution_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@


class IngestionPipelineExecutionDto(PipelineExecutionDTO):
lecture_units: List[LectureUnitDTO] = Field(
..., alias="pyrisLectureUnitWebhookDTOS"
)
lecture_unit: LectureUnitDTO = Field(..., alias="pyrisLectureUnit")
settings: Optional[PipelineExecutionSettingsDTO]
initial_stages: Optional[List[StageDTO]] = Field(
default=None, alias="initialStages"
Expand Down
1 change: 1 addition & 0 deletions app/domain/ingestion/ingestion_status_update_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@

class IngestionStatusUpdateDTO(StatusUpdateDTO):
result: Optional[str] = None
id: Optional[int] = None
46 changes: 25 additions & 21 deletions app/pipeline/lecture_ingestion_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import tempfile
import threading
from asyncio.log import logger
from typing import Optional

import fitz
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
Expand Down Expand Up @@ -90,7 +92,7 @@ class LectureIngestionPipeline(AbstractIngestion, Pipeline):
def __init__(
self,
client: WeaviateClient,
dto: IngestionPipelineExecutionDto,
dto: Optional[IngestionPipelineExecutionDto],
callback: IngestionStatusCallback,
):
super().__init__()
Expand All @@ -116,33 +118,31 @@ def __init__(
def __call__(self) -> bool:
try:
self.callback.in_progress("Deleting old slides from database...")
self.delete_old_lectures()
self.delete_lecture_unit(
self.dto.lecture_unit.course_id,
self.dto.lecture_unit.lecture_id,
self.dto.lecture_unit.lecture_unit_id,
self.dto.settings.artemis_base_url,
)
yassinsws marked this conversation as resolved.
Show resolved Hide resolved
self.callback.done("Old slides removed")
# Here we check if the operation is for updating or for deleting,
# we only check the first file because all the files will have the same operation
if not self.dto.lecture_units[0].to_update:
self.callback.skip("Lecture Chunking and interpretation Skipped")
self.callback.skip("No new slides to update")
return True
self.callback.in_progress("Chunking and interpreting lecture...")
chunks = []
for i, lecture_unit in enumerate(self.dto.lecture_units):
pdf_path = save_pdf(lecture_unit.pdf_file_base64)
chunks.extend(
self.chunk_data(
lecture_pdf=pdf_path,
lecture_unit_dto=lecture_unit,
base_url=self.dto.settings.artemis_base_url,
)
pdf_path = save_pdf(self.dto.lecture_unit.pdf_file_base64)
chunks.extend(
self.chunk_data(
lecture_pdf=pdf_path,
lecture_unit_dto=self.dto.lecture_unit,
base_url=self.dto.settings.artemis_base_url,
)
cleanup_temporary_file(pdf_path)
)
cleanup_temporary_file(pdf_path)
Comment on lines +133 to +141
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure temporary files are cleaned up even on exceptions

If an exception occurs before cleanup_temporary_file(pdf_path) is called, the temporary file may remain on the system. To guarantee cleanup regardless of success or failure, consider using a finally block.

Apply this diff to modify the code:

         pdf_path = save_pdf(self.dto.lecture_unit.pdf_file_base64)
-        chunks.extend(
-            self.chunk_data(
-                lecture_pdf=pdf_path,
-                lecture_unit_dto=self.dto.lecture_unit,
-                base_url=self.dto.settings.artemis_base_url,
-            )
-        )
-        cleanup_temporary_file(pdf_path)
+        try:
+            chunks.extend(
+                self.chunk_data(
+                    lecture_pdf=pdf_path,
+                    lecture_unit_dto=self.dto.lecture_unit,
+                    base_url=self.dto.settings.artemis_base_url,
+                )
+            )
+        finally:
+            cleanup_temporary_file(pdf_path)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pdf_path = save_pdf(self.dto.lecture_unit.pdf_file_base64)
chunks.extend(
self.chunk_data(
lecture_pdf=pdf_path,
lecture_unit_dto=self.dto.lecture_unit,
base_url=self.dto.settings.artemis_base_url,
)
cleanup_temporary_file(pdf_path)
)
cleanup_temporary_file(pdf_path)
pdf_path = save_pdf(self.dto.lecture_unit.pdf_file_base64)
try:
chunks.extend(
self.chunk_data(
lecture_pdf=pdf_path,
lecture_unit_dto=self.dto.lecture_unit,
base_url=self.dto.settings.artemis_base_url,
)
)
finally:
cleanup_temporary_file(pdf_path)

self.callback.done("Lecture Chunking and interpretation Finished")
self.callback.in_progress("Ingesting lecture chunks into database...")
self.batch_update(chunks)
self.callback.done("Lecture Ingestion Finished")
logger.info(
f"Lecture ingestion pipeline finished Successfully for course "
f"{self.dto.lecture_units[0].course_name}"
f"{self.dto.lecture_unit.course_name}"
)
return True
except Exception as e:
Expand Down Expand Up @@ -294,23 +294,27 @@ def get_course_language(self, page_content: str) -> str:
)
return response.contents[0].text_content

def delete_old_lectures(self):
def delete_old_lectures(
self, lecture_units: list[LectureUnitDTO], artemis_base_url: str
):
"""
Delete the lecture unit from the database
"""
try:
for lecture_unit in self.dto.lecture_units:
for lecture_unit in lecture_units:
if self.delete_lecture_unit(
lecture_unit.course_id,
lecture_unit.lecture_id,
lecture_unit.lecture_unit_id,
self.dto.settings.artemis_base_url,
artemis_base_url,
):
logger.info("Lecture deleted successfully")
else:
logger.error("Failed to delete lecture")
self.callback.done("Old slides removed")
except Exception as e:
logger.error(f"Error deleting lecture unit: {e}")
self.callback.error("Error while removing old slides")
return False

def delete_lecture_unit(self, course_id, lecture_id, lecture_unit_id, base_url):
Expand Down
40 changes: 38 additions & 2 deletions app/web/routers/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@
IngestionPipelineExecutionDto,
)
from ..status.IngestionStatusCallback import IngestionStatusCallback
from ..status.LecturesDeletionStatusCallback import LecturesDeletionStatusCallback
from ...domain.ingestion.deletionPipelineExecutionDto import (
LecturesDeletionExecutionDto,
)
from ...pipeline.lecture_ingestion_pipeline import LectureIngestionPipeline
from ...vector_database.database import VectorDatabase

router = APIRouter(prefix="/api/v1/webhooks", tags=["webhooks"])


semaphore = Semaphore(5)


Expand All @@ -29,6 +32,7 @@ def run_lecture_update_pipeline_worker(dto: IngestionPipelineExecutionDto):
run_id=dto.settings.authentication_token,
base_url=dto.settings.artemis_base_url,
initial_stages=dto.initial_stages,
lecture_unit_id=dto.lecture_unit.lecture_unit_id,
)
db = VectorDatabase()
client = db.get_client()
Expand All @@ -44,14 +48,46 @@ def run_lecture_update_pipeline_worker(dto: IngestionPipelineExecutionDto):
semaphore.release()


def run_lecture_deletion_pipeline_worker(dto: LecturesDeletionExecutionDto):
"""
Run the exercise chat pipeline in a separate thread
"""
try:
callback = LecturesDeletionStatusCallback(
run_id=dto.settings.authentication_token,
base_url=dto.settings.artemis_base_url,
initial_stages=dto.initial_stages,
)
db = VectorDatabase()
client = db.get_client()
pipeline = LectureIngestionPipeline(client=client, dto=None, callback=callback)
pipeline.delete_old_lectures(dto.lecture_units)
except Exception as e:
logger.error(f"Error while deleting lectures: {e}")
logger.error(traceback.format_exc())


@router.post(
"/lectures/fullIngestion",
status_code=status.HTTP_202_ACCEPTED,
dependencies=[Depends(TokenValidator())],
)
def lecture_webhook(dto: IngestionPipelineExecutionDto):
def lecture_ingestion_webhook(dto: IngestionPipelineExecutionDto):
"""
Webhook endpoint to trigger the exercise chat pipeline
"""
thread = Thread(target=run_lecture_update_pipeline_worker, args=(dto,))
thread.start()


@router.post(
"/lectures/delete",
status_code=status.HTTP_202_ACCEPTED,
dependencies=[Depends(TokenValidator())],
)
def lecture_deletion_webhook(dto: LecturesDeletionExecutionDto):
"""
Webhook endpoint to trigger the exercise chat pipeline
"""
thread = Thread(target=run_lecture_deletion_pipeline_worker, args=(dto,))
thread.start()
8 changes: 6 additions & 2 deletions app/web/status/IngestionStatusCallback.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ class IngestionStatusCallback(StatusCallback):
"""

def __init__(
self, run_id: str, base_url: str, initial_stages: List[StageDTO] = None
self,
run_id: str,
base_url: str,
initial_stages: List[StageDTO] = None,
lecture_unit_id: int = None,
):
url = f"{base_url}/api/public/pyris/webhooks/ingestion/runs/{run_id}/status"

Expand All @@ -36,6 +40,6 @@ def __init__(
name="Slides ingestion",
),
]
status = IngestionStatusUpdateDTO(stages=stages)
status = IngestionStatusUpdateDTO(stages=stages, id=lecture_unit_id)
stage = stages[current_stage_index]
super().__init__(url, run_id, status, stage, current_stage_index)
31 changes: 31 additions & 0 deletions app/web/status/LecturesDeletionStatusCallback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import List

from .status_update import StatusCallback
from ...domain.ingestion.ingestion_status_update_dto import IngestionStatusUpdateDTO
from ...domain.status.stage_state_dto import StageStateEnum
from ...domain.status.stage_dto import StageDTO
import logging

logger = logging.getLogger(__name__)


class LecturesDeletionStatusCallback(StatusCallback):
"""
Callback class for updating the status of a Tutor Chat pipeline run.
"""

def __init__(
self, run_id: str, base_url: str, initial_stages: List[StageDTO] = None
):
url = f"{base_url}/api/public/pyris/webhooks/ingestion/runs/{run_id}/status"

current_stage_index = len(initial_stages) if initial_stages else 0
stages = initial_stages or []
stages += [
StageDTO(
weight=100, state=StageStateEnum.NOT_STARTED, name="Slides removal"
),
]
status = IngestionStatusUpdateDTO(stages=stages)
stage = stages[current_stage_index]
super().__init__(url, run_id, status, stage, current_stage_index)
Loading