Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use background tasks for processing uploads #60

Merged
merged 6 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Add status col to uploads table

Revision ID: eab5bf7ec514
Revises: 45e43cb617f2
Create Date: 2024-06-26 14:00:46.079456

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'eab5bf7ec514'
down_revision = '45e43cb617f2'
branch_labels = None
depends_on = None

upload_status_enum = sa.Enum('IN_PROGRESS', 'COMPLETED', name='uploadstatus')

def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
upload_status_enum.create(op.get_bind(), checkfirst=True)

op.add_column('upload', sa.Column('status', sa.Enum('IN_PROGRESS', 'COMPLETED', name='uploadstatus'), nullable=False, server_default='COMPLETED'))
op.alter_column('upload', 'status', server_default=None)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('upload', 'status')

upload_status_enum.drop(op.get_bind(), checkfirst=True)
# ### end Alembic commands ###
85 changes: 61 additions & 24 deletions backend/app/api/routes/uploads.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,18 @@
from tempfile import NamedTemporaryFile
from typing import IO, Annotated, Any

from fastapi import APIRouter, Depends, File, Form, Header, HTTPException, UploadFile
from sqlmodel import func, select
from fastapi import (
APIRouter,
BackgroundTasks,
Depends,
File,
Form,
Header,
HTTPException,
UploadFile,
)
from sqlalchemy import ColumnElement
from sqlmodel import and_, func, select
from starlette import status

from app.api.deps import CurrentUser, SessionDep
Expand All @@ -15,6 +25,7 @@
UploadCreate,
UploadOut,
UploadsOut,
UploadStatus,
UploadUpdate,
)

