Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingestion status callback update #142

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions app/domain/data/lecture_unit_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@


class LectureUnitDTO(BaseModel):
to_update: bool = Field(alias="toUpdate")
base_url: str = Field(alias="artemisBaseUrl")
pdf_file_base64: str = Field(default="", alias="pdfFile")
lecture_unit_id: int = Field(alias="lectureUnitId")
lecture_unit_name: str = Field(default="", alias="lectureUnitName")
Expand Down
15 changes: 15 additions & 0 deletions app/domain/ingestion/deletionPipelineExecutionDto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import List, Optional

from pydantic import Field

from app.domain import PipelineExecutionDTO, PipelineExecutionSettingsDTO
from app.domain.data.lecture_unit_dto import LectureUnitDTO
from app.domain.status.stage_dto import StageDTO


class LecturesDeletionExecutionDto(PipelineExecutionDTO):
lecture_units: List[LectureUnitDTO] = Field(..., alias="pyrisLectureUnits")
settings: Optional[PipelineExecutionSettingsDTO]
initial_stages: Optional[List[StageDTO]] = Field(
default=None, alias="initialStages"
)
11 changes: 7 additions & 4 deletions app/domain/ingestion/ingestion_pipeline_execution_dto.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from typing import List
from typing import List, Optional

from pydantic import Field

from app.domain import PipelineExecutionDTO
from app.domain import PipelineExecutionDTO, PipelineExecutionSettingsDTO
from app.domain.data.lecture_unit_dto import LectureUnitDTO
from app.domain.status.stage_dto import StageDTO


class IngestionPipelineExecutionDto(PipelineExecutionDTO):
lecture_units: List[LectureUnitDTO] = Field(
..., alias="pyrisLectureUnitWebhookDTOS"
lecture_unit: LectureUnitDTO = Field(..., alias="pyrisLectureUnit")
settings: Optional[PipelineExecutionSettingsDTO]
initial_stages: Optional[List[StageDTO]] = Field(
default=None, alias="initialStages"
)
yassinsws marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions app/domain/ingestion/ingestion_status_update_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@

class IngestionStatusUpdateDTO(StatusUpdateDTO):
result: Optional[str] = None
id: Optional[int] = None
2 changes: 2 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from app.web.routers.health import router as health_router
from app.web.routers.pipelines import router as pipelines_router
from app.web.routers.webhooks import router as webhooks_router
from app.web.routers.ingestion_status import router as ingestion_status_router

import logging
from fastapi import FastAPI, Request, status
Expand Down Expand Up @@ -57,3 +58,4 @@ async def some_middleware(request: Request, call_next):
app.include_router(health_router)
app.include_router(pipelines_router)
app.include_router(webhooks_router)
app.include_router(ingestion_status_router)
51 changes: 28 additions & 23 deletions app/pipeline/lecture_ingestion_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import tempfile
import threading
from asyncio.log import logger
from typing import Optional

import fitz
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
Expand All @@ -27,9 +29,10 @@
CapabilityRequestHandler,
RequirementList,
)
from ..web.status import IngestionStatusCallback
from langchain_text_splitters import RecursiveCharacterTextSplitter

from ..web.status import ingestion_status_callback

Comment on lines +35 to +36
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Clarify the type annotation for callback parameter

The callback parameter is annotated with ingestion_status_callback, which seems to be a function rather than a type. If it's intended to be a type hint for a callable, consider using Callable from the typing module.

For example:

-from ..web.status import ingestion_status_callback
+from typing import Callable

...

     def __init__(
         self,
         client: WeaviateClient,
         dto: Optional[IngestionPipelineExecutionDto],
-        callback: ingestion_status_callback,
+        callback: Callable,
     ):

Alternatively, if ingestion_status_callback is a custom type, ensure it follows naming conventions (e.g., IngestionStatusCallback).

Also applies to: 97-98

batch_update_lock = threading.Lock()


