49 commits
b8cb244
feat(platform): add Human In The Loop block with review workflow
majdyz Nov 14, 2025
a27a445
feat(platform): Implement Human In The Loop block with comprehensive …
majdyz Nov 14, 2025
a82449c
fix(backend): Fix HumanInTheLoopBlock test failure due to missing gra…
majdyz Nov 14, 2025
72f757b
fix(backend): Address critical security and reliability issues in Hum…
majdyz Nov 14, 2025
807f6e2
fix(backend): Fix critical mock issue - block now uses real workflow …
majdyz Nov 14, 2025
d2597c4
feat(backend/frontend): Address critical HITL security and UX issues
majdyz Nov 14, 2025
480489c
style: apply lint fixes
majdyz Nov 14, 2025
96219aa
style: apply lint fixes to manager.py
majdyz Nov 14, 2025
a625fc0
fix(backend): optimize execution queue and fix API route conflicts
majdyz Nov 14, 2025
e0ef5e3
feat(frontend): Add maxLength validation to HITL review message textarea
majdyz Nov 14, 2025
b494f3e
fix(frontend): Include WAITING_FOR_REVIEW in active status for Activi…
majdyz Nov 14, 2025
120746a
fix(backend): Improve HITL block execution status handling
majdyz Nov 14, 2025
599582e
feat(backend): Use Literal type for HITL block status field
majdyz Nov 14, 2025
ee7edc5
refactor(backend): Move HITL database operations to data layer
majdyz Nov 14, 2025
8780290
feat(backend): Add status transition validation and improve architecture
majdyz Nov 14, 2025
59df230
remove init
majdyz Nov 14, 2025
8cade32
refactor(backend): Improve Human In The Loop block architecture and d…
majdyz Nov 14, 2025
d2e630a
refactor(backend/executor): Simplify node status update using batch p…
majdyz Nov 14, 2025
f05d480
refactor(backend/data): Inline wrapper methods in human_review.py
majdyz Nov 14, 2025
28ca485
refactor(backend): Major cleanup of human_review.py and execution.py
majdyz Nov 15, 2025
e33a362
cleanuo
majdyz Nov 15, 2025
6f17a95
refactor(backend): Replace chaotic union type with structured Pending…
majdyz Nov 15, 2025
d13a5db
refactor(backend): Clean up review model with proper from_db pattern
majdyz Nov 15, 2025
9998610
style(backend): Apply code formatting to review models
majdyz Nov 15, 2025
5dc4a0e
refactor(backend): Restructure human review data model to eliminate c…
majdyz Nov 15, 2025
6de947f
refactor(backend): Improve consistency and remove unnecessary complexity
majdyz Nov 15, 2025
d70d536
refactor(backend): Inline single-use functions
majdyz Nov 15, 2025
f8dd0c0
refactor(backend): Optimize HITL system with improved database patterns
majdyz Nov 15, 2025
9ae926f
cleanuo
majdyz Nov 15, 2025
38b080c
cleanuo
majdyz Nov 15, 2025
ea05cf4
fix(frontend): Update PendingReviewCard to match API response structure
majdyz Nov 16, 2025
48add47
fix(frontend): Update components to use regenerated API types
majdyz Nov 17, 2025
796c27e
refactor(platform): Improve API design and exception handling
majdyz Nov 17, 2025
a8fead6
fix(frontend): Fix stale closure issue in FloatingReviewsPanel
majdyz Nov 17, 2025
75a98b7
Merge branch 'dev' into feat/human-in-the-loop-block
majdyz Nov 17, 2025
4b0e971
fix(backend): Add critical security fixes for human review system
majdyz Nov 17, 2025
5dda783
fix(backend): Add missing INCOMPLETE status transition rules
majdyz Nov 17, 2025
81de4e5
feat(platform): Enhance Human-in-the-Loop execution resume functionality
majdyz Nov 17, 2025
d0ff313
Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into fe…
majdyz Nov 18, 2025
12145fa
Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into fe…
majdyz Nov 20, 2025
f600574
feat(platform): enhance human-in-the-loop functionality with real-tim…
majdyz Nov 20, 2025
55b9113
fix ci
majdyz Nov 20, 2025
9f1e27a
revert(backend): revert unintended store changes from HITL implementa…
majdyz Nov 20, 2025
23e93fc
feat(frontend): add Human-in-the-Loop block to beta blocks feature flag
majdyz Nov 20, 2025
4ce2f45
Merge branch 'dev' into feat/human-in-the-loop-block
majdyz Nov 21, 2025
39ec38f
Merge branch 'feat/human-in-the-loop-block' of github.com:Significant…
majdyz Nov 21, 2025
8fa5762
feat(platform): enhance Human-in-the-Loop block with improved review UX
majdyz Nov 21, 2025
3c71761
fix(frontend): remove unused reviewData prop from PendingReviewsList
majdyz Nov 21, 2025
0422173
refactor(frontend): critical simplification and cleanup of HITL compo…
majdyz Nov 21, 2025
160 changes: 160 additions & 0 deletions autogpt_platform/backend/backend/blocks/human_in_the_loop.py
@@ -0,0 +1,160 @@
import logging
from typing import Any, Literal

from prisma.enums import ReviewStatus

from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.execution import ExecutionStatus
from backend.data.human_review import ReviewResult
from backend.data.model import SchemaField
from backend.executor.manager import async_update_node_execution_status
from backend.util.clients import get_database_manager_async_client

logger = logging.getLogger(__name__)


class HumanInTheLoopBlock(Block):
"""
This block pauses execution and waits for human approval or modification of the data.

When executed, it creates a pending review entry and sets the node execution status
to REVIEW. The execution will remain paused until a human user either:
- Approves the data (with or without modifications)
- Rejects the data

This is useful for workflows that require human validation or intervention before
proceeding to the next steps.
"""

class Input(BlockSchemaInput):
data: Any = SchemaField(description="The data to be reviewed by a human user")
name: str = SchemaField(
description="A descriptive name for what this data represents",
)
editable: bool = SchemaField(
description="Whether the human reviewer can edit the data",
default=True,
advanced=True,
)

class Output(BlockSchemaOutput):
reviewed_data: Any = SchemaField(
description="The data after human review (may be modified)"
)
status: Literal["approved", "rejected"] = SchemaField(
description="Status of the review: 'approved' or 'rejected'"
)
review_message: str = SchemaField(
description="Any message provided by the reviewer", default=""
)

def __init__(self):
super().__init__(
id="8b2a7b3c-6e9d-4a5f-8c1b-2e3f4a5b6c7d",
description="Pause execution and wait for human approval or modification of data",
categories={BlockCategory.BASIC},
input_schema=HumanInTheLoopBlock.Input,
output_schema=HumanInTheLoopBlock.Output,
test_input={
"data": {"name": "John Doe", "age": 30},
"name": "User profile data",
"editable": True,
},
test_output=[
("reviewed_data", {"name": "John Doe", "age": 30}),
("status", "approved"),
("review_message", ""),
],
test_mock={
"get_or_create_human_review": lambda *_args, **_kwargs: ReviewResult(
data={"name": "John Doe", "age": 30},
status=ReviewStatus.APPROVED,
message="",
processed=False,
node_exec_id="test-node-exec-id",
),
"update_node_execution_status": lambda *_args, **_kwargs: None,
},
)

async def run(
self,
input_data: Input,
*,
user_id: str,
node_exec_id: str,
graph_exec_id: str,
graph_id: str,
graph_version: int,
**kwargs,
) -> BlockOutput:
"""
Execute the Human In The Loop block.

This method delegates to a single data-layer call that returns the resolved review
if one exists, or registers a pending review and returns None so the block can pause.
"""
try:
logger.debug(f"HITL block executing for node {node_exec_id}")

# Use the data layer to handle the complete workflow
db_client = get_database_manager_async_client()
result = await db_client.get_or_create_human_review(
user_id=user_id,
node_exec_id=node_exec_id,
graph_exec_id=graph_exec_id,
graph_id=graph_id,
graph_version=graph_version,
input_data=input_data.data,
message=input_data.name,
editable=input_data.editable,
)
except Exception as e:
logger.error(f"Error in HITL block for node {node_exec_id}: {str(e)}")
raise

# Check if we're waiting for human input
if result is None:
logger.info(
f"HITL block pausing execution for node {node_exec_id} - awaiting human review"
)
try:
# Set node status to REVIEW so execution manager can't mark it as COMPLETED
# The VALID_STATUS_TRANSITIONS will then prevent any unwanted status changes
# Use the proper wrapper function to ensure websocket events are published
await async_update_node_execution_status(
db_client=db_client,
exec_id=node_exec_id,
status=ExecutionStatus.REVIEW,
)
# Execution pauses here until API routes process the review
return
except Exception as e:
logger.error(
f"Failed to update node status for HITL block {node_exec_id}: {str(e)}"
)
raise

# Review is complete (approved or rejected) - check if unprocessed
if not result.processed:
# Mark as processed before yielding
await db_client.update_review_processed_status(
node_exec_id=node_exec_id, processed=True
)

if result.status == ReviewStatus.APPROVED:
yield "status", "approved"
yield "reviewed_data", result.data
if result.message:
yield "review_message", result.message

elif result.status == ReviewStatus.REJECTED:
yield "status", "rejected"
if result.message:
yield "review_message", result.message
2 changes: 2 additions & 0 deletions autogpt_platform/backend/backend/data/credit_test.py
@@ -73,6 +73,7 @@ async def test_block_credit_usage(server: SpinTestServer):
NodeExecutionEntry(
user_id=DEFAULT_USER_ID,
graph_id="test_graph",
graph_version=1,
node_id="test_node",
graph_exec_id="test_graph_exec",
node_exec_id="test_node_exec",
@@ -94,6 +95,7 @@ async def test_block_credit_usage(server: SpinTestServer):
NodeExecutionEntry(
user_id=DEFAULT_USER_ID,
graph_id="test_graph",
graph_version=1,
node_id="test_node",
graph_exec_id="test_graph_exec",
node_exec_id="test_node_exec",
106 changes: 87 additions & 19 deletions autogpt_platform/backend/backend/data/execution.py
@@ -34,6 +34,7 @@
AgentNodeExecutionKeyValueDataCreateInput,
AgentNodeExecutionUpdateInput,
AgentNodeExecutionWhereInput,
AgentNodeExecutionWhereUniqueInput,
)
from pydantic import BaseModel, ConfigDict, JsonValue, ValidationError
from pydantic.fields import Field
@@ -96,11 +97,14 @@ def error_rate(self) -> float:
VALID_STATUS_TRANSITIONS = {
ExecutionStatus.QUEUED: [
ExecutionStatus.INCOMPLETE,
ExecutionStatus.TERMINATED, # For resuming halted execution
ExecutionStatus.REVIEW, # For resuming after review
],
ExecutionStatus.RUNNING: [
ExecutionStatus.INCOMPLETE,
ExecutionStatus.QUEUED,
ExecutionStatus.TERMINATED, # For resuming halted execution
ExecutionStatus.REVIEW, # For resuming after review
],
ExecutionStatus.COMPLETED: [
ExecutionStatus.RUNNING,
@@ -109,11 +113,16 @@
ExecutionStatus.INCOMPLETE,
ExecutionStatus.QUEUED,
ExecutionStatus.RUNNING,
ExecutionStatus.REVIEW,
],
ExecutionStatus.TERMINATED: [
ExecutionStatus.INCOMPLETE,
ExecutionStatus.QUEUED,
ExecutionStatus.RUNNING,
ExecutionStatus.REVIEW,
],
ExecutionStatus.REVIEW: [
ExecutionStatus.RUNNING,
],
}

@@ -446,6 +455,7 @@ def to_node_execution_entry(
user_id=self.user_id,
graph_exec_id=self.graph_exec_id,
graph_id=self.graph_id,
graph_version=self.graph_version,
node_exec_id=self.node_exec_id,
node_id=self.node_id,
block_id=self.block_id,
@@ -728,7 +738,7 @@ async def upsert_execution_input(
input_name: str,
input_data: JsonValue,
node_exec_id: str | None = None,
) -> tuple[str, BlockInput]:
) -> tuple[NodeExecutionResult, BlockInput]:
"""
Insert an AgentNodeExecutionInputOutput record as one of an AgentNodeExecution's Inputs.
If no AgentNodeExecution exists that is still missing `input_name` as an input, create a new one.
@@ -761,7 +771,7 @@ async def upsert_execution_input(
existing_execution = await AgentNodeExecution.prisma().find_first(
where=existing_exec_query_filter,
order={"addedTime": "asc"},
include={"Input": True},
include={"Input": True, "GraphExecution": True},
)
json_input_data = SafeJson(input_data)

@@ -773,7 +783,7 @@ async def upsert_execution_input(
referencedByInputExecId=existing_execution.id,
)
)
return existing_execution.id, {
return NodeExecutionResult.from_db(existing_execution), {
**{
input_data.name: type_utils.convert(input_data.data, JsonValue)
for input_data in existing_execution.Input or []
@@ -788,9 +798,10 @@ async def upsert_execution_input(
agentGraphExecutionId=graph_exec_id,
executionStatus=ExecutionStatus.INCOMPLETE,
Input={"create": {"name": input_name, "data": json_input_data}},
)
),
include={"GraphExecution": True},
)
return result.id, {input_name: input_data}
return NodeExecutionResult.from_db(result), {input_name: input_data}

else:
raise ValueError(
@@ -886,9 +897,25 @@ async def update_node_execution_status_batch(
node_exec_ids: list[str],
status: ExecutionStatus,
stats: dict[str, Any] | None = None,
):
await AgentNodeExecution.prisma().update_many(
where={"id": {"in": node_exec_ids}},
) -> int:
# Validate status transitions - allowed_from should never be empty for valid statuses
allowed_from = VALID_STATUS_TRANSITIONS.get(status, [])
if not allowed_from:
raise ValueError(
f"Invalid status transition: {status} has no valid source statuses"
)

# For batch updates, we filter to only update nodes with valid current statuses
where_clause = cast(
AgentNodeExecutionWhereInput,
{
"id": {"in": node_exec_ids},
"executionStatus": {"in": [s.value for s in allowed_from]},
},
)

return await AgentNodeExecution.prisma().update_many(
where=where_clause,
data=_get_update_status_data(status, None, stats),
)

@@ -902,15 +929,32 @@ async def update_node_execution_status(
if status == ExecutionStatus.QUEUED and execution_data is None:
raise ValueError("Execution data must be provided when queuing an execution.")

res = await AgentNodeExecution.prisma().update(
where={"id": node_exec_id},
# Validate status transitions - allowed_from should never be empty for valid statuses
allowed_from = VALID_STATUS_TRANSITIONS.get(status, [])
if not allowed_from:
raise ValueError(
f"Invalid status transition: {status} has no valid source statuses"
)

if res := await AgentNodeExecution.prisma().update(
where=cast(
AgentNodeExecutionWhereUniqueInput,
{
"id": node_exec_id,
"executionStatus": {"in": [s.value for s in allowed_from]},
},
),
data=_get_update_status_data(status, execution_data, stats),
include=EXECUTION_RESULT_INCLUDE,
)
if not res:
raise ValueError(f"Execution {node_exec_id} not found.")
):
return NodeExecutionResult.from_db(res)

if res := await AgentNodeExecution.prisma().find_unique(
where={"id": node_exec_id}, include=EXECUTION_RESULT_INCLUDE
):
return NodeExecutionResult.from_db(res)

return NodeExecutionResult.from_db(res)
raise ValueError(f"Execution {node_exec_id} not found.")


def _get_update_status_data(
@@ -964,17 +1008,17 @@ async def get_node_execution(node_exec_id: str) -> NodeExecutionResult | None:
return NodeExecutionResult.from_db(execution)


async def get_node_executions(
def _build_node_execution_where_clause(
graph_exec_id: str | None = None,
node_id: str | None = None,
block_ids: list[str] | None = None,
statuses: list[ExecutionStatus] | None = None,
limit: int | None = None,
created_time_gte: datetime | None = None,
created_time_lte: datetime | None = None,
include_exec_data: bool = True,
) -> list[NodeExecutionResult]:
"""⚠️ No `user_id` check: DO NOT USE without check in user-facing endpoints."""
) -> AgentNodeExecutionWhereInput:
"""
Build where clause for node execution queries.
"""
where_clause: AgentNodeExecutionWhereInput = {}
if graph_exec_id:
where_clause["agentGraphExecutionId"] = graph_exec_id
@@ -991,6 +1035,29 @@ async def get_node_executions(
"lte": created_time_lte or datetime.max.replace(tzinfo=timezone.utc),
}

return where_clause


async def get_node_executions(
graph_exec_id: str | None = None,
node_id: str | None = None,
block_ids: list[str] | None = None,
statuses: list[ExecutionStatus] | None = None,
limit: int | None = None,
created_time_gte: datetime | None = None,
created_time_lte: datetime | None = None,
include_exec_data: bool = True,
) -> list[NodeExecutionResult]:
"""⚠️ No `user_id` check: DO NOT USE without check in user-facing endpoints."""
where_clause = _build_node_execution_where_clause(
graph_exec_id=graph_exec_id,
node_id=node_id,
block_ids=block_ids,
statuses=statuses,
created_time_gte=created_time_gte,
created_time_lte=created_time_lte,
)

executions = await AgentNodeExecution.prisma().find_many(
where=where_clause,
include=(
@@ -1052,6 +1119,7 @@ class NodeExecutionEntry(BaseModel):
user_id: str
graph_exec_id: str
graph_id: str
graph_version: int
node_exec_id: str
node_id: str
block_id: str
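
The new REVIEW status and the executionStatus "in allowed_from" filters above turn every status write into a guarded, conditional update. Below is a rough, self-contained sketch of that pattern against an in-memory dict rather than Prisma; the trimmed VALID_STATUS_TRANSITIONS table and the guarded_update helper are illustrative assumptions, not the platform's actual schema or API.

from enum import Enum


class ExecutionStatus(str, Enum):
    INCOMPLETE = "INCOMPLETE"
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    REVIEW = "REVIEW"
    COMPLETED = "COMPLETED"


# Which *current* statuses a node may be in for the target status to be applied
# (trimmed subset of the table in the diff).
VALID_STATUS_TRANSITIONS = {
    ExecutionStatus.QUEUED: [ExecutionStatus.INCOMPLETE, ExecutionStatus.REVIEW],
    ExecutionStatus.RUNNING: [ExecutionStatus.QUEUED, ExecutionStatus.REVIEW],
    ExecutionStatus.COMPLETED: [ExecutionStatus.RUNNING],
    ExecutionStatus.REVIEW: [ExecutionStatus.RUNNING],
}

# node_exec_id -> current status (stand-in for the AgentNodeExecution table)
store = {"n1": ExecutionStatus.RUNNING, "n2": ExecutionStatus.COMPLETED}


def guarded_update(node_exec_id: str, target: ExecutionStatus) -> bool:
    """Apply `target` only if the current status is a valid source for it,
    mirroring the executionStatus {"in": allowed_from} filter in the diff."""
    allowed_from = VALID_STATUS_TRANSITIONS.get(target, [])
    if not allowed_from:
        raise ValueError(f"{target} has no valid source statuses")
    if store.get(node_exec_id) in allowed_from:
        store[node_exec_id] = target
        return True
    return False  # no-op, like an update matching zero rows


print(guarded_update("n1", ExecutionStatus.REVIEW))   # True: RUNNING -> REVIEW
print(guarded_update("n2", ExecutionStatus.REVIEW))   # False: COMPLETED is not a valid source
print(guarded_update("n1", ExecutionStatus.QUEUED))   # True: REVIEW -> QUEUED (resume after review)

In the PR itself the same check happens atomically inside the database via the where clause of update / update_many, which is what lets the HITL block hold a node in REVIEW without the executor marking it COMPLETED.
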