Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade upload process #70

Merged
merged 20 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
1d89deb
Upgrade qdrant-client version to 1.10.1 to fix bug when using prefer_…
StreetLamb Jul 9, 2024
ac8f36d
Set prefer_grpc=true in QdrantClient for better performance
StreetLamb Jul 9, 2024
11aeceb
Upgrade langchain dependencies
StreetLamb Jul 9, 2024
91880b9
Use grpc port for QDRANT_URL and add FASTEMBED_CACHE_PATH.
StreetLamb Jul 11, 2024
44feb54
Set default MAX_WORKERS to 1 in .env.example
StreetLamb Jul 11, 2024
886f6ae
Add celery and redis
StreetLamb Jul 12, 2024
d5fa5e5
Update Dockerfile to create non-root user for celery worker. Update h…
StreetLamb Jul 12, 2024
9262e60
Add docker-compose configs for celery and redis
StreetLamb Jul 12, 2024
4aea69d
Update upload routes to trigger celery worker to process and upload f…
StreetLamb Jul 12, 2024
84bd0ce
Switch back to pymupdf as pymupdf4llm is cpu and memory intensive
StreetLamb Jul 12, 2024
70ac5d8
Dont show success modal for adding and deleting upload in frontend
StreetLamb Jul 12, 2024
8c89bba
Update build images command in local-deployment guide
StreetLamb Jul 12, 2024
190c1d9
Set MAX_WORKERS as optional in .env.example
StreetLamb Jul 12, 2024
2b8f421
Add celery stubs
StreetLamb Jul 12, 2024
e5b780a
Uninstall pymupdf4llm and install pymupdf
StreetLamb Jul 12, 2024
9ad6528
Fix mypy issues
StreetLamb Jul 12, 2024
3a5bcf7
Refactor
StreetLamb Jul 12, 2024
63d9efb
Update upload tests
StreetLamb Jul 12, 2024
c06eace
Set MAX_WORKERS default to 1
StreetLamb Jul 12, 2024
7365661
Fix embedding models and upload permission issues
StreetLamb Jul 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ [email protected]
FIRST_SUPERUSER_PASSWORD=changethis
USERS_OPEN_REGISTRATION=False
MAX_UPLOAD_SIZE=50_000_000
MAX_WORKERS=4
MAX_WORKERS=1 # Sets the number of processes

# llm provider keys. Add only to models that you want to use
OPENAI_API_KEY=
Expand Down
12 changes: 12 additions & 0 deletions backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,15 @@ COPY ./prestart.sh /app/
COPY ./tests-start.sh /app/

COPY ./app /app/app

# Create a non-root user and group
RUN addgroup --system celery && adduser --system --ingroup celery celery

# Create necessary directories
RUN mkdir -p /app/.cache/huggingface /app/cache/ /app/upload-data/

# Set HuggingFace cache directory environment variable
ENV HF_HOME=/app/.cache/huggingface

# Set the ownership and permissions for the /app directory and its contents
RUN chown -R celery:celery /app
102 changes: 36 additions & 66 deletions backend/app/api/routes/uploads.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from collections.abc import Callable
import os
import shutil
import uuid
from datetime import datetime
from tempfile import NamedTemporaryFile
from typing import IO, Annotated, Any