Expand Down Expand Up @@ -90,8 +93,8 @@ class LectureIngestionPipeline(AbstractIngestion, Pipeline):
def __init__(
self,
client: WeaviateClient,
dto: IngestionPipelineExecutionDto,
callback: IngestionStatusCallback,
dto: Optional[IngestionPipelineExecutionDto],
callback: ingestion_status_callback,
Comment on lines +97 to +98
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle None value for optional dto parameter

Since dto is now optional, ensure that all usages of self.dto in the class handle the case where self.dto is None to prevent potential AttributeErrors.

):
super().__init__()
self.collection = init_lecture_schema(client)
Expand All @@ -116,33 +119,31 @@ def __init__(
def __call__(self) -> bool:
try:
self.callback.in_progress("Deleting old slides from database...")
self.delete_old_lectures()
self.delete_lecture_unit(
self.dto.lecture_unit.course_id,
self.dto.lecture_unit.lecture_id,
self.dto.lecture_unit.lecture_unit_id,
self.dto.settings.artemis_base_url,
)
yassinsws marked this conversation as resolved.
Show resolved Hide resolved
self.callback.done("Old slides removed")
# Here we check if the operation is for updating or for deleting,
# we only check the first file because all the files will have the same operation
if not self.dto.lecture_units[0].to_update:
self.callback.skip("Lecture Chunking and interpretation Skipped")
self.callback.skip("No new slides to update")
return True
self.callback.in_progress("Chunking and interpreting lecture...")
chunks = []
for i, lecture_unit in enumerate(self.dto.lecture_units):
pdf_path = save_pdf(lecture_unit.pdf_file_base64)
chunks.extend(
self.chunk_data(
lecture_pdf=pdf_path,
lecture_unit_dto=lecture_unit,
base_url=self.dto.settings.artemis_base_url,
)
pdf_path = save_pdf(self.dto.lecture_unit.pdf_file_base64)
chunks.extend(
self.chunk_data(
lecture_pdf=pdf_path,
lecture_unit_dto=self.dto.lecture_unit,
base_url=self.dto.settings.artemis_base_url,
)
cleanup_temporary_file(pdf_path)
)
cleanup_temporary_file(pdf_path)
Comment on lines +133 to +141
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure temporary files are cleaned up even on exceptions

If an exception occurs before cleanup_temporary_file(pdf_path) is called, the temporary file may remain on the system. To guarantee cleanup regardless of success or failure, consider using a finally block.

Apply this diff to modify the code:

         pdf_path = save_pdf(self.dto.lecture_unit.pdf_file_base64)
-        chunks.extend(
-            self.chunk_data(
-                lecture_pdf=pdf_path,
-                lecture_unit_dto=self.dto.lecture_unit,
-                base_url=self.dto.settings.artemis_base_url,
-            )
-        )
-        cleanup_temporary_file(pdf_path)
+        try:
+            chunks.extend(
+                self.chunk_data(
+                    lecture_pdf=pdf_path,
+                    lecture_unit_dto=self.dto.lecture_unit,
+                    base_url=self.dto.settings.artemis_base_url,
+                )
+            )
+        finally:
+            cleanup_temporary_file(pdf_path)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pdf_path = save_pdf(self.dto.lecture_unit.pdf_file_base64)
chunks.extend(
self.chunk_data(
lecture_pdf=pdf_path,
lecture_unit_dto=self.dto.lecture_unit,
base_url=self.dto.settings.artemis_base_url,
)
cleanup_temporary_file(pdf_path)
)
cleanup_temporary_file(pdf_path)
pdf_path = save_pdf(self.dto.lecture_unit.pdf_file_base64)
try:
chunks.extend(
self.chunk_data(
lecture_pdf=pdf_path,
lecture_unit_dto=self.dto.lecture_unit,
base_url=self.dto.settings.artemis_base_url,
)
)
finally:
cleanup_temporary_file(pdf_path)

self.callback.done("Lecture Chunking and interpretation Finished")
self.callback.in_progress("Ingesting lecture chunks into database...")
self.batch_update(chunks)
self.callback.done("Lecture Ingestion Finished")
logger.info(
f"Lecture ingestion pipeline finished Successfully for course "
f"{self.dto.lecture_units[0].course_name}"
f"{self.dto.lecture_unit.course_name}"
)
return True
except Exception as e:
Expand Down Expand Up @@ -294,23 +295,27 @@ def get_course_language(self, page_content: str) -> str:
)
return response.contents[0].text_content

def delete_old_lectures(self):
def delete_old_lectures(
self, lecture_units: list[LectureUnitDTO], artemis_base_url: str
):
"""
Delete the lecture unit from the database
"""
try:
for lecture_unit in self.dto.lecture_units:
for lecture_unit in lecture_units:
if self.delete_lecture_unit(
lecture_unit.course_id,
lecture_unit.lecture_id,
lecture_unit.lecture_unit_id,
self.dto.settings.artemis_base_url,
artemis_base_url,
):
logger.info("Lecture deleted successfully")
else:
logger.error("Failed to delete lecture")
self.callback.done("Old slides removed")
except Exception as e:
logger.error(f"Error deleting lecture unit: {e}")
self.callback.error("Error while removing old slides")
return False

def delete_lecture_unit(self, course_id, lecture_id, lecture_unit_id, base_url):
Expand Down
46 changes: 46 additions & 0 deletions app/web/routers/ingestion_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from urllib.parse import unquote

from fastapi import APIRouter, status, Response, Depends
from weaviate.collections.classes.filters import Filter

from app.dependencies import TokenValidator
from ...vector_database.database import VectorDatabase
from ...vector_database.lecture_schema import LectureSchema

router = APIRouter(prefix="/api/v1", tags=["ingestion_status"])


@router.get(
"/courses/{course_id}/lectures/{lecture_id}/lectureUnits/{lecture_unit_id}/ingestion-state",
dependencies=[Depends(TokenValidator())],
)
def get_lecture_unit_ingestion_state(
course_id: int, lecture_id: int, lecture_unit_id: int, base_url: str
):
yassinsws marked this conversation as resolved.
Show resolved Hide resolved
db = VectorDatabase()
decoded_base_url = unquote(base_url)
result = db.lectures.query.fetch_objects(
filters=(
Filter.by_property(LectureSchema.BASE_URL.value).equal(decoded_base_url)
& Filter.by_property(LectureSchema.COURSE_ID.value).equal(course_id)
& Filter.by_property(LectureSchema.LECTURE_ID.value).equal(lecture_id)
& Filter.by_property(LectureSchema.LECTURE_UNIT_ID.value).equal(
lecture_unit_id
)
),
limit=1,
return_properties=[LectureSchema.LECTURE_UNIT_NAME.value],
)
yassinsws marked this conversation as resolved.
Show resolved Hide resolved

if len(result.objects) > 0:
return Response(
status_code=status.HTTP_200_OK,
content='"DONE"',
yassinsws marked this conversation as resolved.
Show resolved Hide resolved
media_type="application/json",
)
yassinsws marked this conversation as resolved.
Show resolved Hide resolved
else:
return Response(
status_code=status.HTTP_200_OK,
content='"NOT_STARTED"',
media_type="application/json",
)
42 changes: 39 additions & 3 deletions app/web/routers/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
from app.domain.ingestion.ingestion_pipeline_execution_dto import (
IngestionPipelineExecutionDto,
)
from ..status.IngestionStatusCallback import IngestionStatusCallback
from ..status.ingestion_status_callback import IngestionStatusCallback
from ..status.lecture_deletion_status_callback import LecturesDeletionStatusCallback
from ...domain.ingestion.deletionPipelineExecutionDto import (
LecturesDeletionExecutionDto,
)
from ...pipeline.lecture_ingestion_pipeline import LectureIngestionPipeline
from ...vector_database.database import VectorDatabase

router = APIRouter(prefix="/api/v1/webhooks", tags=["webhooks"])


semaphore = Semaphore(5)


Expand All @@ -29,6 +32,7 @@ def run_lecture_update_pipeline_worker(dto: IngestionPipelineExecutionDto):
run_id=dto.settings.authentication_token,
base_url=dto.settings.artemis_base_url,
initial_stages=dto.initial_stages,
lecture_unit_id=dto.lecture_unit.lecture_unit_id,
)
db = VectorDatabase()
client = db.get_client()
Expand All @@ -44,14 +48,46 @@ def run_lecture_update_pipeline_worker(dto: IngestionPipelineExecutionDto):
semaphore.release()


