Skip to content

Commit d21bb6c

Browse files
Jkavia (Janardan S Kavia) and autofix-ci[bot]
authored
fix: Generate job response from vertex builds history by job id. (#11457)
* feat: reconstruct workflow execution response from vertex_build by job_id
* [autofix.ci] apply automated fixes
* fix: use correct attribute name 'id' instead of 'vertex_id' in VertexBuildTable
* Updated the GET endpoint to return WorkflowExecutionResponse

Co-authored-by: Janardan S Kavia <[email protected]>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent b952827 commit d21bb6c

File tree

4 files changed

+268
-7
lines changed

4 files changed

+268
-7
lines changed

src/backend/base/langflow/api/v2/workflow.py

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
WorkflowStopRequest,
4040
WorkflowStopResponse,
4141
)
42-
from lfx.services.deps import get_settings_service
42+
from lfx.services.deps import get_settings_service, injectable_session_scope_readonly
4343
from pydantic_core import ValidationError as PydanticValidationError
4444
from sqlalchemy.exc import OperationalError
4545

@@ -486,9 +486,10 @@ async def execute_workflow_background(
486486
description="Get status of workflow job by job ID",
487487
)
488488
async def get_workflow_status(
489-
api_key_user: Annotated[UserRead, Depends(api_key_security)], # noqa: ARG001
489+
api_key_user: Annotated[UserRead, Depends(api_key_security)],
490490
job_id: Annotated[UUID | None, Query(description="Job ID to query")] = None,
491-
) -> list[WorkflowJobResponse] | WorkflowJobResponse:
491+
session: Annotated[object, Depends(injectable_session_scope_readonly)] = None,
492+
) -> WorkflowExecutionResponse | WorkflowJobResponse:
492493
"""Get workflow job status and results.
493494
494495
This endpoint allows clients to query job status either by:
@@ -500,6 +501,7 @@ async def get_workflow_status(
500501
Args:
501502
api_key_user: Authenticated user from API key
502503
job_id: Optional job ID to query specific job
504+
session: Database session for querying vertex builds
503505
page: Page number for pagination (default: 1)
504506
page_size: Number of results per page (default: 10, max: 100)
505507
@@ -538,10 +540,32 @@ async def get_workflow_status(
538540
},
539541
)
540542
job = await job_service.get_job_by_job_id(job_id=job_id)
541-
return WorkflowJobResponse(
542-
job_id=str(job.job_id),
543+
544+
# If job is completed, reconstruct full workflow response from vertex_builds
545+
if job.status == JobStatus.COMPLETED:
546+
from langflow.api.v2.workflow_reconstruction import reconstruct_workflow_response_from_job_id
547+
548+
# Get the flow
549+
flow = await get_flow_by_id_or_endpoint_name(str(job.flow_id), api_key_user.id)
550+
551+
# Reconstruct response from vertex_build table
552+
return await reconstruct_workflow_response_from_job_id(
553+
session=session,
554+
flow=flow,
555+
job_id=str(job.job_id),
556+
user_id=str(api_key_user.id),
557+
)
558+
559+
# If not completed, return WorkflowExecutionResponse with empty outputs
560+
# This ensures consistent response type (always WorkflowExecutionResponse) for SDK generation
561+
return WorkflowExecutionResponse(
543562
flow_id=str(job.flow_id),
563+
job_id=str(job.job_id),
544564
status=job.status,
565+
outputs={},
566+
errors=[],
567+
inputs={},
568+
metadata={},
545569
)
546570

547571
except HTTPException:
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
"""Workflow response reconstruction from vertex_build table.
2+
3+
This module reconstructs WorkflowExecutionResponse from vertex_build table data by job_id,
4+
enabling retrieval of past execution results without re-running workflows.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
from typing import TYPE_CHECKING
10+
11+
from lfx.graph.graph.base import Graph
12+
from lfx.graph.schema import ResultData, RunOutputs
13+
from lfx.schema.workflow import WorkflowExecutionRequest
14+
15+
from langflow.api.v1.schemas import RunResponse
16+
from langflow.api.v2.converters import run_response_to_workflow_response
17+
from langflow.services.database.models.vertex_builds.crud import get_vertex_builds_by_job_id
18+
19+
if TYPE_CHECKING:
20+
from sqlmodel.ext.asyncio.session import AsyncSession
21+
22+
from langflow.services.database.models.flow.model import FlowRead
23+
24+
25+
async def reconstruct_workflow_response_from_job_id(
    session: AsyncSession,
    flow: FlowRead,
    job_id: str,
    user_id: str,
):
    """Rebuild a WorkflowExecutionResponse for a finished run from persisted vertex builds.

    Args:
        session: Read-only database session used to fetch vertex builds.
        flow: Flow record whose serialized graph drives the reconstruction.
        job_id: Identifier of the job whose vertex builds are queried.
        user_id: Owner id forwarded to graph construction.

    Returns:
        WorkflowExecutionResponse assembled from stored vertex_build rows.

    Raises:
        ValueError: If the flow carries no data, or no (terminal) vertex builds
            exist for the given job_id.
    """
    if not flow.data:
        error = f"Flow {flow.id} has no data"
        raise ValueError(error)

    # Fetch every build row persisted for this job.
    all_builds = await get_vertex_builds_by_job_id(session, job_id)
    if not all_builds:
        error = f"No vertex builds found for job_id {job_id}"
        raise ValueError(error)

    # The graph is built only to learn which vertices are terminal (output) nodes.
    flow_identifier = str(flow.id)
    graph = Graph.from_payload(flow.data, flow_id=flow_identifier, user_id=user_id, flow_name=flow.name)
    terminal_ids = graph.get_terminal_nodes()

    # Keep only terminal vertices that actually produced data.
    terminal_builds = []
    for build in all_builds:
        if build.id in terminal_ids and build.data:
            terminal_builds.append(build)
    if not terminal_builds:
        error = f"No terminal vertex builds found for job_id {job_id}"
        raise ValueError(error)

    # Wrap each stored result back into the RunOutputs shape the converter expects.
    outputs = []
    for build in terminal_builds:
        outputs.append(RunOutputs(inputs={}, outputs=[ResultData(**build.data)]))

    # Synthesize the run/request pair the standard converter operates on.
    reconstructed_run = RunResponse(outputs=outputs, session_id=None)
    synthetic_request = WorkflowExecutionRequest(flow_id=flow_identifier, inputs={})

    return run_response_to_workflow_response(
        run_response=reconstructed_run,
        flow_id=flow_identifier,
        job_id=job_id,
        workflow_request=synthetic_request,
        graph=graph,
    )

