From 2a11f63ba7604c4350b7d8f02f25e2f04d1b7627 Mon Sep 17 00:00:00 2001 From: Yassine Souissi Date: Thu, 5 Sep 2024 17:59:00 +0200 Subject: [PATCH] Make Ingestion Status updated through api calls --- app/web/routers/ingestion_status.py | 40 +++++++++++++++++ app/web/status/ingestion_status_callback.py | 45 +++++++++++++++++++ .../lecture_deletion_status_callback.py | 31 +++++++++++++ 3 files changed, 116 insertions(+) create mode 100644 app/web/routers/ingestion_status.py create mode 100644 app/web/status/ingestion_status_callback.py create mode 100644 app/web/status/lecture_deletion_status_callback.py diff --git a/app/web/routers/ingestion_status.py b/app/web/routers/ingestion_status.py new file mode 100644 index 00000000..aac887f1 --- /dev/null +++ b/app/web/routers/ingestion_status.py @@ -0,0 +1,40 @@ +from urllib.parse import unquote + +from fastapi import APIRouter, status, Response, Depends +from weaviate.collections.classes.filters import Filter + +from app.dependencies import TokenValidator +from ...vector_database.database import VectorDatabase +from ...vector_database.lecture_schema import LectureSchema + +router = APIRouter(prefix="/api/v1", tags=["ingestion_status"]) + + +@router.get("/courses/{course_id}/lectures/{lecture_id}/lectureUnits/{lecture_unit_id}/ingestion-state", + dependencies=[Depends(TokenValidator())]) +def get_lecture_unit_ingestion_state(course_id: int, lecture_id: int, lecture_unit_id: int, baseUrl: str): + db = VectorDatabase() + decoded_base_url = unquote(baseUrl) + result = db.lectures.query.fetch_objects( + filters=( + Filter.by_property(LectureSchema.BASE_URL.value).equal(decoded_base_url) + & Filter.by_property(LectureSchema.COURSE_ID.value).equal(course_id) + & Filter.by_property(LectureSchema.LECTURE_ID.value).equal(lecture_id) + & Filter.by_property(LectureSchema.LECTURE_UNIT_ID.value).equal(lecture_unit_id) + ), + limit=1, + return_properties=[LectureSchema.LECTURE_UNIT_NAME.value], + ) + + if len(result.objects) > 0: + return Response( + status_code=status.HTTP_200_OK, + content='"DONE"', + media_type="application/json" + ) + else: + return Response( + status_code=status.HTTP_200_OK, + content='"NOT_STARTED"', + media_type="application/json" + ) diff --git a/app/web/status/ingestion_status_callback.py b/app/web/status/ingestion_status_callback.py new file mode 100644 index 00000000..b606c302 --- /dev/null +++ b/app/web/status/ingestion_status_callback.py @@ -0,0 +1,45 @@ +from typing import List + +from .status_update import StatusCallback +from ...domain.ingestion.ingestion_status_update_dto import IngestionStatusUpdateDTO +from ...domain.status.stage_state_dto import StageStateEnum +from ...domain.status.stage_dto import StageDTO +import logging + +logger = logging.getLogger(__name__) + + +class IngestionStatusCallback(StatusCallback): + """ + Callback class for updating the status of a Lecture ingestion Pipeline run. + """ + + def __init__( + self, + run_id: str, + base_url: str, + initial_stages: List[StageDTO] = None, + lecture_unit_id: int = None, + ): + url = f"{base_url}/api/public/pyris/webhooks/ingestion/runs/{run_id}/status" + + current_stage_index = len(initial_stages) if initial_stages else 0 + stages = initial_stages or [] + stages += [ + StageDTO( + weight=10, state=StageStateEnum.NOT_STARTED, name="Old slides removal" + ), + StageDTO( + weight=60, + state=StageStateEnum.NOT_STARTED, + name="Slides Interpretation", + ), + StageDTO( + weight=30, + state=StageStateEnum.NOT_STARTED, + name="Slides ingestion", + ), + ] + status = IngestionStatusUpdateDTO(stages=stages, id=lecture_unit_id) + stage = stages[current_stage_index] + super().__init__(url, run_id, status, stage, current_stage_index) diff --git a/app/web/status/lecture_deletion_status_callback.py b/app/web/status/lecture_deletion_status_callback.py new file mode 100644 index 00000000..3aaeb803 --- /dev/null +++ b/app/web/status/lecture_deletion_status_callback.py @@ -0,0 +1,31 @@ +from typing import List + +from .status_update import StatusCallback +from ...domain.ingestion.ingestion_status_update_dto import IngestionStatusUpdateDTO +from ...domain.status.stage_state_dto import StageStateEnum +from ...domain.status.stage_dto import StageDTO +import logging + +logger = logging.getLogger(__name__) + + +class LecturesDeletionStatusCallback(StatusCallback): + """ + Callback class for updating the status of a Tutor Chat pipeline run. + """ + + def __init__( + self, run_id: str, base_url: str, initial_stages: List[StageDTO] = None + ): + url = f"{base_url}/api/public/pyris/webhooks/ingestion/runs/{run_id}/status" + + current_stage_index = len(initial_stages) if initial_stages else 0 + stages = initial_stages or [] + stages += [ + StageDTO( + weight=100, state=StageStateEnum.NOT_STARTED, name="Slides removal" + ), + ] + status = IngestionStatusUpdateDTO(stages=stages) + stage = stages[current_stage_index] + super().__init__(url, run_id, status, stage, current_stage_index)