def run_lecture_deletion_pipeline_worker(dto: LecturesDeletionExecutionDto):
"""
Run the exercise chat pipeline in a separate thread
"""
try:
callback = LecturesDeletionStatusCallback(
run_id=dto.settings.authentication_token,
base_url=dto.settings.artemis_base_url,
initial_stages=dto.initial_stages,
)
db = VectorDatabase()
client = db.get_client()
pipeline = LectureIngestionPipeline(client=client, dto=None, callback=callback)
pipeline.delete_old_lectures(dto.lecture_units)
except Exception as e:
logger.error(f"Error while deleting lectures: {e}")
logger.error(traceback.format_exc())


@router.post(
"/lectures/fullIngestion",
status_code=status.HTTP_202_ACCEPTED,
dependencies=[Depends(TokenValidator())],
)
def lecture_webhook(dto: IngestionPipelineExecutionDto):
def lecture_ingestion_webhook(dto: IngestionPipelineExecutionDto):
"""
Webhook endpoint to trigger the exercise chat pipeline
"""
thread = Thread(target=run_lecture_update_pipeline_worker, args=(dto,))
thread.start()


@router.post(
"/lectures/delete",
status_code=status.HTTP_202_ACCEPTED,
dependencies=[Depends(TokenValidator())],
)
def lecture_deletion_webhook(dto: LecturesDeletionExecutionDto):
"""
Webhook endpoint to trigger the exercise chat pipeline
"""
thread = Thread(target=run_lecture_deletion_pipeline_worker, args=(dto,))
thread.start()
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@