Expand Down Expand Up @@ -59,28 +70,34 @@ def save_file_if_within_size_limit(file: UploadFile, file_size: int) -> IO[bytes
return temp


def update_upload_status(session: SessionDep, upload: Upload) -> None:
"""Set upload status to completed"""
upload.status = UploadStatus.COMPLETED
session.add(upload)
session.commit()


@router.get("/", response_model=UploadsOut)
def read_uploads(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
session: SessionDep,
current_user: CurrentUser,
status: UploadStatus | None = None,
skip: int = 0,
limit: int = 100,
) -> Any:
"""
Retrieve uploads.
"""
if current_user.is_superuser:
count_statement = select(func.count()).select_from(Upload)
statement = select(Upload).offset(skip).limit(limit)
else:
count_statement = (
select(func.count())
.select_from(Upload)
.where(Upload.owner_id == current_user.id)
)
statement = (
select(Upload)
.where(Upload.owner_id == current_user.id)
.offset(skip)
.limit(limit)
)
filters = []
if status:
filters.append(Upload.status == status)
if not current_user.is_superuser:
filters.append(Upload.owner_id == current_user.id)

filter_conditions: ColumnElement[bool] | bool = and_(*filters) if filters else True

count_statement = select(func.count()).select_from(Upload).where(filter_conditions)
statement = select(Upload).where(filter_conditions).offset(skip).limit(limit)

count = session.exec(count_statement).one()
uploads = session.exec(statement).all()
Expand All @@ -91,6 +108,7 @@ def read_uploads(
@router.post("/", response_model=UploadOut)
def create_upload(
session: SessionDep,
background_tasks: BackgroundTasks,
current_user: CurrentUser,
name: Annotated[str, Form()],
description: Annotated[str, Form()],
Expand All @@ -106,7 +124,7 @@ def create_upload(
temp_file = save_file_if_within_size_limit(file, file_size)
upload = Upload.model_validate(
UploadCreate(name=name, description=description),
update={"owner_id": current_user.id},
update={"owner_id": current_user.id, "status": UploadStatus.IN_PROGRESS},
)
session.add(upload)
session.commit()
Expand All @@ -117,8 +135,15 @@ def create_upload(
raise HTTPException(
status_code=500, detail="Failed to retrieve user and upload ID"
)
QdrantStore().create(
temp_file.name, upload.id, current_user.id, chunk_size, chunk_overlap

background_tasks.add_task(
QdrantStore().create,
temp_file.name,
upload.id,
current_user.id,
chunk_size,
chunk_overlap,
lambda: update_upload_status(session, upload),
)
except Exception as e:
session.delete(upload)
Expand All @@ -131,6 +156,7 @@ def create_upload(
@router.put("/{id}", response_model=UploadOut)
def update_upload(
session: SessionDep,
background_tasks: BackgroundTasks,
current_user: CurrentUser,
id: int,
name: str | None = Form(None),
Expand Down Expand Up @@ -169,9 +195,20 @@ def update_upload(
status_code=400,
detail="If file is provided, chunk size and chunk overlap must be provided.",
)
QdrantStore().delete(id, upload.owner_id)
QdrantStore().create(
temp_file.name, id, upload.owner_id, chunk_size, chunk_overlap

# Set upload status to in progress
upload.status = UploadStatus.IN_PROGRESS
session.add(upload)
session.commit()

background_tasks.add_task(
QdrantStore().update,
temp_file.name,
id,
upload.owner_id,
chunk_size,
chunk_overlap,
lambda: update_upload_status(session, upload),
)

session.commit()
Expand Down
18 changes: 18 additions & 0 deletions backend/app/core/graph/rag/qdrant.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from collections.abc import Callable

import pymupdf4llm # type: ignore[import-untyped]
from langchain_community.embeddings.fastembed import FastEmbedEmbeddings
from langchain_core.documents import Document
Expand Down Expand Up @@ -25,6 +27,7 @@ def create(
user_id: int,
chunk_size: int = 500,
chunk_overlap: int = 50,
callback: Callable[[], None] | None = None,
) -> None:
"""
Uploads a PDF document to the Qdrant vector store after converting it to markdown and splitting into chunks.
Expand All @@ -51,6 +54,7 @@ def create(
collection_name=self.collection_name,
api_key=settings.QDRANT__SERVICE__API_KEY,
)
callback() if callback else None

def _get_collection(self) -> Qdrant:
"""Get instance of an existing Qdrant collection."""
Expand Down Expand Up @@ -80,6 +84,20 @@ def delete(self, upload_id: int, user_id: int) -> bool | None:
)
)

def update(
self,
file_path: str,
upload_id: int,
user_id: int,
chunk_size: int = 500,
chunk_overlap: int = 50,
callback: Callable[[], None] | None = None,
) -> None:
"""Delete and re-upload the new PDF document to the Qdrant vector store"""
self.delete(user_id, upload_id)
self.create(file_path, upload_id, user_id, chunk_size, chunk_overlap)
callback() if callback else None

def retriever(self, user_id: int, upload_id: int) -> VectorStoreRetriever:
"""
Creates a VectorStoreRetriever that retrieves results containing the specified user_id and upload_id in the metadata.
Expand Down
10 changes: 10 additions & 0 deletions backend/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
UniqueConstraint,
func,
)
from sqlalchemy import (
Enum as SQLEnum,
)
from sqlmodel import Field, Relationship, SQLModel


Expand Down Expand Up @@ -393,6 +396,11 @@ class UploadUpdate(UploadBase):
last_modified: datetime


class UploadStatus(str, Enum):
IN_PROGRESS = "In Progress"
COMPLETED = "Completed"


class Upload(UploadBase, table=True):
id: int | None = Field(default=None, primary_key=True)
owner_id: int | None = Field(default=None, foreign_key="user.id", nullable=False)
Expand All @@ -402,12 +410,14 @@ class Upload(UploadBase, table=True):
link_model=MemberUploadsLink,
)
last_modified: datetime = Field(default_factory=lambda: datetime.now())
status: UploadStatus = Field(sa_column=Column(SQLEnum(UploadStatus)))


class UploadOut(UploadBase):
id: int
name: str
last_modified: datetime
status: UploadStatus


class UploadsOut(SQLModel):
Expand Down
29 changes: 27 additions & 2 deletions backend/app/tests/api/routes/test_uploads.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from app.core.config import settings
from app.core.graph.rag.qdrant import QdrantStore
from app.models import Upload, UploadCreate
from app.models import Upload, UploadCreate, UploadStatus
from app.tests.utils.utils import random_lower_string


Expand All @@ -17,7 +17,11 @@ def create_upload(db: Session, user_id: int) -> Upload:
"owner_id": user_id,
}
upload = Upload.model_validate(
UploadCreate(**upload_data), update={"owner_id": user_id}
UploadCreate(**upload_data),
update={
"owner_id": user_id,
"status": UploadStatus.COMPLETED,
},
)
db.add(upload)
db.commit()
Expand All @@ -38,6 +42,27 @@ def test_read_uploads(
assert "data" in data


def test_read_uploads_status_filter(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
# Create in progress upload
upload = create_upload(db, 1)
upload.status = UploadStatus.IN_PROGRESS
db.add(upload)
db.commit()

response = client.get(
f"{settings.API_V1_STR}/uploads",
headers=superuser_token_headers,
params={"status": UploadStatus.IN_PROGRESS.value},
)
assert response.status_code == 200
data = response.json()
assert "count" in data
assert "data" in data
assert all(upload["status"] == UploadStatus.IN_PROGRESS for upload in data["data"])


def test_create_upload(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export type { UpdatePassword } from './models/UpdatePassword';
export type { Upload } from './models/Upload';
export type { UploadOut } from './models/UploadOut';
export type { UploadsOut } from './models/UploadsOut';
export type { UploadStatus } from './models/UploadStatus';
export type { UserCreate } from './models/UserCreate';
export type { UserCreateOpen } from './models/UserCreateOpen';
export type { UserOut } from './models/UserOut';
Expand Down Expand Up @@ -85,6 +86,7 @@ export { $UpdatePassword } from './schemas/$UpdatePassword';
export { $Upload } from './schemas/$Upload';
export { $UploadOut } from './schemas/$UploadOut';
export { $UploadsOut } from './schemas/$UploadsOut';
export { $UploadStatus } from './schemas/$UploadStatus';
export { $UserCreate } from './schemas/$UserCreate';
export { $UserCreateOpen } from './schemas/$UserCreateOpen';
export { $UserOut } from './schemas/$UserOut';
Expand Down
3 changes: 3 additions & 0 deletions frontend/src/client/models/Upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
/* tslint:disable */
/* eslint-disable */

import type { UploadStatus } from './UploadStatus';

export type Upload = {
name: string;
description: string;
id?: (number | null);
owner_id?: (number | null);
last_modified?: string;
status: UploadStatus;
};

3 changes: 3 additions & 0 deletions frontend/src/client/models/UploadOut.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
/* tslint:disable */
/* eslint-disable */

import type { UploadStatus } from './UploadStatus';

export type UploadOut = {
name: string;
description: string;
id: number;
last_modified: string;
status: UploadStatus;
};

6 changes: 6 additions & 0 deletions frontend/src/client/models/UploadStatus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/* generated using openapi-typescript-codegen -- do no edit */
/* istanbul ignore file */
/* tslint:disable */
/* eslint-disable */

export type UploadStatus = 'In Progress' | 'Completed';
4 changes: 4 additions & 0 deletions frontend/src/client/schemas/$Upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,9 @@ export const $Upload = {
type: 'string',
format: 'date-time',
},
status: {
type: 'UploadStatus',
isRequired: true,
},
},
} as const;
4 changes: 4 additions & 0 deletions frontend/src/client/schemas/$UploadOut.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,9 @@ export const $UploadOut = {
isRequired: true,
format: 'date-time',
},
status: {
type: 'UploadStatus',
isRequired: true,
},
},
} as const;
7 changes: 7 additions & 0 deletions frontend/src/client/schemas/$UploadStatus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/* generated using openapi-typescript-codegen -- do no edit */
/* istanbul ignore file */
/* tslint:disable */
/* eslint-disable */
export const $UploadStatus = {
type: 'Enum',
} as const;
Loading