Enhanced GraphEngine Pause Handling #28196
Pull Request Overview
This PR enhances the GraphEngine pause handling system by converting the scalar pause_reason field to a list-based pause_reasons field and introducing a new pause_metadata database field to store structured information about pause events.
Key Changes:
- Converted `pause_reason` (scalar) to `pause_reasons` (list) in `GraphExecution` and related classes to support multiple pause events
- Added `PauseMetadata` model with `pause_metadata` database field to `WorkflowPause` for storing pause details
- Simplified discriminator patterns in `PauseReason` union type
- Updated all tests to use list-based pause reasons
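
The scalar-to-list change can be illustrated with a minimal sketch. Everything here is a simplified stand-in written for illustration: the real `GraphExecution` carries far more state, the `resume` method name and the derived `is_paused` property are assumptions, and the reason classes are plain dataclasses rather than the PR's Pydantic models.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class HumanInputRequired:
    # Hypothetical stand-in for the PR's pause reason type.
    form_id: str


@dataclass(frozen=True)
class SchedulingPause:
    # Hypothetical stand-in for the PR's pause reason type.
    message: str


@dataclass
class GraphExecution:
    # Simplified stand-in: only the pause-related state is modelled.
    pause_reasons: list = field(default_factory=list)

    @property
    def is_paused(self) -> bool:
        # Assumption: paused means at least one reason is recorded.
        return bool(self.pause_reasons)

    def pause(self, reason) -> None:
        # Append instead of assigning, so an earlier reason is not
        # overwritten when a second node requests a pause.
        self.pause_reasons.append(reason)

    def resume(self) -> None:
        # The overview states that the list is cleared on resume.
        self.pause_reasons.clear()


execution = GraphExecution()
execution.pause(HumanInputRequired(form_id="form-1"))
execution.pause(SchedulingPause(message="rate limited"))
```

With a scalar field the second `pause` call would have replaced the first reason; with a list both remain available to whoever persists the pause.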
Reviewed Changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| api/core/workflow/entities/pause_reason.py | Simplified discriminator approach for PauseReason union type using Pydantic's Field(discriminator="TYPE") |
| api/core/workflow/entities/workflow_pause.py | Added PauseDetail, PauseType, and HumanInputPause/SchedulingPause models; added get_pause_details() method to WorkflowPauseEntity |
| api/core/workflow/graph_engine/domain/graph_execution.py | Changed pause_reason field to pause_reasons list; updated pause() method to append reasons |
| api/core/workflow/graph_engine/graph_engine.py | Updated pause handling logic to work with pause_reasons list; clears list on resume |
| api/core/workflow/runtime/graph_runtime_state.py | Added is_paused and pause_reasons to GraphExecutionProtocol |
| api/core/workflow/graph_events/graph.py | Changed GraphRunPausedEvent.reason to reasons list field |
| api/core/workflow/graph_engine/event_management/event_manager.py | Moved _notify_layers call outside critical section to reduce lock contention |
| api/models/workflow.py | Added PauseMetadata model and pause_metadata field to WorkflowPause table |
| api/repositories/sqlalchemy_api_workflow_run_repository.py | Implemented get_pause_details() method in _PrivateWorkflowPauseEntity |
| api/tests/unit_tests/core/workflow/graph_engine/test_command_system.py | Updated tests to use reasons list instead of scalar reason |
| api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py | Updated test data factory to create events with reasons list |
| api/tests/test_containers_integration_tests/core/app/layers/test_pause_state_persist_layer.py | Updated all test cases to use reasons list in pause events |
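
The `event_manager.py` row describes moving the `_notify_layers` call outside the critical section. A minimal sketch of that pattern follows; the `EventManager` class, its `collect` and `add_layer` methods, and the string events are hypothetical simplifications and are not taken from the PR's code.

```python
import threading
from typing import Callable


class EventManager:
    """Hypothetical, simplified event collector with layer callbacks."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._events: list[str] = []
        self._layers: list[Callable[[str], None]] = []

    def add_layer(self, layer: Callable[[str], None]) -> None:
        self._layers.append(layer)

    def collect(self, event: str) -> None:
        # Only the mutation of shared state happens under the lock.
        with self._lock:
            self._events.append(event)
        # Layers are notified after the lock is released, so a slow
        # layer does not block other threads that are collecting events.
        self._notify_layers(event)

    def _notify_layers(self, event: str) -> None:
        for layer in self._layers:
            layer(event)


manager = EventManager()
lock_held_during_callback: list[bool] = []
manager.add_layer(lambda event: lock_held_during_callback.append(manager._lock.locked()))
manager.collect("graph_run_paused")
```

The trade-off is that layers may observe events slightly after they were recorded, in exchange for shorter lock hold times.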
351d419 to 1372cf4
/gemini review
Code Review
This pull request significantly enhances the GraphEngine's pause handling by transitioning pause_reason from a single scalar value to a list of pause_reasons, and by introducing a new pause_metadata field to the WorkflowPause model. These changes allow for the capture of multiple pause events and structured metadata, greatly improving the granularity and flexibility of workflow pausing. The refactoring of Pydantic models for PauseReason is a good modernization, and the updates are consistently applied across the graph engine, persistence layer, and associated tests. The migration script correctly adds the new pause_metadata column with a sensible default. Overall, the changes are well-implemented and address the stated objectives.
api/tests/unit_tests/repositories/test_sqlalchemy_api_workflow_run_repository.py
Code Review
This pull request introduces significant enhancements to the graph engine's pause handling. The transition from a single pause_reason to a list of pause_reasons is a great improvement for capturing multiple pause events. The introduction of pause_metadata with a structured Pydantic model is also a solid design choice for storing detailed pause information. The refactoring of PauseReason to use Pydantic's built-in discriminator is a nice cleanup.
I've found one minor issue related to the new HumanInputPause model and its usage in tests, where extra fields are being passed during instantiation but are not defined in the model. My suggestion aims to align the model with its intended usage.
Overall, this is a well-executed enhancement that improves the robustness and extensibility of the workflow pause mechanism.
Code Review
This pull request effectively enhances the graph engine's pause handling capabilities. The conversion of pause_reason to a list of pause_reasons is a logical improvement for capturing multiple pause events, and the changes have been applied consistently across the codebase. The introduction of the pause_metadata field with its own structured Pydantic models is a solid addition for storing detailed pause information. The refactoring of pause_reason.py to use Pydantic's built-in discriminator is a nice touch for code simplification and maintainability. The integration and unit tests have been updated thoroughly to cover the new functionality. I have one minor suggestion in a test file to improve clarity.
api/tests/unit_tests/repositories/test_sqlalchemy_api_workflow_run_repository.py
/gemini review
Code Review
This PR enhances pause handling by converting pause_reason to a list pause_reasons to support multiple pause events, and introduces a new WorkflowPauseReason model and table to store structured details about each pause. The changes are extensive and touch many parts of the graph engine and repository layers.
While the overall direction is good, the implementation is incomplete. The logic to retrieve and reconstruct pause reasons (get_pause_reasons) is missing, and there's data loss when storing SchedulingPause reasons as the message field is not persisted. Additionally, the node_id is not being saved for pause reasons. These issues should be addressed to make the feature fully functional. I've left specific comments on the relevant files.

    for reason in pause_reasons:
        if isinstance(reason, HumanInputRequired):
            # TODO(QuantumGhost): record node_id for `WorkflowPauseReason`
            pause_reason_model = WorkflowPauseReason(
                pause_id=pause_model.id,
                type_=reason.TYPE,
                form_id=reason.form_id,
            )
        elif isinstance(reason, SchedulingPause):
            pause_reason_model = WorkflowPauseReason(
                pause_id=pause_model.id,
                type_=reason.TYPE,
            )
        else:
            raise AssertionError(f"unknown reason type: {type(reason)}")

        pause_reason_models.append(pause_reason_model)

There are a couple of issues in this loop that will lead to data loss or incorrect data being stored:
- For `SchedulingPause`, the `message` field is not being saved. The `WorkflowPauseReason` database model is missing a column for it. This should be added to the model and the migration, and then saved here.
- For all pause reasons, the `node_id` is not being recorded, as noted by the `TODO`. The `node_id` of the node that triggered the pause should be passed and saved here. The database column is non-nullable.
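
One way the two points above could be addressed is sketched below. This is not the PR's code: `WorkflowPauseReasonRow` is a plain dataclass standing in for the ORM model, the `message` column is the one the review asks to add, the `TYPE` string values are placeholders, and carrying `node_id` on the reason object itself is an assumption (it could equally be passed in separately).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class HumanInputRequired:
    form_id: str
    node_id: str  # assumption: the reason knows which node triggered it
    TYPE: str = "human_input_required"  # placeholder value


@dataclass(frozen=True)
class SchedulingPause:
    message: str
    node_id: str  # assumption, as above
    TYPE: str = "scheduled_pause"  # placeholder value


@dataclass
class WorkflowPauseReasonRow:
    # Stand-in for the database model; `node_id` is required because
    # the review notes that the column is non-nullable.
    pause_id: str
    type_: str
    node_id: str
    form_id: Optional[str] = None
    message: Optional[str] = None


def build_reason_rows(pause_id: str, reasons) -> list:
    """Map each pause reason to a row without dropping any field."""
    rows = []
    for reason in reasons:
        if isinstance(reason, HumanInputRequired):
            row = WorkflowPauseReasonRow(
                pause_id=pause_id,
                type_=reason.TYPE,
                node_id=reason.node_id,
                form_id=reason.form_id,
            )
        elif isinstance(reason, SchedulingPause):
            row = WorkflowPauseReasonRow(
                pause_id=pause_id,
                type_=reason.TYPE,
                node_id=reason.node_id,
                message=reason.message,
            )
        else:
            raise AssertionError(f"unknown reason type: {type(reason)}")
        rows.append(row)
    return rows


rows = build_reason_rows(
    "pause-1",
    [
        HumanInputRequired(form_id="form-1", node_id="node-a"),
        SchedulingPause(message="retry later", node_id="node-b"),
    ],
)
```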

    def get_pause_reasons(self) -> Sequence[PauseReason]:
        # TODO(QuantumGhost): rebuild PauseReason from models
        return []

This method is not fully implemented and returns an empty list. This prevents consumers of WorkflowPauseEntity from accessing the pause reasons, which is a core goal of this pull request. This method should be implemented to reconstruct the PauseReason objects from the _reason_models data stored in the entity.
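
A rough sketch of what such a reconstruction might look like is given below. It uses plain dataclasses instead of the PR's Pydantic and ORM models, the `ReasonRow` shape and the type strings are assumptions, and the entity is reduced to the single `_reason_models` attribute the comment mentions.

```python
from dataclasses import dataclass
from typing import Optional, Sequence, Union


@dataclass(frozen=True)
class HumanInputRequired:
    form_id: str


@dataclass(frozen=True)
class SchedulingPause:
    message: str


PauseReason = Union[HumanInputRequired, SchedulingPause]


@dataclass
class ReasonRow:
    # Hypothetical stand-in for a stored pause reason record.
    type_: str
    form_id: Optional[str] = None
    message: Optional[str] = None


class WorkflowPauseEntity:
    # Reduced to the one attribute needed for this sketch.
    def __init__(self, reason_models: Sequence[ReasonRow]) -> None:
        self._reason_models = list(reason_models)

    def get_pause_reasons(self) -> Sequence[PauseReason]:
        reasons: list = []
        for row in self._reason_models:
            # The type strings below are placeholders, not the PR's values.
            if row.type_ == "human_input_required":
                reasons.append(HumanInputRequired(form_id=row.form_id or ""))
            elif row.type_ == "scheduled_pause":
                reasons.append(SchedulingPause(message=row.message or ""))
            else:
                raise ValueError(f"unknown pause reason type: {row.type_}")
        return reasons


entity = WorkflowPauseEntity(
    [
        ReasonRow(type_="human_input_required", form_id="form-1"),
        ReasonRow(type_="scheduled_pause", message="retry later"),
    ]
)
rebuilt = entity.get_pause_reasons()
```

Raising on an unknown type, rather than skipping the row, keeps a newly added reason type from being silently dropped on read.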

    # `pause_id` represents the identifier of the pause,
    # corresponds to the `id` field of `WorkflowPause`.

To avoid losing pause reason data when multiple nodes yield `PauseRequestedEvent`.
Co-authored-by: Copilot <[email protected]>
Add a linting rule to disallow importing `core.workflow.runtime` from `core.workflow.entities`
- Split `pause_metadata` into a dedicated table `WorkflowPauseReason`.
- Save `pause_reasons` directly in the `APIWorkflowRunRepository`.
f6b0153 to 8858fda
Summary
This PR:
- Rename `pause_reason` to `pause_reasons` in `GraphExecution` and relevant classes. Change the field from a scalar value to a list that can contain multiple `PauseReason` objects, ensuring all pause events are properly captured.
- Add a `pause_metadata` field to the `WorkflowPause` model to store structured information about specific pause events.

Resolves #28195
Screenshots
N/A
Checklist
- I ran `dev/reformat` (backend) and `cd web && npx lint-staged` (frontend) to appease the lint gods