class IngestionStatusCallback(StatusCallback):
"""
Callback class for updating the status of a Tutor Chat pipeline run.
Callback class for updating the status of a Lecture ingestion Pipeline run.
"""

def __init__(
self, run_id: str, base_url: str, initial_stages: List[StageDTO] = None
self,
run_id: str,
base_url: str,
initial_stages: List[StageDTO] = None,
lecture_unit_id: int = None,
):
url = f"{base_url}/api/public/pyris/webhooks/ingestion/runs/{run_id}/status"

Expand All @@ -36,6 +40,6 @@ def __init__(
name="Slides ingestion",
),
]
status = IngestionStatusUpdateDTO(stages=stages)
status = IngestionStatusUpdateDTO(stages=stages, id=lecture_unit_id)
stage = stages[current_stage_index]
super().__init__(url, run_id, status, stage, current_stage_index)
31 changes: 31 additions & 0 deletions app/web/status/lecture_deletion_status_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import List

from .status_update import StatusCallback
from ...domain.ingestion.ingestion_status_update_dto import IngestionStatusUpdateDTO
from ...domain.status.stage_state_dto import StageStateEnum
from ...domain.status.stage_dto import StageDTO
import logging

logger = logging.getLogger(__name__)


class LecturesDeletionStatusCallback(StatusCallback):
"""
Callback class for updating the status of a Tutor Chat pipeline run.
"""

def __init__(
self, run_id: str, base_url: str, initial_stages: List[StageDTO] = None
):
url = f"{base_url}/api/public/pyris/webhooks/ingestion/runs/{run_id}/status"

current_stage_index = len(initial_stages) if initial_stages else 0
stages = initial_stages or []
stages += [
StageDTO(
weight=100, state=StageStateEnum.NOT_STARTED, name="Slides removal"
),
]
status = IngestionStatusUpdateDTO(stages=stages)
stage = stages[current_stage_index]
super().__init__(url, run_id, status, stage, current_stage_index)
Empty file.
Loading