Job execution status endpoint, status tracking, DB models #11438
Conversation
Important: Review skipped. Auto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI.
Walkthrough

This PR introduces a comprehensive Job service system for tracking the workflow execution lifecycle. It adds database migrations to create a job table and extend the vertex_build schema, implements a service layer for job management with status tracking, integrates job tracking into workflow execution APIs, and updates logging infrastructure to capture job identifiers.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant API as Workflow API
    participant JobSvc as Job Service
    participant DB as Database
    participant Executor as Graph Executor

    API->>JobSvc: create_job(job_id, flow_id)
    JobSvc->>DB: insert Job(status=QUEUED)
    DB-->>JobSvc: Job created
    API->>JobSvc: with_status_updates(job_id, flow_id, run_graph_func)
    JobSvc->>DB: update Job(status=IN_PROGRESS)
    DB-->>JobSvc: updated
    JobSvc->>Executor: run_graph_func(args)
    alt Execution Success
        Executor-->>JobSvc: result
        JobSvc->>DB: update Job(status=COMPLETED, finished_timestamp)
    else Execution Failure
        Executor-->>JobSvc: exception
        JobSvc->>DB: update Job(status=FAILED, finished_timestamp)
    end
    DB-->>JobSvc: Job finalized
    JobSvc-->>API: execution result (or re-raise exception)
    API->>JobSvc: get_jobs_by_flow_id(flow_id, page, size)
    JobSvc->>DB: query Jobs by flow_id with pagination
    DB-->>JobSvc: list[Job]
    JobSvc-->>API: list[WorkflowJobResponse]
```
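As a reading aid, here is a minimal sketch of the wrapper the diagram describes, assuming the method and status names shown above and discussed in the review comments below; session handling and exact signatures are simplified:

```python
from uuid import UUID

from langflow.services.database.models.jobs.model import JobStatus


async def with_status_updates(job_service, job_id: UUID, flow_id: UUID, run_graph_func, *args, **kwargs):
    """Lifecycle wrapper sketch: QUEUED -> IN_PROGRESS -> COMPLETED/FAILED."""
    # Assumes the job row was already created (status=QUEUED) via create_job().
    await job_service.update_job_status(job_id, JobStatus.IN_PROGRESS.value)
    try:
        kwargs["flow_id"] = str(flow_id)
        result = await run_graph_func(*args, **kwargs)
    except Exception:
        # Record the failure, then re-raise so the caller sees the original error.
        await job_service.update_job_status(job_id, JobStatus.FAILED.value, finished_timestamp=True)
        raise
    await job_service.update_job_status(job_id, JobStatus.COMPLETED.value, finished_timestamp=True)
    return result
```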
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks: ✅ 3 passed | ❌ 4 failed (1 error, 3 warnings)
Codecov Report

❌ Patch coverage is 30.76%. Your patch check has failed because the patch coverage (30.76%) is below the target coverage (40.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files:

```diff
@@            Coverage Diff             @@
##             main   #11438      +/-   ##
==========================================
- Coverage   34.89%   34.87%   -0.02%
==========================================
  Files        1420     1420
  Lines       68217    68216       -1
  Branches     9984     9986       +2
==========================================
- Hits        23804    23791      -13
- Misses      43179    43190      +11
- Partials     1234     1235       +1
```
Flags with carried forward coverage won't be shown.
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/backend/base/langflow/api/v2/workflow.py (1)

374-385: Add explicit job creation before calling `with_status_updates` in the sync path.

The `with_status_updates` method's docstring claims to create a job, but the actual implementation only updates an existing job's status. The background execution path correctly calls `await job_service.create_job()` before passing the job to status management, but the sync path is missing this step. Without pre-creating the job, `update_job_status` will fail to find the record and return `None`, leaving the execution untracked.

Add the job creation call:

Suggested fix

```diff
     # Execute graph - component errors are caught and returned in response body
     job_service = get_job_service()
+    await job_service.create_job(job_id=job_id, flow_id=flow_id_str)
     try:
         task_result, execution_session_id = await job_service.with_status_updates(
             job_id=UUID(job_id),
             flow_id=UUID(flow_id_str),
```
🤖 Fix all issues with AI agents
In
`@src/backend/base/langflow/alembic/versions/f7a564a956a8_add_job_id_to_vertex_build_create_job_.py`:
- Around line 50-62: The downgrade is missing removal of the PostgreSQL enum
type created in upgrade (job_status_enum); update the downgrade() to explicitly
drop the enum after dropping the "job" table by calling the corresponding
Alembic/SQLAlchemy operation to drop the enum type (mirror what was created for
job_status_enum in upgrade), ensuring the enum name (job_status_enum) is used
and that the drop occurs after op.drop_table("job") so no orphaned enum remains.
In `@src/backend/base/langflow/api/v2/workflow.py`:
- Around line 488-548: get_workflow_status currently ignores api_key_user and
calls get_jobs_by_flow_id without verifying flow ownership, allowing any
authenticated user to view any flow's jobs; before calling get_jobs_by_flow_id,
validate ownership by resolving the flow with get_flow_by_id_or_endpoint_name
(or a new service method) passing the requesting user's id and the flow_id, and
if the flow does not belong to api_key_user raise HTTP 403; alternatively update
get_jobs_by_flow_id to accept a user_id parameter and enforce filtering at the
service/CRUD layer (and return 403 on mismatch) so only jobs for flows owned by
api_key_user are returned.
In `@src/backend/base/langflow/services/database/models/jobs/crud.py`:
- Around line 41-44: The pagination query in the jobs CRUD (the select statement
assigned to statement that currently uses select(Job).where(Job.flow_id ==
flow_id)) lacks an ORDER BY, making paging non-deterministic; update the query
used in the function (the variable statement) to include an explicit ordering,
e.g. order by Job.created_timestamp descending (and include a secondary
tie-breaker like Job.id) so result = await db.exec(statement) returns consistent
pages.
- Around line 47-64: The update_job_status function's type hints are incorrect:
change its parameters to accept the actual types used upstream and in the model
by updating the signature of update_job_status(db: AsyncSession, job_id:
uuid.UUID, status: JobStatus) -> Job | None (import uuid.UUID and JobStatus from
the model/enums), keep the body logic the same but ensure the imports for uuid
and JobStatus are present at the top and the function uses Job.job_id and
JobStatus types consistently with the service layer and model.
In `@src/backend/base/langflow/services/database/models/vertex_builds/crud.py`:
- Around line 148-160: Normalize the job_id to a UUID before querying in
get_vertex_builds_by_job_id: accept UUID | str, convert string inputs with
UUID(job_id) and let invalid strings raise ValueError so the call fails fast;
use the normalized UUID in the where clause comparing VertexBuildTable.job_id
and return the query results (e.g., result.all()) so the function returns a
proper list of VertexBuildTable instances.
In `@src/backend/base/langflow/services/jobs/factory.py`:
- Around line 7-19: The create() method on JobServiceFactory must keep the same
signature as ServiceFactory to accept passed dependencies; change the method
signature to def create(self, *args, **kwargs) and forward those parameters when
instantiating the service (i.e., return JobService(*args, **kwargs)) so callers
that supply extras via the factory continue to work; reference:
JobServiceFactory.create, ServiceFactory, and JobService.
In `@src/backend/base/langflow/services/jobs/service.py`:
- Around line 18-24: The __init__ docstring erroneously documents a non-existent
parameter `database_service`; update the __init__ method's docstring in
service.py (the __init__ method that calls self.set_ready()) to either remove
the Args section entirely or replace it with a correct description of the
initializer (no parameters), ensuring the docstring matches the actual signature
of __init__.
- Around line 136-138: The current logic only calls update_job_status(job_id,
JobStatus.COMPLETED.value, finished_timestamp=True) when result is truthy,
leaving jobs IN_PROGRESS if run_graph_func returns falsy; change the flow in the
method that calls run_graph_func so update_job_status is invoked regardless of
result truthiness (i.e., remove the truthy check around result and always call
update_job_status with finished_timestamp=True before returning), keeping the
return result behavior intact (or alternatively map falsy results to FAILED if
you prefer explicit failure handling) — look for the variables/methods result,
job_id, run_graph_func, update_job_status, and JobStatus.COMPLETED.value to make
the change.
🧹 Nitpick comments (3)

src/backend/base/langflow/services/deps.py (1)

249-257: Add return type annotation for consistency.

The function lacks a return type annotation, unlike other service getters in this file (e.g., `get_queue_service() -> JobQueueService`). Adding the type hint improves IDE support and documentation.

♻️ Suggested fix

```diff
-def get_job_service():
+def get_job_service() -> "JobService":
     """Retrieves the JobService instance from the service manager.

     Returns:
         JobService: The JobService instance.
     """
     from langflow.services.jobs.factory import JobServiceFactory

     return get_service(ServiceType.JOB_SERVICE, JobServiceFactory())
```

You'll also need to add `JobService` to the `TYPE_CHECKING` imports at the top:

```python
if TYPE_CHECKING:
    ...
    from langflow.services.jobs.service import JobService
```

src/backend/base/langflow/services/database/models/jobs/crud.py (1)

21-40: Consider removing or implementing the commented-out dialect handling.

This commented-out code block adds noise. If dialect-specific UUID handling is needed for SQLite compatibility, implement it; otherwise, remove the dead code.

src/backend/base/langflow/services/database/models/jobs/model.py (1)

10-16: Duplicate `JobStatus` enum definition.

This enum is also defined in `src/lfx/src/lfx/schema/workflow.py` (lines 11-19) with identical values. Consider consolidating into a single shared location to avoid maintenance burden and potential drift between the two definitions.
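As a sketch of that consolidation, the enum could live in one module and be re-exported elsewhere. The status values are taken from this PR's description; the exact module layout is an assumption:

```python
# src/lfx/src/lfx/schema/workflow.py - single source of truth (layout assumed)
from enum import Enum


class JobStatus(str, Enum):
    QUEUED = "QUEUED"
    IN_PROGRESS = "IN_PROGRESS"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    TIMED_OUT = "TIMED_OUT"


# src/backend/base/langflow/services/database/models/jobs/model.py
# would then re-export instead of redefining:
#   from lfx.schema.workflow import JobStatus
```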
```python
def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table("vertex_build", schema=None) as batch_op:
        batch_op.drop_index(batch_op.f("ix_vertex_build_job_id"))
        batch_op.drop_column("job_id")

    with op.batch_alter_table("job", schema=None) as batch_op:
        batch_op.drop_index(batch_op.f("ix_job_status"))
        batch_op.drop_index(batch_op.f("ix_job_job_id"))
        batch_op.drop_index(batch_op.f("ix_job_flow_id"))

    op.drop_table("job")
    # ### end Alembic commands ###
```
Orphan enum type not dropped in downgrade.
The job_status_enum is created during upgrade (line 31) but op.drop_table("job") does not automatically drop the associated enum type in PostgreSQL. This can leave orphaned enum types in the database after rollback.
Proposed fix

```diff
     op.drop_table("job")
+    # Drop the enum type (PostgreSQL-specific; no-op on SQLite)
+    op.execute("DROP TYPE IF EXISTS job_status_enum")
     # ### end Alembic commands ###
```
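Since `DROP TYPE` is PostgreSQL syntax, a dialect-guarded variant of the same idea may be safer than relying on other databases ignoring the statement. A sketch using standard Alembic/SQLAlchemy calls, with the rest of the migration body elided:

```python
import sqlalchemy as sa
from alembic import op


def downgrade() -> None:
    # ... existing index/column drops ...
    op.drop_table("job")
    # Standalone enum types only exist on PostgreSQL; skip the drop elsewhere.
    if op.get_bind().dialect.name == "postgresql":
        sa.Enum(name="job_status_enum").drop(op.get_bind(), checkfirst=True)
```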
```diff
 async def get_workflow_status(
     api_key_user: Annotated[UserRead, Depends(api_key_security)],  # noqa: ARG001
-    job_id: Annotated[str, Query(description="Job ID to query")],  # noqa: ARG001
-) -> WorkflowJobResponse:
-    """Get workflow job status and results by job ID.
+    flow_id: Annotated[UUID | None, Query(description="Flow ID to filter jobs")] = None,
+    job_id: Annotated[UUID | None, Query(description="Job ID to query")] = None,
+    page: Annotated[int, Query(description="Page number (1-indexed)", ge=1)] = 1,
+    page_size: Annotated[int, Query(description="Number of results per page", ge=1, le=100)] = 10,
+) -> list[WorkflowJobResponse]:
+    """Get workflow job status and results.

-    This endpoint allows clients to poll for the status of background workflow executions.
+    This endpoint allows clients to query job status either by:
+    - **flow_id**: Get all jobs for a specific flow (with pagination)
+    - **job_id**: Get a specific job by ID
+
+    At least one of flow_id or job_id must be provided.

     Args:
         api_key_user: Authenticated user from API key
-        job_id: The job ID returned from a background execution request
+        flow_id: Optional flow ID to filter jobs
+        job_id: Optional job ID to query specific job
+        page: Page number for pagination (default: 1)
+        page_size: Number of results per page (default: 10, max: 100)

     Returns:
-        - WorkflowExecutionResponse: If job is complete or failed
-        - StreamingResponse: If job is still running (for streaming mode)
+        List of WorkflowJobResponse objects

     Raises:
         HTTPException:
+            - 400: Neither flow_id nor job_id provided
             - 403: Developer API disabled or unauthorized
-            - 404: Job ID not found
-            - 501: Not yet implemented
-
-    Note:
-        This endpoint is not yet implemented. It will be added in a future release
-        to support background and streaming execution modes.
+            - 404: Job or flow not found
+            - 500: Internal server error
     """
-    # TODO: Implement job status tracking and retrieval
-    # - Store job metadata in database or cache
-    # - Track execution progress and status
-    # - Return appropriate response based on job state
-    raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Workflow status not yet available.")
+    # Validate that at least one filter is provided
+    if not flow_id and not job_id:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail={
+                "error": "Missing required parameter",
+                "code": "MISSING_PARAMETER",
+                "message": "Either flow_id or job_id must be provided",
+            },
+        )
+
+    try:
+        job_service = get_job_service()
+
+        if job_id:
+            # Query by specific job_id (returns single job)
+            # TODO: Implement get_job_by_id in CRUD and service
+            raise HTTPException(
+                status_code=status.HTTP_501_NOT_IMPLEMENTED,
+                detail={
+                    "error": "Not implemented",
+                    "code": "NOT_IMPLEMENTED",
+                    "message": "Query by job_id not yet implemented. Use flow_id instead.",
+                },
+            )
+
+        # Query by flow_id
+        jobs = await job_service.get_jobs_by_flow_id(flow_id=flow_id, page=page, page_size=page_size)
```
🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# Verify whether get_jobs_by_flow_id enforces user ownership
rg -n "def get_jobs_by_flow_id" -C4 src/backend/base/langflow/services/jobs/
```

Repository: langflow-ai/langflow
Length of output: 892

🏁 Script executed:

```bash
# Search for get_jobs_by_flow_id in CRUD layer
rg -n "get_jobs_by_flow_id" src/backend/base/langflow/services/database/models/jobs/
```

Repository: langflow-ai/langflow
Length of output: 237

🏁 Script executed:

```bash
# Look for the job service to understand the full implementation
find src/backend/base/langflow/services/jobs -type f -name "*.py" | head -20
```

Repository: langflow-ai/langflow
Length of output: 217

🏁 Script executed:

```bash
# Read the full implementation of the service method
cat -n src/backend/base/langflow/services/jobs/service.py
```

Repository: langflow-ai/langflow
Length of output: 6658

🏁 Script executed:

```bash
# Read the full CRUD implementation
cat -n src/backend/base/langflow/services/database/models/jobs/crud.py
```

Repository: langflow-ai/langflow
Length of output: 2482

🏁 Script executed:

```bash
# Check other endpoints in v2 to see authorization patterns
rg -n "api_key_user" src/backend/base/langflow/api/v2/ -A 5 -B 2 | head -60
```

Repository: langflow-ai/langflow
Length of output: 4868

🏁 Script executed:

```bash
# Verify get_flow_by_id_or_endpoint_name exists and how it validates ownership
rg -n "def get_flow_by_id_or_endpoint_name" -C 8 src/backend/base/langflow/api/
```

Repository: langflow-ai/langflow
Length of output: 46

🏁 Script executed:

```bash
# Check the full workflow.py endpoint to understand the authorization pattern
sed -n '108,180p' src/backend/base/langflow/api/v2/workflow.py
```

Repository: langflow-ai/langflow
Length of output: 3238

🏁 Script executed:

```bash
# Search for get_flow_by_id_or_endpoint_name in different ways
rg -n "get_flow_by_id_or_endpoint_name" src/backend/base/langflow/api/ -C 3
```

Repository: langflow-ai/langflow
Length of output: 6709

🏁 Script executed:

```bash
# Search in the entire codebase
rg -n "def get_flow_by_id_or_endpoint_name"
```

Repository: langflow-ai/langflow
Length of output: 221

🏁 Script executed:

```bash
# Check imports in workflow.py to find where it comes from
head -50 src/backend/base/langflow/api/v2/workflow.py | grep -E "^from|^import"
```

Repository: langflow-ai/langflow
Length of output: 735

🏁 Script executed:

```bash
# Read the implementation of get_flow_by_id_or_endpoint_name
sed -n '399,450p' src/backend/base/langflow/helpers/flow.py
```

Repository: langflow-ai/langflow
Length of output: 2005

🏁 Script executed:

```bash
# Get more context around the function
sed -n '399,480p' src/backend/base/langflow/helpers/flow.py
```

Repository: langflow-ai/langflow
Length of output: 3382

🏁 Script executed:

```bash
# Check the get_workflow_status endpoint signature and flow_id type
sed -n '480,520p' src/backend/base/langflow/api/v2/workflow.py
```

Repository: langflow-ai/langflow
Length of output: 1697

🏁 Script executed:

```bash
# Check if there's a user_id field in the Flow model
rg -n "class Flow" src/backend/base/langflow/services/database/models/flow/model.py -A 30 | head -50
```

Repository: langflow-ai/langflow
Length of output: 2828
Add explicit ownership check before querying job status by flow_id.
api_key_user is unused, and get_jobs_by_flow_id accepts no user filter. This allows any authenticated user to query job status for any flow by UUID. Add an ownership check by first verifying the user owns the flow:
```diff
     # Query by flow_id
+    # Ensure caller owns the flow before accessing its jobs
+    flow = await get_flow_by_id_or_endpoint_name(str(flow_id), api_key_user.id)
+    if not flow:
+        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found or access denied")
     jobs = await job_service.get_jobs_by_flow_id(flow_id=flow_id, page=page, page_size=page_size)
```

Note: get_flow_by_id_or_endpoint_name only enforces user_id checks for endpoint names, not UUID lookups. You may need to pass user_id to get_jobs_by_flow_id and filter at the CRUD layer instead.
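Alternatively, a sketch of the CRUD-layer filtering mentioned in the note: the helper name is hypothetical, and the `Flow.user_id` column and import paths are assumptions based on the model check in the analysis chain above:

```python
from uuid import UUID

from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

from langflow.services.database.models.flow.model import Flow
from langflow.services.database.models.jobs.model import Job


async def get_jobs_by_flow_id_for_user(
    db: AsyncSession, flow_id: UUID, user_id: UUID, page: int = 1, size: int = 10
) -> list[Job]:
    # Join through Flow so only jobs on flows owned by user_id are returned.
    statement = (
        select(Job)
        .join(Flow, Flow.id == Job.flow_id)
        .where(Job.flow_id == flow_id, Flow.user_id == user_id)
        .order_by(Job.created_timestamp.desc())
        .offset((page - 1) * size)
        .limit(size)
    )
    result = await db.exec(statement)
    return list(result.all())
```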
```python
    statement = select(Job).where(Job.flow_id == flow_id).offset((page - 1) * size).limit(size)

    result = await db.exec(statement)
    return list(result.all())
```
Missing ORDER BY clause makes pagination non-deterministic.
Without an explicit ordering, the database may return rows in any order across pages, leading to duplicated or skipped records when paginating. Consider ordering by created_timestamp descending to show recent jobs first.
Proposed fix

```diff
-    statement = select(Job).where(Job.flow_id == flow_id).offset((page - 1) * size).limit(size)
+    statement = (
+        select(Job)
+        .where(Job.flow_id == flow_id)
+        .order_by(Job.created_timestamp.desc())
+        .offset((page - 1) * size)
+        .limit(size)
+    )
```
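If `created_timestamp` values can collide, the secondary tie-breaker suggested in the agent prompt, e.g. `.order_by(Job.created_timestamp.desc(), Job.id.desc())` (assuming the model exposes an `id` primary key), makes the ordering total and keeps pages stable.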
```python
async def update_job_status(db: AsyncSession, job_id: str, status: str) -> Job | None:
    """Update job status.

    Args:
        db: Async database session
        job_id: Job ID to update
        status: New status value

    Returns:
        Updated Job object or None if not found
    """
    result = await db.exec(select(Job).where(Job.job_id == job_id))
    job = result.first()
    if job:
        job.status = status
        db.add(job)
        await db.commit()
        await db.refresh(job)
```
Type signature inconsistent with model and caller.
The function signature declares `job_id: str` and `status: str`, but:

- `Job.job_id` is `UUID` in the model (line 20 of model.py)
- The service layer converts these to `UUID` and `JobStatus` before calling (service.py lines 87-91)
This mismatch can cause confusion and potential type errors.
Proposed fix

```diff
-async def update_job_status(db: AsyncSession, job_id: str, status: str) -> Job | None:
+async def update_job_status(db: AsyncSession, job_id: UUID, status: JobStatus) -> Job | None:
```

Also add the import at the top:

```diff
 from langflow.services.database.models.jobs.model import Job
+from langflow.services.database.models.jobs.model import JobStatus
```
```python
async def get_vertex_builds_by_job_id(db: AsyncSession, job_id: str) -> list[VertexBuildTable]:
    """Get all vertex builds associated with a specific job ID.

    Args:
        db (AsyncSession): The database session for executing queries.
        job_id (str): The unique identifier of the job to get builds for.

    Returns:
        list[VertexBuildTable]: List of vertex builds, ordered chronologically by timestamp.
    """
    stmt = select(VertexBuildTable).where(VertexBuildTable.job_id == job_id).order_by(col(VertexBuildTable.timestamp))

    builds = await db.exec(stmt)
```
Normalize job_id to UUID before querying.
The column is UUID-backed; accept UUID | str and convert to avoid type mismatches and fail fast on invalid IDs.
🛠️ Suggested fix

```diff
-async def get_vertex_builds_by_job_id(db: AsyncSession, job_id: str) -> list[VertexBuildTable]:
+async def get_vertex_builds_by_job_id(db: AsyncSession, job_id: UUID | str) -> list[VertexBuildTable]:
     """Get all vertex builds associated with a specific job ID.

     Args:
         db (AsyncSession): The database session for executing queries.
-        job_id (str): The unique identifier of the job to get builds for.
+        job_id (UUID | str): The unique identifier of the job to get builds for.
@@
-    stmt = select(VertexBuildTable).where(VertexBuildTable.job_id == job_id).order_by(col(VertexBuildTable.timestamp))
+    if isinstance(job_id, str):
+        job_id = UUID(job_id)
+    stmt = select(VertexBuildTable).where(VertexBuildTable.job_id == job_id).order_by(col(VertexBuildTable.timestamp))
```
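For reference, `UUID()` validates its input eagerly, which is what gives the fail-fast behavior above:

```python
from uuid import UUID

UUID("0f8fad5b-d9cb-469f-a165-70867728950e")  # OK: parses to a UUID instance
UUID("not-a-uuid")  # raises ValueError: badly formed hexadecimal UUID string
```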
In `@src/backend/base/langflow/services/database/models/vertex_builds/crud.py`
around lines 148 - 160, Normalize the job_id to a UUID before querying in
get_vertex_builds_by_job_id: accept UUID | str, convert string inputs with
UUID(job_id) and let invalid strings raise ValueError so the call fails fast;
use the normalized UUID in the where clause comparing VertexBuildTable.job_id
and return the query results (e.g., result.all()) so the function returns a
proper list of VertexBuildTable instances.
```python
class JobServiceFactory(ServiceFactory):
    """Factory for creating JobService instances."""

    def __init__(self):
        super().__init__(JobService)

    def create(self):
        """Create a JobService instance.

        Returns:
            JobService instance
        """
        return JobService()
```
Keep create() signature compatible with ServiceFactory.
Dropping *args/**kwargs can break callers that pass dependencies through the factory.
🛠️ Suggested fix

```diff
-    def create(self):
+    def create(self, *args, **kwargs):
         """Create a JobService instance.
@@
-        return JobService()
+        return super().create(*args, **kwargs)
```
In `@src/backend/base/langflow/services/jobs/factory.py` around lines 7 - 19, The
create() method on JobServiceFactory must keep the same signature as
ServiceFactory to accept passed dependencies; change the method signature to def
create(self, *args, **kwargs) and forward those parameters when instantiating
the service (i.e., return JobService(*args, **kwargs)) so callers that supply
extras via the factory continue to work; reference: JobServiceFactory.create,
ServiceFactory, and JobService.
```python
    def __init__(self):
        """Initialize the job service.

        Args:
            database_service: Database service for database operations
        """
        self.set_ready()
```
Docstring mentions non-existent parameter.
The docstring references database_service in the Args section, but __init__ takes no parameters beyond self.
Proposed fix

```diff
     def __init__(self):
-        """Initialize the job service.
-
-        Args:
-            database_service: Database service for database operations
-        """
+        """Initialize the job service."""
         self.set_ready()
```
```python
        if result:
            await self.update_job_status(job_id, JobStatus.COMPLETED.value, finished_timestamp=True)
        return result
```
Job status not updated when result is falsy.
If run_graph_func returns a falsy value (e.g., None, empty dict) without raising an exception, the job remains in IN_PROGRESS indefinitely. Consider updating to COMPLETED regardless of result truthiness, or explicitly handling this as a failure case.
Proposed fix (always complete on success)

```diff
     # Execute the wrapped function
     kwargs["flow_id"] = str(flow_id)
     result = await run_graph_func(*args, **kwargs)

     # Update to COMPLETED
-    if result:
-        await self.update_job_status(job_id, JobStatus.COMPLETED.value, finished_timestamp=True)
-    return result
+    await self.update_job_status(job_id, JobStatus.COMPLETED.value, finished_timestamp=True)
+    return result
```
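If falsy results should instead be treated as failures, which is the alternative the comment mentions, a sketch under that assumption:

```python
# Alternative: map a falsy result to FAILED instead of COMPLETED (an assumption,
# not the PR's current behavior).
final_status = JobStatus.COMPLETED if result else JobStatus.FAILED
await self.update_job_status(job_id, final_status.value, finished_timestamp=True)
return result
```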
```python
    from langflow.services.jobs.factory import JobServiceFactory

    return get_service(ServiceType.JOB_SERVICE, JobServiceFactory())
```
⚡️ Codeflash found 215% (2.15x) speedup for get_job_service in src/backend/base/langflow/services/deps.py
⏱️ Runtime : 405 microseconds → 128 microseconds (best of 20 runs)
📝 Explanation and details
The optimization introduces function attribute caching to avoid repeatedly importing and instantiating JobServiceFactory on every call to get_job_service().
Key Optimization:
In the original code, every call to get_job_service() executes:

```python
from langflow.services.jobs.factory import JobServiceFactory

return get_service(ServiceType.JOB_SERVICE, JobServiceFactory())
```

This means each invocation:

- Imports the module (though Python caches module imports, the import statement still has overhead)
- Creates a new `JobServiceFactory()` instance (the main performance cost)
The optimized version uses a function attribute as a singleton cache:

```python
if not hasattr(get_job_service, '_factory'):
    from langflow.services.jobs.factory import JobServiceFactory
    get_job_service._factory = JobServiceFactory()
return get_service(ServiceType.JOB_SERVICE, get_job_service._factory)
```

Why This Is Faster:
-
Factory instantiation is expensive - The line profiler shows that
JobServiceFactory()instantiation takes 37 milliseconds (18.6% of total time in the optimized version). This only happens once now instead of on every call. -
Hasattr check is cheap - After the first call, the
hasattr()check (1.58 microseconds per call) is far cheaper than creating a new factory instance. -
Import happens once - The import statement only executes on first invocation, eliminating repeated import overhead.
Performance Gains:
- 215% speedup (from 405µs to 128µs per call)
- On subsequent calls after the first, the function avoids ~37ms of factory instantiation overhead
- The optimization is particularly effective when
get_job_service()is called multiple times, as indicated by the annotated tests showing 12 hits on the cached path versus 1 hit on the initialization path
Test Case Performance:
The annotated tests demonstrate this optimization excels in scenarios where:
- Multiple calls to
get_job_service()occur in the same session (tests show 11 out of 12 calls benefit from the cache) - The function is called in loops or repeatedly during application lifecycle
- JobServiceFactory instantiation has non-trivial initialization cost
This is a clean optimization that uses function attributes (a Python idiom for function-scoped state) instead of module-level globals, avoiding namespace pollution while achieving significant performance gains through memoization.
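The same memoization is also available from the standard library; the following is an equivalent sketch, not what Codeflash generated:

```python
from functools import lru_cache


@lru_cache(maxsize=1)
def _job_service_factory():
    # Built once, on first use; lru_cache provides the singleton behavior.
    from langflow.services.jobs.factory import JobServiceFactory

    return JobServiceFactory()


def get_job_service():
    return get_service(ServiceType.JOB_SERVICE, _job_service_factory())
```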
✅ Correctness verification report:
| Test | Status |
|---|---|
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | ✅ 12 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |
🌀 Generated Regression Tests

```python
import sys
import types

import pytest  # used for our unit tests

from langflow.services.deps import get_job_service
from langflow.services.schema import ServiceType  # noqa: E402

_langflow_services_manager = types.ModuleType("langflow.services.manager")
_lfx_services_manager = types.ModuleType("lfx.services.manager")


# Provide a JobServiceFactory class in the jobs.factory module.
class JobServiceFactory:
    def __init__(self):
        # simple marker attribute so tests can reliably identify instances
        self._factory_marker = True

    def __repr__(self):
        return "<JobServiceFactory>"


# ---------------------------------------------------------------------
# Test helpers: a small, well-behaved ServiceManager implementation used
# by tests. Each test will instantiate one and assign it to
# lfx.services.manager._service_manager so that get_service uses it.
# ---------------------------------------------------------------------
class _TestServiceManager:
    def __init__(self, *, are_registered=True, get_behavior=None):
        """
        are_registered: boolean or callable producing boolean
        get_behavior: callable(service_type, default) -> return value or raise
        """
        # If callable, call it in are_factories_registered(); else use boolean
        self._are_registered = are_registered
        # register_factories will capture what was passed
        self.registered_factories_arg = None
        # get behavior callable provided by test; if None, default returns default
        if get_behavior is None:
            def _default_get(_service_type, default):
                return default

            self._get_behavior = _default_get
        else:
            self._get_behavior = get_behavior
        # track how many times 'get' was called for optional assertions
        self.get_call_count = 0

    def are_factories_registered(self):
        # If boolean, return it; if callable, call it to allow dynamic tests.
        if callable(self._are_registered):
            return self._are_registered()
        return self._are_registered

    def register_factories(self, factories):
        # record the factories the real code asked to register
        self.registered_factories_arg = factories

    def get(self, service_type, default):
        # increment call count and delegate to the configured behavior
        self.get_call_count += 1
        return self._get_behavior(service_type, default)


def test_basic_returns_existing_job_service():
    # Basic scenario: Factories are already registered and the manager returns
    # a pre-existing JobService instance (so default is not used).
    returned_obj = object()  # simple sentinel object to represent the job service
    # Create a manager that reports factories are registered and returns our sentinel.
    manager = _TestServiceManager(
        are_registered=True,
        get_behavior=lambda service_type, default: returned_obj
    )
    # Put this manager into the fake lfx.services.manager module so get_service will use it.
    _lfx_services_manager._service_manager = manager
    # Call the function under test. It should return the exact object returned by manager.get.
    codeflash_output = get_job_service(); result = codeflash_output


def test_registers_factories_when_not_registered_and_returns_default_factory_instance():
    # Edge scenario: Factories are not registered. The function should call
    # ServiceManager.get_factories(), pass them to register_factories, and
    # then call get(...) which returns the default that was passed in.
    # We set get to return the default, so get_job_service should return
    # an instance of JobServiceFactory created inside the function.
    # Prepare a "large-ish but modest" list of factories (kept small here for clarity).
    factories_list = ["f1", "f2", "f3"]
    _langflow_services_manager._factories_list = factories_list
    # Create a manager that reports factories are NOT registered and returns the default.
    manager = _TestServiceManager(
        are_registered=False,
        get_behavior=lambda service_type, default: default  # return the exact default passed in
    )
    _lfx_services_manager._service_manager = manager
    # Import the JobServiceFactory class from the fake module to assert type.
    from langflow.services.jobs.factory import \
        JobServiceFactory as ImportedFactory

    codeflash_output = get_job_service(); result = codeflash_output


def test_large_scale_registering_many_factories_performs_registration_accepting_large_lists():
    # Large-scale scenario: ServiceManager.get_factories returns a large list (but within limits).
    # The get_service implementation should accept and relay that list to register_factories
    # without modifying it. We use 500 elements to stay under the 1000-element instruction.
    large_factories = list(range(500))
    _langflow_services_manager._factories_list = large_factories
    # Manager reports factories are not registered so register_factories will be invoked.
    # Make manager.get return a concrete "job service" sentinel.
    job_service_sentinel = {"job": "service_object"}
    manager = _TestServiceManager(
        are_registered=False,
        get_behavior=lambda service_type, default: job_service_sentinel
    )
    _lfx_services_manager._service_manager = manager
    # Call the function under test.
    codeflash_output = get_job_service(); result = codeflash_output
    # codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

#------------------------------------------------
from unittest.mock import MagicMock, patch

import pytest

from langflow.services.deps import get_job_service
from langflow.services.jobs.factory import JobServiceFactory
from langflow.services.schema import ServiceType


def test_get_job_service_returns_service_instance():
    """Basic test: Verify that get_job_service returns a service instance."""
    # Call the function
    codeflash_output = get_job_service(); result = codeflash_output


def test_get_job_service_returns_consistent_type():
    """Basic test: Verify that get_job_service returns an object with expected properties."""
    # Call the function
    codeflash_output = get_job_service(); result = codeflash_output


def test_get_job_service_multiple_calls_same_session():
    """Basic test: Verify get_job_service can be called multiple times in the same session."""
    # Call the function multiple times
    codeflash_output = get_job_service(); result1 = codeflash_output
    codeflash_output = get_job_service(); result2 = codeflash_output


def test_get_job_service_factory_creation():
    """Edge case: Verify that JobServiceFactory can be instantiated without errors."""
    # Simply verify that creating a JobServiceFactory doesn't raise an exception
    try:
        factory = JobServiceFactory()
    except Exception as e:
        pytest.fail(f"JobServiceFactory instantiation failed: {e}")


def test_get_job_service_service_type_is_correct():
    """Edge case: Verify ServiceType.JOB_SERVICE is correctly referenced."""


def test_get_job_service_no_exceptions_raised():
    """Basic test: Verify that get_job_service doesn't raise exceptions under normal conditions."""
    # Should not raise any exception
    try:
        codeflash_output = get_job_service(); result = codeflash_output
    except Exception as e:
        pytest.fail(f"get_job_service raised an unexpected exception: {e}")


def test_get_job_service_with_factory_instantiation_error():
    """Edge case: Test behavior when JobServiceFactory instantiation might fail."""
    # Patch get_service to raise an exception
    with patch('langflow.services.deps.get_service') as mock_get_service:
        mock_get_service.side_effect = Exception("Service creation failed")
        # Function should propagate the exception
        with pytest.raises(Exception) as exc_info:
            get_job_service()


def test_get_job_service_large_scale_factory_comparison():
    """Large scale test: Verify JobServiceFactory instances are created consistently."""
    from langflow.services.jobs.factory import JobServiceFactory

    # Create multiple factories
    factories = []
    for i in range(100):
        try:
            factory = JobServiceFactory()
            factories.append(factory)
        except Exception as e:
            pytest.fail(f"Failed to create JobServiceFactory instance {i}: {e}")
```

To test or edit this optimization locally: `git merge codeflash/optimize-pr11438-2026-01-26T03.20.56`
```diff
-    from langflow.services.jobs.factory import JobServiceFactory
-
-    return get_service(ServiceType.JOB_SERVICE, JobServiceFactory())
+    if not hasattr(get_job_service, "_factory"):
+        from langflow.services.jobs.factory import JobServiceFactory
+
+        get_job_service._factory = JobServiceFactory()
+    return get_service(ServiceType.JOB_SERVICE, get_job_service._factory)
```
```python
from langflow.services.database.models.jobs.model import Job, JobStatus


class JobService(Service):
```
TODO: Add a purge mechanism to keep the job status table within size limits, similar to how the VertexBuild table is capped via max_vertex_builds_per_vertex.
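A rough sketch of what such a purge could look like; the function and setting names are hypothetical, mirroring `max_vertex_builds_per_vertex`, and nothing here is part of this PR:

```python
from uuid import UUID

from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

from langflow.services.database.models.jobs.model import Job


async def purge_old_jobs(db: AsyncSession, flow_id: UUID, max_jobs_per_flow: int) -> None:
    # Keep only the newest max_jobs_per_flow jobs for this flow; delete the rest.
    statement = (
        select(Job)
        .where(Job.flow_id == flow_id)
        .order_by(Job.created_timestamp.desc())
        .offset(max_jobs_per_flow)
    )
    result = await db.exec(statement)
    for stale_job in result.all():
        await db.delete(stale_job)
    await db.commit()
```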
…b status tracking. Added status wrapper to handle job status updates in the DB. Added filtration using job_id to fetch job status responses.

fix: Generate job response from vertex builds history by job id. (#11457)

* feat: reconstruct workflow execution response from vertex_build by job_id
* [autofix.ci] apply automated fixes
* fix: use correct attribute name 'id' instead of 'vertex_id' in VertexBuildTable
* Updated the GET endpoint to return WorkflowExecutionResponse

Co-authored-by: Janardan S Kavia <[email protected]>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>

Added /stop endpoint, task handling, job_id typing through job_service, job_status updates.
Description
This PR matures the V2 Workflow execution infrastructure, transitioning it from a skeletal implementation to a production-ready, type-safe, and resilient system. The broad focus is on robust background job management, lifecycle persistence, and unified status tracking.
🚀 Broad Changes
1. V2 API & Workflow Lifecycle

- Added a `POST /workflows/stop` endpoint, providing the ability to revoke running jobs across both AnyIO (local) and Celery (distributed) backends.
- Added `workflow_reconstruction.py` logic to allow background-completed jobs to rebuild full WorkflowExecutionResponse objects from persistent `vertex_build` data.
- Updated the `GET /workflows` endpoint to reliably fetch job states (`QUEUED`, `IN_PROGRESS`, `COMPLETED`, `FAILED`, `CANCELLED`, `TIMED_OUT`).

2. Service & Backend Maturation

- Fixed `ExceptionGroup` crashes during task cancellation, significantly improving API stability when stopping jobs.

3. Type Safety & Schema Standards

- `JobId`: Introduced a unified `JobId` type via Pydantic `Annotated`, ensuring consistent `UUID` validation across all execution boundaries.
- `UUID` object inputs are now handled correctly, resolving critical initialization errors in serialization.

4. Database Persistence

- Extended the `vertex_build` schema to explicitly link component results to their parent `job_id`, enabling historical execution retrieval.

5. Automation & CI

- Updated CI workflows (`nightly-build`, `docker-build-v2`) to align with the new dependency structures and V2 architecture.

Verification Checklist

- Running jobs can be revoked via `/stop`.
- Serializing `UUID` objects to responses no longer triggers `.replace()` attribute errors.
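To illustrate the resulting polling flow end to end, here is a hedged client-side sketch; the base URL, port, and `x-api-key` header are assumptions, while the query parameters match this PR's `GET /workflows` endpoint:

```python
# Hypothetical client-side polling sketch (not part of this PR).
import httpx

BASE = "http://localhost:7860/api/v2"
HEADERS = {"x-api-key": "<your-api-key>"}

resp = httpx.get(
    f"{BASE}/workflows",
    params={"flow_id": "<flow-uuid>", "page": 1, "page_size": 10},
    headers=HEADERS,
)
resp.raise_for_status()
for job in resp.json():
    # WorkflowJobResponse fields assumed to include job_id and status.
    print(job["job_id"], job["status"])
```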