
Commit 0bc3eca

Features: job status table with related DB model, migration files, and job status tracking.
Added a status wrapper to handle job status updates in the DB.

1 parent 2bd1ec5 commit 0bc3eca

File tree

19 files changed: +549 -25 lines
Lines changed: 35 additions & 0 deletions

@@ -0,0 +1,35 @@
"""Changed job_id in JobTable to be UUID instead of string

Revision ID: 00e5ef437a72
Revises: dd3dd875cd97
Create Date: 2026-01-25 21:32:44.515850

Phase: MIGRATE
"""

from collections.abc import Sequence

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "00e5ef437a72"  # pragma: allowlist secret
down_revision: str | None = "dd3dd875cd97"  # pragma: allowlist secret
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table("job", schema=None) as batch_op:
        batch_op.alter_column("job_id", existing_type=sa.VARCHAR(), type_=sa.Uuid(), existing_nullable=False)

    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table("job", schema=None) as batch_op:
        batch_op.alter_column("job_id", existing_type=sa.Uuid(), type_=sa.VARCHAR(), existing_nullable=False)

    # ### end Alembic commands ###
Lines changed: 35 additions & 0 deletions

@@ -0,0 +1,35 @@
"""Changed job_id in vertex_build table to be UUID instead of str.

Revision ID: dd3dd875cd97
Revises: f7a564a956a8
Create Date: 2026-01-25 20:23:02.795268

Phase: MIGRATE
"""

from collections.abc import Sequence

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "dd3dd875cd97"
down_revision: str | None = "f7a564a956a8"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table("vertex_build", schema=None) as batch_op:
        batch_op.alter_column("job_id", existing_type=sa.VARCHAR(), type_=sa.Uuid(), existing_nullable=True)

    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table("vertex_build", schema=None) as batch_op:
        batch_op.alter_column("job_id", existing_type=sa.Uuid(), type_=sa.VARCHAR(), existing_nullable=True)

    # ### end Alembic commands ###
Lines changed: 62 additions & 0 deletions

@@ -0,0 +1,62 @@
"""Add job_id to vertex_build, create job_id status model

Revision ID: f7a564a956a8
Revises: 182e5471b900
Create Date: 2026-01-25 19:07:45.358554

Phase: EXPAND
"""

from collections.abc import Sequence

import sqlalchemy as sa
import sqlmodel
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "f7a564a956a8"
down_revision: str | None = "182e5471b900"  # pragma: allowlist secret
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "job",
        sa.Column("job_id", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
        sa.Column("flow_id", sa.Uuid(), nullable=False),
        sa.Column(
            "status",
            sa.Enum("queued", "in_progress", "completed", "failed", "cancelled", "timed_out", name="job_status_enum"),
            nullable=False,
        ),
        sa.Column("created_timestamp", sa.DateTime(timezone=True), nullable=False),
        sa.Column("finished_timestamp", sa.DateTime(timezone=True), nullable=True),
        sa.PrimaryKeyConstraint("job_id"),
    )
    with op.batch_alter_table("job", schema=None) as batch_op:
        batch_op.create_index(batch_op.f("ix_job_flow_id"), ["flow_id"], unique=False)
        batch_op.create_index(batch_op.f("ix_job_job_id"), ["job_id"], unique=False)
        batch_op.create_index(batch_op.f("ix_job_status"), ["status"], unique=False)

    with op.batch_alter_table("vertex_build", schema=None) as batch_op:
        batch_op.add_column(sa.Column("job_id", sqlmodel.sql.sqltypes.AutoString(), nullable=True))
        batch_op.create_index(batch_op.f("ix_vertex_build_job_id"), ["job_id"], unique=False)

    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table("vertex_build", schema=None) as batch_op:
        batch_op.drop_index(batch_op.f("ix_vertex_build_job_id"))
        batch_op.drop_column("job_id")

    with op.batch_alter_table("job", schema=None) as batch_op:
        batch_op.drop_index(batch_op.f("ix_job_status"))
        batch_op.drop_index(batch_op.f("ix_job_job_id"))
        batch_op.drop_index(batch_op.f("ix_job_flow_id"))

    op.drop_table("job")
    # ### end Alembic commands ###
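
The `Phase:` markers split this work into an expand-and-contract migration chain: the EXPAND revision (f7a564a956a8) creates the `job` table and the nullable `vertex_build.job_id` column with string job IDs, and the two MIGRATE revisions (dd3dd875cd97, 00e5ef437a72) then tighten those columns to native UUIDs. A minimal sketch of applying the chain through Alembic's Python API follows; the `alembic.ini` location is an assumption for illustration, not part of this commit:

# Sketch only: apply the new revisions with Alembic's Python API.
# The alembic.ini path is an assumed placeholder.
from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")

# Walks 182e5471b900 -> f7a564a956a8 (EXPAND) -> dd3dd875cd97 -> 00e5ef437a72 (MIGRATE)
command.upgrade(cfg, "head")

# If the UUID conversion misbehaves, roll back only the MIGRATE revisions:
# command.downgrade(cfg, "f7a564a956a8")

Because `command.upgrade` follows the `down_revision` chain, the EXPAND step always lands before the type conversions, so existing rows are created under the permissive schema first.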

src/backend/base/langflow/api/v2/workflow.py

Lines changed: 89 additions & 24 deletions

@@ -24,7 +24,7 @@
 import asyncio
 from copy import deepcopy
 from typing import Annotated
-from uuid import uuid4
+from uuid import UUID, uuid4

 from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, Request, status
 from fastapi.responses import StreamingResponse
@@ -62,7 +62,7 @@
 from langflow.services.auth.utils import api_key_security
 from langflow.services.database.models.flow.model import FlowRead
 from langflow.services.database.models.user.model import UserRead
-from langflow.services.deps import get_task_service
+from langflow.services.deps import get_job_service, get_task_service

 # Configuration constants
 EXECUTION_TIMEOUT = 300  # 5 minutes default timeout for sync execution
@@ -371,10 +371,13 @@ async def execute_sync_workflow(
     terminal_node_ids = graph.get_terminal_nodes()

     # Execute graph - component errors are caught and returned in response body
+    job_service = get_job_service()
     try:
-        task_result, execution_session_id = await run_graph_internal(
+        task_result, execution_session_id = await job_service.with_status_updates(
+            job_id=UUID(job_id),
+            flow_id=UUID(flow_id_str),
+            run_graph_func=run_graph_internal,
             graph=graph,
-            flow_id=flow_id_str,
             session_id=session_id,
             inputs=None,
             outputs=terminal_node_ids,
@@ -444,10 +447,21 @@ async def execute_workflow_background(

     # Launch background task
     task_service = get_task_service()
+    job_service = get_job_service()
+
+    # Create job synchronously to ensure it exists before background task starts
+    # and so we can return a valid job status immediately
+    await job_service.create_job(
+        job_id=job_id,
+        flow_id=flow_id_str,
+    )
+
     await task_service.fire_and_forget_task(
-        run_graph_internal,
+        job_service.with_status_updates,
+        job_id=UUID(job_id),
+        flow_id=UUID(flow_id_str),
+        run_graph_func=run_graph_internal,
         graph=graph,
-        flow_id=flow_id_str,
         session_id=session_id,
         inputs=None,
         outputs=terminal_node_ids,
@@ -473,35 +487,86 @@ async def execute_workflow_background(
 )
 async def get_workflow_status(
     api_key_user: Annotated[UserRead, Depends(api_key_security)],  # noqa: ARG001
-    job_id: Annotated[str, Query(description="Job ID to query")],  # noqa: ARG001
-) -> WorkflowJobResponse:
-    """Get workflow job status and results by job ID.
+    flow_id: Annotated[UUID | None, Query(description="Flow ID to filter jobs")] = None,
+    job_id: Annotated[UUID | None, Query(description="Job ID to query")] = None,
+    page: Annotated[int, Query(description="Page number (1-indexed)", ge=1)] = 1,
+    page_size: Annotated[int, Query(description="Number of results per page", ge=1, le=100)] = 10,
+) -> list[WorkflowJobResponse]:
+    """Get workflow job status and results.

-    This endpoint allows clients to poll for the status of background workflow executions.
+    This endpoint allows clients to query job status either by:
+    - **flow_id**: Get all jobs for a specific flow (with pagination)
+    - **job_id**: Get a specific job by ID
+
+    At least one of flow_id or job_id must be provided.

     Args:
         api_key_user: Authenticated user from API key
-        job_id: The job ID returned from a background execution request
+        flow_id: Optional flow ID to filter jobs
+        job_id: Optional job ID to query specific job
+        page: Page number for pagination (default: 1)
+        page_size: Number of results per page (default: 10, max: 100)

     Returns:
-        - WorkflowExecutionResponse: If job is complete or failed
-        - StreamingResponse: If job is still running (for streaming mode)
+        List of WorkflowJobResponse objects

     Raises:
         HTTPException:
+            - 400: Neither flow_id nor job_id provided
             - 403: Developer API disabled or unauthorized
-            - 404: Job ID not found
-            - 501: Not yet implemented
-
-    Note:
-        This endpoint is not yet implemented. It will be added in a future release
-        to support background and streaming execution modes.
+            - 404: Job or flow not found
+            - 500: Internal server error
     """
-    # TODO: Implement job status tracking and retrieval
-    # - Store job metadata in database or cache
-    # - Track execution progress and status
-    # - Return appropriate response based on job state
-    raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Workflow status not yet available.")
+    # Validate that at least one filter is provided
+    if not flow_id and not job_id:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail={
+                "error": "Missing required parameter",
+                "code": "MISSING_PARAMETER",
+                "message": "Either flow_id or job_id must be provided",
+            },
+        )
+
+    try:
+        job_service = get_job_service()
+
+        if job_id:
+            # Query by specific job_id (returns single job)
+            # TODO: Implement get_job_by_id in CRUD and service
+            raise HTTPException(
+                status_code=status.HTTP_501_NOT_IMPLEMENTED,
+                detail={
+                    "error": "Not implemented",
+                    "code": "NOT_IMPLEMENTED",
+                    "message": "Query by job_id not yet implemented. Use flow_id instead.",
+                },
+            )
+
+        # Query by flow_id
+        jobs = await job_service.get_jobs_by_flow_id(flow_id=flow_id, page=page, page_size=page_size)
+
+        # Convert to response format
+        return [
+            WorkflowJobResponse(
+                job_id=str(job.job_id),
+                flow_id=str(job.flow_id),
+                status=job.status,
+            )
+            for job in jobs
+        ]
+
+    except HTTPException:
+        raise
+    except Exception as exc:
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail={
+                "error": "Internal server error",
+                "code": "INTERNAL_SERVER_ERROR",
+                "message": f"Failed to retrieve job status: {exc!s}",
+            },
        ) from exc


 @router.post(
src/backend/base/langflow/services/database/models/__init__.py

Lines changed: 2 additions & 0 deletions

@@ -2,6 +2,7 @@
 from .file import File
 from .flow import Flow
 from .folder import Folder
+from .jobs import Job
 from .message import MessageTable
 from .transactions import TransactionTable
 from .user import User
@@ -12,6 +13,7 @@
     "File",
     "Flow",
     "Folder",
+    "Job",
     "MessageTable",
     "TransactionTable",
     "User",
Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
from .model import Job

__all__ = ["Job"]
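
The `jobs.model` module this `__init__` re-exports is among the 19 changed files but is not shown in this excerpt. A hypothetical reconstruction from the EXPAND migration's columns, after the `job_id` string-to-UUID conversion (field names, defaults, and the enum class are inferred, not confirmed):

# Hypothetical reconstruction of jobs/model.py; columns inferred from the
# migrations above. Not the actual file from this commit.
from datetime import datetime, timezone
from enum import Enum
from uuid import UUID, uuid4

from sqlmodel import Field, SQLModel


class JobStatus(str, Enum):
    QUEUED = "queued"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    TIMED_OUT = "timed_out"


class Job(SQLModel, table=True):
    __tablename__ = "job"

    job_id: UUID = Field(default_factory=uuid4, primary_key=True, index=True)
    flow_id: UUID = Field(index=True)
    status: JobStatus = Field(default=JobStatus.QUEUED, index=True)
    created_timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    finished_timestamp: datetime | None = Field(default=None)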
Lines changed: 65 additions & 0 deletions

@@ -0,0 +1,65 @@
from uuid import UUID

from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

from langflow.services.database.models.jobs.model import Job


async def get_jobs_by_flow_id(db: AsyncSession, flow_id: UUID, page: int = 1, size: int = 10) -> list[Job]:
    """Get jobs by flow ID with pagination.

    Args:
        db: Async database session
        flow_id: Flow ID to filter by
        page: Page number (1-indexed)
        size: Number of results per page

    Returns:
        List of Job objects
    """
    # Handle UUID comparison for different databases:
    # - SQLite: stores UUIDs as strings without hyphens
    # - PostgreSQL: has native UUID type
    # We need to detect the database type and adjust the query accordingly

    # # Get the database dialect name
    # dialect_name = db.bind.dialect.name if db.bind else "sqlite"

    # if dialect_name == "sqlite":
    #     # SQLite stores UUIDs as unhyphenated strings
    #     from sqlalchemy import cast, String

    #     statement = (
    #         select(Job)
    #         .where(cast(Job.flow_id, String) == flow_id.hex)
    #         .offset((page - 1) * size)
    #         .limit(size)
    #     )
    # else:
    #     # PostgreSQL and other databases with native UUID support
    statement = select(Job).where(Job.flow_id == flow_id).offset((page - 1) * size).limit(size)

    result = await db.exec(statement)
    return list(result.all())


async def update_job_status(db: AsyncSession, job_id: str, status: str) -> Job | None:
    """Update job status.

    Args:
        db: Async database session
        job_id: Job ID to update
        status: New status value

    Returns:
        Updated Job object or None if not found
    """
    result = await db.exec(select(Job).where(Job.job_id == job_id))
    job = result.first()
    if job:
        job.status = status
        db.add(job)
        await db.commit()
        await db.refresh(job)
    return job