diff --git a/app/domain/data/lecture_unit_dto.py b/app/domain/data/lecture_unit_dto.py index 26ef1785..de9cb25b 100644 --- a/app/domain/data/lecture_unit_dto.py +++ b/app/domain/data/lecture_unit_dto.py @@ -2,8 +2,6 @@ class LectureUnitDTO(BaseModel): - to_update: bool = Field(alias="toUpdate") - base_url: str = Field(alias="artemisBaseUrl") pdf_file_base64: str = Field(default="", alias="pdfFile") lecture_unit_id: int = Field(alias="lectureUnitId") lecture_unit_name: str = Field(default="", alias="lectureUnitName") diff --git a/app/domain/ingestion/deletionPipelineExecutionDto.py b/app/domain/ingestion/deletionPipelineExecutionDto.py new file mode 100644 index 00000000..1cec7cdd --- /dev/null +++ b/app/domain/ingestion/deletionPipelineExecutionDto.py @@ -0,0 +1,15 @@ +from typing import List, Optional + +from pydantic import Field + +from app.domain import PipelineExecutionDTO, PipelineExecutionSettingsDTO +from app.domain.data.lecture_unit_dto import LectureUnitDTO +from app.domain.status.stage_dto import StageDTO + + +class LecturesDeletionExecutionDto(PipelineExecutionDTO): + lecture_units: List[LectureUnitDTO] = Field(..., alias="pyrisLectureUnits") + settings: Optional[PipelineExecutionSettingsDTO] + initial_stages: Optional[List[StageDTO]] = Field( + default=None, alias="initialStages" + ) diff --git a/app/domain/ingestion/ingestion_pipeline_execution_dto.py b/app/domain/ingestion/ingestion_pipeline_execution_dto.py index e8a9882f..12f3205f 100644 --- a/app/domain/ingestion/ingestion_pipeline_execution_dto.py +++ b/app/domain/ingestion/ingestion_pipeline_execution_dto.py @@ -1,12 +1,15 @@ -from typing import List +from typing import List, Optional from pydantic import Field -from app.domain import PipelineExecutionDTO +from app.domain import PipelineExecutionDTO, PipelineExecutionSettingsDTO from app.domain.data.lecture_unit_dto import LectureUnitDTO +from app.domain.status.stage_dto import StageDTO class IngestionPipelineExecutionDto(PipelineExecutionDTO): - lecture_units: List[LectureUnitDTO] = Field( - ..., alias="pyrisLectureUnitWebhookDTOS" + lecture_unit: LectureUnitDTO = Field(..., alias="pyrisLectureUnit") + settings: Optional[PipelineExecutionSettingsDTO] + initial_stages: Optional[List[StageDTO]] = Field( + default=None, alias="initialStages" ) diff --git a/app/domain/ingestion/ingestion_status_update_dto.py b/app/domain/ingestion/ingestion_status_update_dto.py index 351b9e6f..1b7b45b3 100644 --- a/app/domain/ingestion/ingestion_status_update_dto.py +++ b/app/domain/ingestion/ingestion_status_update_dto.py @@ -5,3 +5,4 @@ class IngestionStatusUpdateDTO(StatusUpdateDTO): result: Optional[str] = None + id: Optional[int] = None diff --git a/app/main.py b/app/main.py index 9d394f81..1abc5200 100644 --- a/app/main.py +++ b/app/main.py @@ -7,6 +7,7 @@ from app.web.routers.health import router as health_router from app.web.routers.pipelines import router as pipelines_router from app.web.routers.webhooks import router as webhooks_router +from app.web.routers.ingestion_status import router as ingestion_status_router import logging from fastapi import FastAPI, Request, status @@ -57,3 +58,4 @@ async def some_middleware(request: Request, call_next): app.include_router(health_router) app.include_router(pipelines_router) app.include_router(webhooks_router) +app.include_router(ingestion_status_router) diff --git a/app/pipeline/lecture_ingestion_pipeline.py b/app/pipeline/lecture_ingestion_pipeline.py index 73d6371b..8e7126d0 100644 --- a/app/pipeline/lecture_ingestion_pipeline.py +++ b/app/pipeline/lecture_ingestion_pipeline.py @@ -3,6 +3,8 @@ import tempfile import threading from asyncio.log import logger +from typing import Optional + import fitz from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import ChatPromptTemplate @@ -28,9 +30,10 @@ CapabilityRequestHandler, RequirementList, ) -from ..web.status import IngestionStatusCallback from langchain_text_splitters import RecursiveCharacterTextSplitter +from ..web.status import ingestion_status_callback + batch_update_lock = threading.Lock() @@ -91,8 +94,8 @@ class LectureIngestionPipeline(AbstractIngestion, Pipeline): def __init__( self, client: WeaviateClient, - dto: IngestionPipelineExecutionDto, - callback: IngestionStatusCallback, + dto: Optional[IngestionPipelineExecutionDto], + callback: ingestion_status_callback, ): super().__init__() self.collection = init_lecture_schema(client) @@ -118,33 +121,31 @@ def __init__( def __call__(self) -> bool: try: self.callback.in_progress("Deleting old slides from database...") - self.delete_old_lectures() + self.delete_lecture_unit( + self.dto.lecture_unit.course_id, + self.dto.lecture_unit.lecture_id, + self.dto.lecture_unit.lecture_unit_id, + self.dto.settings.artemis_base_url, + ) self.callback.done("Old slides removed") - # Here we check if the operation is for updating or for deleting, - # we only check the first file because all the files will have the same operation - if not self.dto.lecture_units[0].to_update: - self.callback.skip("Lecture Chunking and interpretation Skipped") - self.callback.skip("No new slides to update") - return True self.callback.in_progress("Chunking and interpreting lecture...") chunks = [] - for i, lecture_unit in enumerate(self.dto.lecture_units): - pdf_path = save_pdf(lecture_unit.pdf_file_base64) - chunks.extend( - self.chunk_data( - lecture_pdf=pdf_path, - lecture_unit_dto=lecture_unit, - base_url=self.dto.settings.artemis_base_url, - ) + pdf_path = save_pdf(self.dto.lecture_unit.pdf_file_base64) + chunks.extend( + self.chunk_data( + lecture_pdf=pdf_path, + lecture_unit_dto=self.dto.lecture_unit, + base_url=self.dto.settings.artemis_base_url, ) - cleanup_temporary_file(pdf_path) + ) + cleanup_temporary_file(pdf_path) self.callback.done("Lecture Chunking and interpretation Finished") self.callback.in_progress("Ingesting lecture chunks into database...") self.batch_update(chunks) self.callback.done("Lecture Ingestion Finished", tokens=self.tokens) logger.info( f"Lecture ingestion pipeline finished Successfully for course " - f"{self.dto.lecture_units[0].course_name}" + f"{self.dto.lecture_unit.course_name}" ) return True except Exception as e: @@ -306,23 +307,27 @@ def get_course_language(self, page_content: str) -> str: self._append_tokens(response.token_usage, PipelineEnum.IRIS_LECTURE_INGESTION) return response.contents[0].text_content - def delete_old_lectures(self): + def delete_old_lectures( + self, lecture_units: list[LectureUnitDTO], artemis_base_url: str + ): """ Delete the lecture unit from the database """ try: - for lecture_unit in self.dto.lecture_units: + for lecture_unit in lecture_units: if self.delete_lecture_unit( lecture_unit.course_id, lecture_unit.lecture_id, lecture_unit.lecture_unit_id, - self.dto.settings.artemis_base_url, + artemis_base_url, ): logger.info("Lecture deleted successfully") else: logger.error("Failed to delete lecture") + self.callback.done("Old slides removed") except Exception as e: logger.error(f"Error deleting lecture unit: {e}") + self.callback.error("Error while removing old slides") return False def delete_lecture_unit(self, course_id, lecture_id, lecture_unit_id, base_url): diff --git a/app/web/routers/ingestion_status.py b/app/web/routers/ingestion_status.py new file mode 100644 index 00000000..75868765 --- /dev/null +++ b/app/web/routers/ingestion_status.py @@ -0,0 +1,62 @@ +import json +from urllib.parse import unquote + +from fastapi import APIRouter, status, Response, Depends +from fastapi.params import Query +from weaviate.collections.classes.filters import Filter + +from app.dependencies import TokenValidator +from ...vector_database.database import VectorDatabase +from ...vector_database.lecture_schema import LectureSchema +from enum import Enum + +router = APIRouter(prefix="/api/v1", tags=["ingestion_status"]) + + +class IngestionState(str, Enum): + DONE = "DONE" + NOT_STARTED = "NOT_STARTED" + + +@router.get( + "/courses/{course_id}/lectures/{lecture_id}/lectureUnits/{lecture_unit_id}/ingestion-state", + dependencies=[Depends(TokenValidator())], +) +def get_lecture_unit_ingestion_state( + course_id: int, lecture_id: int, lecture_unit_id: int, base_url: str = Query(...) +): + """ + + :param course_id: + :param lecture_id: + :param lecture_unit_id: + :param base_url: + :return: + """ + db = VectorDatabase() + decoded_base_url = unquote(base_url) + result = db.lectures.query.fetch_objects( + filters=( + Filter.by_property(LectureSchema.BASE_URL.value).equal(decoded_base_url) + & Filter.by_property(LectureSchema.COURSE_ID.value).equal(course_id) + & Filter.by_property(LectureSchema.LECTURE_ID.value).equal(lecture_id) + & Filter.by_property(LectureSchema.LECTURE_UNIT_ID.value).equal( + lecture_unit_id + ) + ), + limit=1, + return_properties=[LectureSchema.LECTURE_UNIT_NAME.value], + ) + + if len(result.objects) > 0: + return Response( + status_code=status.HTTP_200_OK, + content=json.dumps({"state": IngestionState.DONE.value}), + media_type="application/json", + ) + else: + return Response( + status_code=status.HTTP_200_OK, + content=json.dumps({"state": IngestionState.NOT_STARTED.value}), + media_type="application/json", + ) diff --git a/app/web/routers/webhooks.py b/app/web/routers/webhooks.py index 65fbc810..739a9bbb 100644 --- a/app/web/routers/webhooks.py +++ b/app/web/routers/webhooks.py @@ -9,13 +9,16 @@ from app.domain.ingestion.ingestion_pipeline_execution_dto import ( IngestionPipelineExecutionDto, ) -from ..status.IngestionStatusCallback import IngestionStatusCallback +from ..status.ingestion_status_callback import IngestionStatusCallback +from ..status.lecture_deletion_status_callback import LecturesDeletionStatusCallback +from ...domain.ingestion.deletionPipelineExecutionDto import ( + LecturesDeletionExecutionDto, +) from ...pipeline.lecture_ingestion_pipeline import LectureIngestionPipeline from ...vector_database.database import VectorDatabase router = APIRouter(prefix="/api/v1/webhooks", tags=["webhooks"]) - semaphore = Semaphore(5) @@ -29,6 +32,7 @@ def run_lecture_update_pipeline_worker(dto: IngestionPipelineExecutionDto): run_id=dto.settings.authentication_token, base_url=dto.settings.artemis_base_url, initial_stages=dto.initial_stages, + lecture_unit_id=dto.lecture_unit.lecture_unit_id, ) db = VectorDatabase() client = db.get_client() @@ -44,14 +48,46 @@ def run_lecture_update_pipeline_worker(dto: IngestionPipelineExecutionDto): semaphore.release() +def run_lecture_deletion_pipeline_worker(dto: LecturesDeletionExecutionDto): + """ + Run the exercise chat pipeline in a separate thread + """ + try: + callback = LecturesDeletionStatusCallback( + run_id=dto.settings.authentication_token, + base_url=dto.settings.artemis_base_url, + initial_stages=dto.initial_stages, + ) + db = VectorDatabase() + client = db.get_client() + pipeline = LectureIngestionPipeline(client=client, dto=None, callback=callback) + pipeline.delete_old_lectures(dto.lecture_units, dto.settings.artemis_base_url) + except Exception as e: + logger.error(f"Error while deleting lectures: {e}") + logger.error(traceback.format_exc()) + + @router.post( "/lectures/fullIngestion", status_code=status.HTTP_202_ACCEPTED, dependencies=[Depends(TokenValidator())], ) -def lecture_webhook(dto: IngestionPipelineExecutionDto): +def lecture_ingestion_webhook(dto: IngestionPipelineExecutionDto): """ Webhook endpoint to trigger the exercise chat pipeline """ thread = Thread(target=run_lecture_update_pipeline_worker, args=(dto,)) thread.start() + + +@router.post( + "/lectures/delete", + status_code=status.HTTP_202_ACCEPTED, + dependencies=[Depends(TokenValidator())], +) +def lecture_deletion_webhook(dto: LecturesDeletionExecutionDto): + """ + Webhook endpoint to trigger the lecture deletion + """ + thread = Thread(target=run_lecture_deletion_pipeline_worker, args=(dto,)) + thread.start() diff --git a/app/web/status/IngestionStatusCallback.py b/app/web/status/ingestion_status_callback.py similarity index 79% rename from app/web/status/IngestionStatusCallback.py rename to app/web/status/ingestion_status_callback.py index a82a061c..b606c302 100644 --- a/app/web/status/IngestionStatusCallback.py +++ b/app/web/status/ingestion_status_callback.py @@ -11,11 +11,15 @@ class IngestionStatusCallback(StatusCallback): """ - Callback class for updating the status of a Tutor Chat pipeline run. + Callback class for updating the status of a Lecture ingestion Pipeline run. """ def __init__( - self, run_id: str, base_url: str, initial_stages: List[StageDTO] = None + self, + run_id: str, + base_url: str, + initial_stages: List[StageDTO] = None, + lecture_unit_id: int = None, ): url = f"{base_url}/api/public/pyris/webhooks/ingestion/runs/{run_id}/status" @@ -36,6 +40,6 @@ def __init__( name="Slides ingestion", ), ] - status = IngestionStatusUpdateDTO(stages=stages) + status = IngestionStatusUpdateDTO(stages=stages, id=lecture_unit_id) stage = stages[current_stage_index] super().__init__(url, run_id, status, stage, current_stage_index) diff --git a/app/web/status/lecture_deletion_status_callback.py b/app/web/status/lecture_deletion_status_callback.py new file mode 100644 index 00000000..3aaeb803 --- /dev/null +++ b/app/web/status/lecture_deletion_status_callback.py @@ -0,0 +1,31 @@ +from typing import List + +from .status_update import StatusCallback +from ...domain.ingestion.ingestion_status_update_dto import IngestionStatusUpdateDTO +from ...domain.status.stage_state_dto import StageStateEnum +from ...domain.status.stage_dto import StageDTO +import logging + +logger = logging.getLogger(__name__) + + +class LecturesDeletionStatusCallback(StatusCallback): + """ + Callback class for updating the status of a Tutor Chat pipeline run. + """ + + def __init__( + self, run_id: str, base_url: str, initial_stages: List[StageDTO] = None + ): + url = f"{base_url}/api/public/pyris/webhooks/ingestion/runs/{run_id}/status" + + current_stage_index = len(initial_stages) if initial_stages else 0 + stages = initial_stages or [] + stages += [ + StageDTO( + weight=100, state=StageStateEnum.NOT_STARTED, name="Slides removal" + ), + ] + status = IngestionStatusUpdateDTO(stages=stages) + stage = stages[current_stage_index] + super().__init__(url, run_id, status, stage, current_stage_index) diff --git a/docker/.docker-data/weaviate-data/.gitkeep b/docker/.docker-data/weaviate-data/.gitkeep deleted file mode 100644 index e69de29b..00000000