Skip to content

Commit

Permalink
Ingestion status callback update (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
yassinsws authored Nov 29, 2024
1 parent dad698e commit fad216c
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 35 deletions.
2 changes: 0 additions & 2 deletions app/domain/data/lecture_unit_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@


class LectureUnitDTO(BaseModel):
to_update: bool = Field(alias="toUpdate")
base_url: str = Field(alias="artemisBaseUrl")
pdf_file_base64: str = Field(default="", alias="pdfFile")
lecture_unit_id: int = Field(alias="lectureUnitId")
lecture_unit_name: str = Field(default="", alias="lectureUnitName")
Expand Down
15 changes: 15 additions & 0 deletions app/domain/ingestion/deletionPipelineExecutionDto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import List, Optional

from pydantic import Field

from app.domain import PipelineExecutionDTO, PipelineExecutionSettingsDTO
from app.domain.data.lecture_unit_dto import LectureUnitDTO
from app.domain.status.stage_dto import StageDTO


class LecturesDeletionExecutionDto(PipelineExecutionDTO):
lecture_units: List[LectureUnitDTO] = Field(..., alias="pyrisLectureUnits")
settings: Optional[PipelineExecutionSettingsDTO]
initial_stages: Optional[List[StageDTO]] = Field(
default=None, alias="initialStages"
)
11 changes: 7 additions & 4 deletions app/domain/ingestion/ingestion_pipeline_execution_dto.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from typing import List
from typing import List, Optional

from pydantic import Field

from app.domain import PipelineExecutionDTO
from app.domain import PipelineExecutionDTO, PipelineExecutionSettingsDTO
from app.domain.data.lecture_unit_dto import LectureUnitDTO
from app.domain.status.stage_dto import StageDTO


class IngestionPipelineExecutionDto(PipelineExecutionDTO):
lecture_units: List[LectureUnitDTO] = Field(
..., alias="pyrisLectureUnitWebhookDTOS"
lecture_unit: LectureUnitDTO = Field(..., alias="pyrisLectureUnit")
settings: Optional[PipelineExecutionSettingsDTO]
initial_stages: Optional[List[StageDTO]] = Field(
default=None, alias="initialStages"
)
1 change: 1 addition & 0 deletions app/domain/ingestion/ingestion_status_update_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@

class IngestionStatusUpdateDTO(StatusUpdateDTO):
result: Optional[str] = None
id: Optional[int] = None
2 changes: 2 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from app.web.routers.health import router as health_router
from app.web.routers.pipelines import router as pipelines_router
from app.web.routers.webhooks import router as webhooks_router
from app.web.routers.ingestion_status import router as ingestion_status_router

import logging
from fastapi import FastAPI, Request, status
Expand Down Expand Up @@ -57,3 +58,4 @@ async def some_middleware(request: Request, call_next):
app.include_router(health_router)
app.include_router(pipelines_router)
app.include_router(webhooks_router)
app.include_router(ingestion_status_router)
51 changes: 28 additions & 23 deletions app/pipeline/lecture_ingestion_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import tempfile
import threading
from asyncio.log import logger
from typing import Optional

import fitz
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
Expand All @@ -28,9 +30,10 @@
CapabilityRequestHandler,
RequirementList,
)
from ..web.status import IngestionStatusCallback
from langchain_text_splitters import RecursiveCharacterTextSplitter

from ..web.status import ingestion_status_callback

batch_update_lock = threading.Lock()


Expand Down Expand Up @@ -92,8 +95,8 @@ class LectureIngestionPipeline(AbstractIngestion, Pipeline):
def __init__(
self,
client: WeaviateClient,
dto: IngestionPipelineExecutionDto,
callback: IngestionStatusCallback,
dto: Optional[IngestionPipelineExecutionDto],
callback: ingestion_status_callback,
):
super().__init__()
self.collection = init_lecture_schema(client)
Expand All @@ -119,33 +122,31 @@ def __init__(
def __call__(self) -> bool:
try:
self.callback.in_progress("Deleting old slides from database...")
self.delete_old_lectures()
self.delete_lecture_unit(
self.dto.lecture_unit.course_id,
self.dto.lecture_unit.lecture_id,
self.dto.lecture_unit.lecture_unit_id,
self.dto.settings.artemis_base_url,
)
self.callback.done("Old slides removed")
# Here we check if the operation is for updating or for deleting,
# we only check the first file because all the files will have the same operation
if not self.dto.lecture_units[0].to_update:
self.callback.skip("Lecture Chunking and interpretation Skipped")
self.callback.skip("No new slides to update")
return True
self.callback.in_progress("Chunking and interpreting lecture...")
chunks = []
for i, lecture_unit in enumerate(self.dto.lecture_units):
pdf_path = save_pdf(lecture_unit.pdf_file_base64)
chunks.extend(
self.chunk_data(
lecture_pdf=pdf_path,
lecture_unit_dto=lecture_unit,
base_url=self.dto.settings.artemis_base_url,
)
pdf_path = save_pdf(self.dto.lecture_unit.pdf_file_base64)
chunks.extend(
self.chunk_data(
lecture_pdf=pdf_path,
lecture_unit_dto=self.dto.lecture_unit,
base_url=self.dto.settings.artemis_base_url,
)
cleanup_temporary_file(pdf_path)
)
cleanup_temporary_file(pdf_path)
self.callback.done("Lecture Chunking and interpretation Finished")
self.callback.in_progress("Ingesting lecture chunks into database...")
self.batch_update(chunks)
self.callback.done("Lecture Ingestion Finished", tokens=self.tokens)
logger.info(
f"Lecture ingestion pipeline finished Successfully for course "
f"{self.dto.lecture_units[0].course_name}"
f"{self.dto.lecture_unit.course_name}"
)
return True
except Exception as e:
Expand Down Expand Up @@ -307,23 +308,27 @@ def get_course_language(self, page_content: str) -> str:
self._append_tokens(response.token_usage, PipelineEnum.IRIS_LECTURE_INGESTION)
return response.contents[0].text_content

def delete_old_lectures(self):
def delete_old_lectures(
self, lecture_units: list[LectureUnitDTO], artemis_base_url: str
):
"""
Delete the lecture unit from the database
"""
try:
for lecture_unit in self.dto.lecture_units:
for lecture_unit in lecture_units:
if self.delete_lecture_unit(
lecture_unit.course_id,
lecture_unit.lecture_id,
lecture_unit.lecture_unit_id,
self.dto.settings.artemis_base_url,
artemis_base_url,
):
logger.info("Lecture deleted successfully")
else:
logger.error("Failed to delete lecture")
self.callback.done("Old slides removed")
except Exception as e:
logger.error(f"Error deleting lecture unit: {e}")
self.callback.error("Error while removing old slides")
return False

def delete_lecture_unit(self, course_id, lecture_id, lecture_unit_id, base_url):
Expand Down
62 changes: 62 additions & 0 deletions app/web/routers/ingestion_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import json
from urllib.parse import unquote

from fastapi import APIRouter, status, Response, Depends
from fastapi.params import Query
from weaviate.collections.classes.filters import Filter

from app.dependencies import TokenValidator
from ...vector_database.database import VectorDatabase
from ...vector_database.lecture_schema import LectureSchema
from enum import Enum

router = APIRouter(prefix="/api/v1", tags=["ingestion_status"])


class IngestionState(str, Enum):
DONE = "DONE"
NOT_STARTED = "NOT_STARTED"


@router.get(
"/courses/{course_id}/lectures/{lecture_id}/lectureUnits/{lecture_unit_id}/ingestion-state",
dependencies=[Depends(TokenValidator())],
)
def get_lecture_unit_ingestion_state(
course_id: int, lecture_id: int, lecture_unit_id: int, base_url: str = Query(...)
):
"""
:param course_id:
:param lecture_id:
:param lecture_unit_id:
:param base_url:
:return:
"""
db = VectorDatabase()
decoded_base_url = unquote(base_url)
result = db.lectures.query.fetch_objects(
filters=(
Filter.by_property(LectureSchema.BASE_URL.value).equal(decoded_base_url)
& Filter.by_property(LectureSchema.COURSE_ID.value).equal(course_id)
& Filter.by_property(LectureSchema.LECTURE_ID.value).equal(lecture_id)
& Filter.by_property(LectureSchema.LECTURE_UNIT_ID.value).equal(
lecture_unit_id
)
),
limit=1,
return_properties=[LectureSchema.LECTURE_UNIT_NAME.value],
)

if len(result.objects) > 0:
return Response(
status_code=status.HTTP_200_OK,
content=json.dumps({"state": IngestionState.DONE.value}),
media_type="application/json",
)
else:
return Response(
status_code=status.HTTP_200_OK,
content=json.dumps({"state": IngestionState.NOT_STARTED.value}),
media_type="application/json",
)
42 changes: 39 additions & 3 deletions app/web/routers/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
from app.domain.ingestion.ingestion_pipeline_execution_dto import (
IngestionPipelineExecutionDto,
)
from ..status.IngestionStatusCallback import IngestionStatusCallback
from ..status.ingestion_status_callback import IngestionStatusCallback
from ..status.lecture_deletion_status_callback import LecturesDeletionStatusCallback
from ...domain.ingestion.deletionPipelineExecutionDto import (
LecturesDeletionExecutionDto,
)
from ...pipeline.lecture_ingestion_pipeline import LectureIngestionPipeline
from ...vector_database.database import VectorDatabase

router = APIRouter(prefix="/api/v1/webhooks", tags=["webhooks"])


semaphore = Semaphore(5)


Expand All @@ -29,6 +32,7 @@ def run_lecture_update_pipeline_worker(dto: IngestionPipelineExecutionDto):
run_id=dto.settings.authentication_token,
base_url=dto.settings.artemis_base_url,
initial_stages=dto.initial_stages,
lecture_unit_id=dto.lecture_unit.lecture_unit_id,
)
db = VectorDatabase()
client = db.get_client()
Expand All @@ -44,14 +48,46 @@ def run_lecture_update_pipeline_worker(dto: IngestionPipelineExecutionDto):
semaphore.release()


def run_lecture_deletion_pipeline_worker(dto: LecturesDeletionExecutionDto):
"""
Run the exercise chat pipeline in a separate thread
"""
try:
callback = LecturesDeletionStatusCallback(
run_id=dto.settings.authentication_token,
base_url=dto.settings.artemis_base_url,
initial_stages=dto.initial_stages,
)
db = VectorDatabase()
client = db.get_client()
pipeline = LectureIngestionPipeline(client=client, dto=None, callback=callback)
pipeline.delete_old_lectures(dto.lecture_units, dto.settings.artemis_base_url)
except Exception as e:
logger.error(f"Error while deleting lectures: {e}")
logger.error(traceback.format_exc())


@router.post(
"/lectures/fullIngestion",
status_code=status.HTTP_202_ACCEPTED,
dependencies=[Depends(TokenValidator())],
)
def lecture_webhook(dto: IngestionPipelineExecutionDto):
def lecture_ingestion_webhook(dto: IngestionPipelineExecutionDto):
"""
Webhook endpoint to trigger the exercise chat pipeline
"""
thread = Thread(target=run_lecture_update_pipeline_worker, args=(dto,))
thread.start()


@router.post(
"/lectures/delete",
status_code=status.HTTP_202_ACCEPTED,
dependencies=[Depends(TokenValidator())],
)
def lecture_deletion_webhook(dto: LecturesDeletionExecutionDto):
"""
Webhook endpoint to trigger the lecture deletion
"""
thread = Thread(target=run_lecture_deletion_pipeline_worker, args=(dto,))
thread.start()
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@

class IngestionStatusCallback(StatusCallback):
"""
Callback class for updating the status of a Tutor Chat pipeline run.
Callback class for updating the status of a Lecture ingestion Pipeline run.
"""

def __init__(
self, run_id: str, base_url: str, initial_stages: List[StageDTO] = None
self,
run_id: str,
base_url: str,
initial_stages: List[StageDTO] = None,
lecture_unit_id: int = None,
):
url = f"{base_url}/api/public/pyris/webhooks/ingestion/runs/{run_id}/status"

Expand All @@ -36,6 +40,6 @@ def __init__(
name="Slides ingestion",
),
]
status = IngestionStatusUpdateDTO(stages=stages)
status = IngestionStatusUpdateDTO(stages=stages, id=lecture_unit_id)
stage = stages[current_stage_index]
super().__init__(url, run_id, status, stage, current_stage_index)
31 changes: 31 additions & 0 deletions app/web/status/lecture_deletion_status_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import List

from .status_update import StatusCallback
from ...domain.ingestion.ingestion_status_update_dto import IngestionStatusUpdateDTO
from ...domain.status.stage_state_dto import StageStateEnum
from ...domain.status.stage_dto import StageDTO
import logging

logger = logging.getLogger(__name__)


class LecturesDeletionStatusCallback(StatusCallback):
"""
Callback class for updating the status of a Tutor Chat pipeline run.
"""

def __init__(
self, run_id: str, base_url: str, initial_stages: List[StageDTO] = None
):
url = f"{base_url}/api/public/pyris/webhooks/ingestion/runs/{run_id}/status"

current_stage_index = len(initial_stages) if initial_stages else 0
stages = initial_stages or []
stages += [
StageDTO(
weight=100, state=StageStateEnum.NOT_STARTED, name="Slides removal"
),
]
status = IngestionStatusUpdateDTO(stages=stages)
stage = stages[current_stage_index]
super().__init__(url, run_id, status, stage, current_stage_index)
Empty file.

0 comments on commit fad216c

Please sign in to comment.