from fastapi import (
APIRouter,
BackgroundTasks,
Depends,
File,
Form,
Expand All @@ -19,7 +20,6 @@

from app.api.deps import CurrentUser, SessionDep
from app.core.config import settings
from app.core.graph.rag.qdrant import QdrantStore
from app.models import (
Message,
Upload,
Expand All @@ -29,11 +29,10 @@
UploadStatus,
UploadUpdate,
)
from app.tasks.tasks import add_upload, edit_upload, remove_upload

router = APIRouter()

qdrant_store = QdrantStore()


async def valid_content_length(
content_length: int = Header(..., le=settings.MAX_UPLOAD_SIZE),
Expand Down Expand Up @@ -73,36 +72,22 @@ def save_file_if_within_size_limit(file: UploadFile, file_size: int) -> IO[bytes
return temp


def process_add(
file_path: str,
upload_id: int,
user_id: int,
chunk_size: int,
chunk_overlap: int,
update_status_callback: Callable[[UploadStatus], None],
) -> None:
try:
qdrant_store.add(file_path, upload_id, user_id, chunk_size, chunk_overlap)
update_status_callback(UploadStatus.COMPLETED)
except Exception as e:
update_status_callback(UploadStatus.FAILED)
raise e
def move_upload_to_shared_folder(filename: str, temp_file_dir: str) -> str:
"""
Move an uploaded file to a shared folder with a unique name and set its permissions.

Args:
filename (str): The original name of the uploaded file.
temp_file_dir (str): The directory of the temporary file.

def process_update(
file_path: str,
upload_id: int,
user_id: int,
chunk_size: int,
chunk_overlap: int,
update_status_callback: Callable[[UploadStatus], None],
) -> None:
try:
qdrant_store.update(file_path, upload_id, user_id, chunk_size, chunk_overlap)
update_status_callback(UploadStatus.COMPLETED)
except Exception as e:
update_status_callback(UploadStatus.FAILED)
raise e
Returns:
str: The new file path in the shared folder.
"""
file_name = f"{uuid.uuid4()}-{filename}"
file_path = f"/app/upload-data/{file_name}"
shutil.move(temp_file_dir, file_path)
os.chmod(file_path, 0o775)
return file_path


@router.get("/", response_model=UploadsOut)
Expand Down Expand Up @@ -136,7 +121,6 @@ def read_uploads(
@router.post("/", response_model=UploadOut)
def create_upload(
session: SessionDep,
background_tasks: BackgroundTasks,
current_user: CurrentUser,
name: Annotated[str, Form()],
description: Annotated[str, Form()],
Expand Down Expand Up @@ -164,19 +148,12 @@ def create_upload(
status_code=500, detail="Failed to retrieve user and upload ID"
)

def update_status_callback(status: UploadStatus) -> None:
upload.status = status
session.add(upload)
session.commit()

background_tasks.add_task(
process_add,
temp_file.name,
upload.id,
current_user.id,
chunk_size,
chunk_overlap,
update_status_callback,
if not file.filename or not isinstance(temp_file.name, str):
raise HTTPException(status_code=500, detail="Failed to upload file")

file_path = move_upload_to_shared_folder(file.filename, temp_file.name)
add_upload.delay(
file_path, upload.id, current_user.id, chunk_size, chunk_overlap
)
except Exception as e:
session.delete(upload)
Expand All @@ -189,7 +166,6 @@ def update_status_callback(status: UploadStatus) -> None:
@router.put("/{id}", response_model=UploadOut)
def update_upload(
session: SessionDep,
background_tasks: BackgroundTasks,
current_user: CurrentUser,
id: int,
name: str | None = Form(None),
Expand Down Expand Up @@ -234,20 +210,11 @@ def update_upload(
session.add(upload)
session.commit()

def update_status_callback(status: UploadStatus) -> None:
upload.status = status
session.add(upload)
session.commit()

background_tasks.add_task(
process_update,
temp_file.name,
id,
upload.owner_id,
chunk_size,
chunk_overlap,
update_status_callback,
)
if not file.filename or not isinstance(temp_file.name, str):
raise HTTPException(status_code=500, detail="Failed to upload file")

file_path = move_upload_to_shared_folder(file.filename, temp_file.name)
edit_upload.delay(file_path, id, upload.owner_id, chunk_size, chunk_overlap)

session.commit()
session.refresh(upload)
Expand All @@ -261,13 +228,16 @@ def delete_upload(session: SessionDep, current_user: CurrentUser, id: int) -> Me
raise HTTPException(status_code=404, detail="Upload not found")
if not current_user.is_superuser and upload.owner_id != current_user.id:
raise HTTPException(status_code=403, detail="Not enough permissions")

try:
session.delete(upload)
# Set upload status to in progress
upload.status = UploadStatus.IN_PROGRESS
session.add(upload)
session.commit()

if upload.owner_id is None:
raise HTTPException(status_code=500, detail="Failed to retrieve owner ID")
qdrant_store.delete(id, upload.owner_id)
session.commit()

remove_upload.delay(id, upload.owner_id)
except Exception as e:
session.rollback()
raise HTTPException(status_code=500, detail="Failed to delete upload") from e
Expand Down
14 changes: 14 additions & 0 deletions backend/app/core/celery_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from celery import Celery

from app.core.config import settings

# Celery application instance. The broker and result-backend URLs come from
# the project settings, and the task module is listed in `include` so that it
# is imported when the app is set up.
celery_app = Celery(
    "worker",
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND,
    include=["app.tasks.tasks"],
)

# Expire stored task results after 3600 seconds (one hour).
celery_app.conf.result_expires = 3600
7 changes: 6 additions & 1 deletion backend/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,17 @@ def _enforce_non_default_secrets(self) -> Self:

# Qdrant
QDRANT__SERVICE__API_KEY: str
QDRANT_URL: str = "http://qdrant:6333"
QDRANT_URL: str = "http://qdrant:6334"
QDRANT_COLLECTION: str = "uploads"

# Celery
CELERY_BROKER_URL: str
CELERY_RESULT_BACKEND: str

# Embeddings
DENSE_EMBEDDING_MODEL: str
SPARSE_EMBEDDING_MODEL: str
FASTEMBED_CACHE_PATH: str

MAX_UPLOAD_SIZE: int = 50_000_000

Expand Down
27 changes: 17 additions & 10 deletions backend/app/core/graph/rag/qdrant.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from collections.abc import Callable
from typing import Any

import pymupdf4llm # type: ignore[import-untyped]
import pymupdf # type: ignore[import-untyped]
from langchain_core.documents import Document
from langchain_text_splitters import MarkdownTextSplitter
from langchain_text_splitters import RecursiveCharacterTextSplitter
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest

Expand Down Expand Up @@ -40,14 +40,19 @@ def add(
chunk_size (int, optional): The size of each text chunk. Defaults to 500.
chunk_overlap (int, optional): The overlap size between chunks. Defaults to 50.
"""
md_text = pymupdf4llm.to_markdown(file_path)
text_spliter = MarkdownTextSplitter(
chunk_size=chunk_size, chunk_overlap=chunk_overlap
)
docs = text_spliter.create_documents(
[md_text],
[{"user_id": user_id, "upload_id": upload_id}],
doc = pymupdf.open(file_path)
documents = [
Document(
page_content=page.get_text().encode("utf8"),
metadata={"user_id": user_id, "upload_id": upload_id},
)
for page in doc
]
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
)
docs = text_splitter.split_documents(documents)

doc_texts: list[str] = []
metadata: list[dict[Any, Any]] = []
Expand All @@ -72,7 +77,9 @@ def _create_collection(self) -> QdrantClient:
Returns:
QdrantClient: An instance of the Qdrant client.
"""
client = QdrantClient(url=self.url, api_key=settings.QDRANT__SERVICE__API_KEY)
client = QdrantClient(
url=self.url, api_key=settings.QDRANT__SERVICE__API_KEY, prefer_grpc=True
)
client.set_model(settings.DENSE_EMBEDDING_MODEL)
client.set_sparse_model(settings.SPARSE_EMBEDDING_MODEL)
if not client.collection_exists(self.collection_name):
Expand Down
70 changes: 70 additions & 0 deletions backend/app/tasks/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import logging
import os

from sqlmodel import Session

from app.core.celery_app import celery_app
from app.core.db import engine
from app.core.graph.rag.qdrant import QdrantStore
from app.models import Upload, UploadStatus


@celery_app.task
def add_upload(
    file_path: str, upload_id: int, user_id: int, chunk_size: int, chunk_overlap: int
) -> None:
    """Index an uploaded file through ``QdrantStore.add`` and record the outcome.

    The upload row is marked ``COMPLETED`` on success and ``FAILED`` if
    indexing (or the success commit) raises. The file at ``file_path`` is
    removed in every case, including when the upload row cannot be found.

    Args:
        file_path: Path of the file to index; deleted before the task returns.
        upload_id: Primary key of the ``Upload`` row to update.
        user_id: Owner id forwarded to ``QdrantStore.add``.
        chunk_size: Chunk size forwarded to ``QdrantStore.add``.
        chunk_overlap: Chunk overlap forwarded to ``QdrantStore.add``.

    Raises:
        ValueError: If no ``Upload`` row exists for ``upload_id``.
    """
    log = logging.getLogger(__name__)
    try:
        with Session(engine) as session:
            upload = session.get(Upload, upload_id)
            if not upload:
                raise ValueError("Upload not found")
            try:
                QdrantStore().add(
                    file_path, upload_id, user_id, chunk_size, chunk_overlap
                )
                upload.status = UploadStatus.COMPLETED
                session.add(upload)
                session.commit()
            except Exception as e:
                # Keep the traceback; the task itself stays best-effort and
                # does not re-raise, matching the previous behaviour.
                log.exception("add_upload failed: %s", e)
                # A failed flush/commit leaves the session unusable until it
                # is rolled back, so reset it before recording the failure.
                session.rollback()
                upload.status = UploadStatus.FAILED
                session.add(upload)
                session.commit()
    finally:
        # Runs for every exit path (including "Upload not found") so the
        # file never lingers on disk.
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass


@celery_app.task
def edit_upload(
    file_path: str, upload_id: int, user_id: int, chunk_size: int, chunk_overlap: int
) -> None:
    """Re-index an upload through ``QdrantStore.update`` and record the outcome.

    The upload row is marked ``COMPLETED`` on success and ``FAILED`` if the
    update (or the success commit) raises. The file at ``file_path`` is
    removed in every case, including when the upload row cannot be found.

    Args:
        file_path: Path of the replacement file; deleted before the task returns.
        upload_id: Primary key of the ``Upload`` row to update.
        user_id: Owner id forwarded to ``QdrantStore.update``.
        chunk_size: Chunk size forwarded to ``QdrantStore.update``.
        chunk_overlap: Chunk overlap forwarded to ``QdrantStore.update``.

    Raises:
        ValueError: If no ``Upload`` row exists for ``upload_id``.
    """
    log = logging.getLogger(__name__)
    try:
        with Session(engine) as session:
            upload = session.get(Upload, upload_id)
            if not upload:
                raise ValueError("Upload not found")
            try:
                QdrantStore().update(
                    file_path, upload_id, user_id, chunk_size, chunk_overlap
                )
                upload.status = UploadStatus.COMPLETED
                session.add(upload)
                session.commit()
            except Exception as e:
                # Keep the traceback; the task itself stays best-effort and
                # does not re-raise, matching the previous behaviour.
                log.exception("edit_upload failed: %s", e)
                # A failed flush/commit leaves the session unusable until it
                # is rolled back, so reset it before recording the failure.
                session.rollback()
                upload.status = UploadStatus.FAILED
                session.add(upload)
                session.commit()
    finally:
        # Runs for every exit path (including "Upload not found") so the
        # file never lingers on disk.
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass


@celery_app.task
def remove_upload(upload_id: int, user_id: int) -> None:
    """Delete an upload's vectors through ``QdrantStore.delete``, then its row.

    If the vector-store delete or the database delete fails, the pending
    database changes are rolled back and the row is marked ``FAILED`` so it
    does not stay in its previous status indefinitely.

    Args:
        upload_id: Primary key of the ``Upload`` row to remove.
        user_id: Owner id forwarded to ``QdrantStore.delete``.

    Raises:
        ValueError: If no ``Upload`` row exists for ``upload_id``.
    """
    log = logging.getLogger(__name__)
    with Session(engine) as session:
        upload = session.get(Upload, upload_id)
        if not upload:
            raise ValueError("Upload not found")
        try:
            QdrantStore().delete(upload_id, user_id)
            session.delete(upload)
            session.commit()
        except Exception as e:
            # Keep the traceback; the task stays best-effort and does not
            # re-raise, matching the previous behaviour.
            log.exception("remove_upload failed: %s", e)
            # Undo the pending delete (and reset a possibly failed session)
            # before recording the failure on the still-existing row.
            session.rollback()
            upload.status = UploadStatus.FAILED
            session.add(upload)
            session.commit()
Loading