src/backend/base/langflow/services/database/models/vertex_builds/crud.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,16 +145,19 @@ async def delete_vertex_builds_by_flow_id(db: AsyncSession, flow_id: UUID) -> No
145145
await db.exec(stmt)
146146

147147

148-
async def get_vertex_builds_by_job_id(db: AsyncSession, job_id: str) -> list[VertexBuildTable]:
148+
async def get_vertex_builds_by_job_id(db: AsyncSession, job_id: str | UUID) -> list[VertexBuildTable]:
149149
"""Get all vertex builds associated with a specific job ID.
150150
151151
Args:
152152
db (AsyncSession): The database session for executing queries.
153-
job_id (str): The unique identifier of the job to get builds for.
153+
job_id (str | UUID): The unique identifier of the job to get builds for.
154154
155155
Returns:
156156
list[VertexBuildTable]: List of vertex builds, ordered chronologically by timestamp.
157157
"""
158+
if isinstance(job_id, str):
159+
job_id = UUID(job_id)
160+
158161
stmt = select(VertexBuildTable).where(VertexBuildTable.job_id == job_id).order_by(col(VertexBuildTable.timestamp))
159162

160163
builds = await db.exec(stmt)
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
"""Unit tests for workflow reconstruction from vertex_build table.
2+
3+
Test Coverage:
4+
- Successful reconstruction with terminal nodes
5+
- Reconstruction with no vertex builds found (error case)
6+
- Reconstruction with flow having no data (error case)
7+
- Reconstruction filtering to terminal nodes only
8+
"""
9+
10+
from datetime import datetime, timezone
11+
from unittest.mock import MagicMock, patch
12+
from uuid import uuid4
13+
14+
import pytest
15+
from langflow.api.v2.workflow_reconstruction import reconstruct_workflow_response_from_job_id
16+
from langflow.services.database.models.vertex_builds.model import VertexBuildTable
17+
18+
19+
class TestWorkflowReconstruction:
    """Unit tests for reconstructing workflow responses from vertex_build rows."""

    @staticmethod
    def _vertex_build(node_id, result):
        # Build a VertexBuildTable-shaped mock holding one node's stored output.
        build = MagicMock(spec=VertexBuildTable)
        build.id = node_id
        build.data = {"outputs": {"result": result}}
        build.artifacts = {}
        build.timestamp = datetime.now(timezone.utc)
        return build

    async def test_reconstruct_success_with_terminal_nodes(self):
        """Terminal-node builds are looked up, filtered, and converted into a response."""
        flow_id = uuid4()
        job_id = uuid4()
        user_id = uuid4()

        fake_flow = MagicMock()
        fake_flow.id = flow_id
        fake_flow.data = {"nodes": [{"id": "node1"}, {"id": "node2"}], "edges": []}

        stored_builds = [
            self._vertex_build("node1", "output1"),
            self._vertex_build("node2", "output2"),
        ]
        fake_session = MagicMock()

        with (
            patch("langflow.api.v2.workflow_reconstruction.get_vertex_builds_by_job_id") as get_builds,
            patch("langflow.api.v2.workflow_reconstruction.Graph") as graph_cls,
            patch("langflow.api.v2.workflow_reconstruction.run_response_to_workflow_response") as converter,
        ):
            get_builds.return_value = stored_builds

            fake_graph = MagicMock()
            fake_graph.get_terminal_nodes.return_value = ["node1", "node2"]
            graph_cls.from_payload.return_value = fake_graph

            expected = MagicMock()
            expected.flow_id = str(flow_id)
            expected.job_id = str(job_id)
            converter.return_value = expected

            result = await reconstruct_workflow_response_from_job_id(
                session=fake_session,
                flow=fake_flow,
                job_id=str(job_id),
                user_id=user_id,
            )

            assert result.flow_id == str(flow_id)
            assert result.job_id == str(job_id)
            get_builds.assert_called_once_with(fake_session, str(job_id))
            fake_graph.get_terminal_nodes.assert_called_once()

    async def test_reconstruct_fails_when_no_vertex_builds(self):
        """A job with zero vertex_build rows raises ValueError."""
        fake_flow = MagicMock()
        fake_flow.data = {"nodes": [{"id": "node1"}], "edges": []}
        fake_session = MagicMock()

        with patch("langflow.api.v2.workflow_reconstruction.get_vertex_builds_by_job_id") as get_builds:
            get_builds.return_value = []

            with pytest.raises(ValueError, match="No vertex builds found"):
                await reconstruct_workflow_response_from_job_id(
                    session=fake_session,
                    flow=fake_flow,
                    job_id=str(uuid4()),
                    user_id=uuid4(),
                )

    async def test_reconstruct_fails_when_flow_has_no_data(self):
        """A flow whose data field is empty raises ValueError before any DB access."""
        fake_flow = MagicMock()
        fake_flow.data = None
        fake_session = MagicMock()

        with pytest.raises(ValueError, match="has no data"):
            await reconstruct_workflow_response_from_job_id(
                session=fake_session,
                flow=fake_flow,
                job_id=str(uuid4()),
                user_id=uuid4(),
            )

    async def test_reconstruct_filters_to_terminal_nodes_only(self):
        """Only terminal-node outputs are forwarded; intermediate nodes are dropped."""
        flow_id = uuid4()
        job_id = uuid4()
        user_id = uuid4()

        fake_flow = MagicMock()
        fake_flow.id = flow_id
        fake_flow.data = {"nodes": [{"id": "node1"}, {"id": "node2"}, {"id": "node3"}], "edges": []}

        # Persisted builds exist for all three nodes.
        stored_builds = [
            self._vertex_build(node_id, f"output_{node_id}") for node_id in ("node1", "node2", "node3")
        ]
        fake_session = MagicMock()

        with (
            patch("langflow.api.v2.workflow_reconstruction.get_vertex_builds_by_job_id") as get_builds,
            patch("langflow.api.v2.workflow_reconstruction.Graph") as graph_cls,
            patch("langflow.api.v2.workflow_reconstruction.run_response_to_workflow_response") as converter,
        ):
            get_builds.return_value = stored_builds

            # node2 is intermediate: only node1 and node3 count as terminal.
            fake_graph = MagicMock()
            fake_graph.get_terminal_nodes.return_value = ["node1", "node3"]
            graph_cls.from_payload.return_value = fake_graph

            converter.return_value = MagicMock()

            result = await reconstruct_workflow_response_from_job_id(
                session=fake_session,
                flow=fake_flow,
                job_id=str(job_id),
                user_id=user_id,
            )

            assert result is not None
            converter.assert_called_once()
            # Filtering is evidenced by the terminal-node lookup having run.
            fake_graph.get_terminal_nodes.assert_called_once()

0 commit comments

Comments
 (0)