Commits
49 commits
b8cb244
feat(platform): add Human In The Loop block with review workflow
majdyz Nov 14, 2025
a27a445
feat(platform): Implement Human In The Loop block with comprehensive …
majdyz Nov 14, 2025
a82449c
fix(backend): Fix HumanInTheLoopBlock test failure due to missing gra…
majdyz Nov 14, 2025
72f757b
fix(backend): Address critical security and reliability issues in Hum…
majdyz Nov 14, 2025
807f6e2
fix(backend): Fix critical mock issue - block now uses real workflow …
majdyz Nov 14, 2025
d2597c4
feat(backend/frontend): Address critical HITL security and UX issues
majdyz Nov 14, 2025
480489c
style: apply lint fixes
majdyz Nov 14, 2025
96219aa
style: apply lint fixes to manager.py
majdyz Nov 14, 2025
a625fc0
fix(backend): optimize execution queue and fix API route conflicts
majdyz Nov 14, 2025
e0ef5e3
feat(frontend): Add maxLength validation to HITL review message textarea
majdyz Nov 14, 2025
b494f3e
fix(frontend): Include WAITING_FOR_REVIEW in active status for Activi…
majdyz Nov 14, 2025
120746a
fix(backend): Improve HITL block execution status handling
majdyz Nov 14, 2025
599582e
feat(backend): Use Literal type for HITL block status field
majdyz Nov 14, 2025
ee7edc5
refactor(backend): Move HITL database operations to data layer
majdyz Nov 14, 2025
8780290
feat(backend): Add status transition validation and improve architecture
majdyz Nov 14, 2025
59df230
remove init
majdyz Nov 14, 2025
8cade32
refactor(backend): Improve Human In The Loop block architecture and d…
majdyz Nov 14, 2025
d2e630a
refactor(backend/executor): Simplify node status update using batch p…
majdyz Nov 14, 2025
f05d480
refactor(backend/data): Inline wrapper methods in human_review.py
majdyz Nov 14, 2025
28ca485
refactor(backend): Major cleanup of human_review.py and execution.py
majdyz Nov 15, 2025
e33a362
cleanup
majdyz Nov 15, 2025
6f17a95
refactor(backend): Replace chaotic union type with structured Pending…
majdyz Nov 15, 2025
d13a5db
refactor(backend): Clean up review model with proper from_db pattern
majdyz Nov 15, 2025
9998610
style(backend): Apply code formatting to review models
majdyz Nov 15, 2025
5dc4a0e
refactor(backend): Restructure human review data model to eliminate c…
majdyz Nov 15, 2025
6de947f
refactor(backend): Improve consistency and remove unnecessary complexity
majdyz Nov 15, 2025
d70d536
refactor(backend): Inline single-use functions
majdyz Nov 15, 2025
f8dd0c0
refactor(backend): Optimize HITL system with improved database patterns
majdyz Nov 15, 2025
9ae926f
cleanup
majdyz Nov 15, 2025
38b080c
cleanup
majdyz Nov 15, 2025
ea05cf4
fix(frontend): Update PendingReviewCard to match API response structure
majdyz Nov 16, 2025
48add47
fix(frontend): Update components to use regenerated API types
majdyz Nov 17, 2025
796c27e
refactor(platform): Improve API design and exception handling
majdyz Nov 17, 2025
a8fead6
fix(frontend): Fix stale closure issue in FloatingReviewsPanel
majdyz Nov 17, 2025
75a98b7
Merge branch 'dev' into feat/human-in-the-loop-block
majdyz Nov 17, 2025
4b0e971
fix(backend): Add critical security fixes for human review system
majdyz Nov 17, 2025
5dda783
fix(backend): Add missing INCOMPLETE status transition rules
majdyz Nov 17, 2025
81de4e5
feat(platform): Enhance Human-in-the-Loop execution resume functionality
majdyz Nov 17, 2025
d0ff313
Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into fe…
majdyz Nov 18, 2025
12145fa
Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into fe…
majdyz Nov 20, 2025
f600574
feat(platform): enhance human-in-the-loop functionality with real-tim…
majdyz Nov 20, 2025
55b9113
fix ci
majdyz Nov 20, 2025
9f1e27a
revert(backend): revert unintended store changes from HITL implementa…
majdyz Nov 20, 2025
23e93fc
feat(frontend): add Human-in-the-Loop block to beta blocks feature flag
majdyz Nov 20, 2025
4ce2f45
Merge branch 'dev' into feat/human-in-the-loop-block
majdyz Nov 21, 2025
39ec38f
Merge branch 'feat/human-in-the-loop-block' of github.com:Significant…
majdyz Nov 21, 2025
8fa5762
feat(platform): enhance Human-in-the-Loop block with improved review UX
majdyz Nov 21, 2025
3c71761
fix(frontend): remove unused reviewData prop from PendingReviewsList
majdyz Nov 21, 2025
0422173
refactor(frontend): critical simplification and cleanup of HITL compo…
majdyz Nov 21, 2025
145 changes: 145 additions & 0 deletions autogpt_platform/backend/backend/blocks/human_in_the_loop.py
@@ -0,0 +1,145 @@
from typing import Any

from prisma.models import PendingHumanReview

from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import SchemaField
from backend.util.json import SafeJson
from backend.util.type import convert


class HumanInTheLoopBlock(Block):
"""
This block pauses execution and waits for human approval or modification of the data.
When executed, it creates a pending review entry and sets the node execution status
to WAITING_FOR_REVIEW. The execution will remain paused until a human user either:
- Approves the data (with or without modifications)
- Rejects the data
This is useful for workflows that require human validation or intervention before
proceeding to the next steps.
"""

class Input(BlockSchemaInput):
data: Any = SchemaField(description="The data to be reviewed by a human user")
message: str = SchemaField(
description="Instructions or message for the human reviewer",
default="Please review and approve or modify the following data:",
)
editable: bool = SchemaField(
description="Whether the human reviewer can edit the data",
default=True,
advanced=True,
)

class Output(BlockSchemaOutput):
reviewed_data: Any = SchemaField(
description="The data after human review (may be modified)"
)
status: str = SchemaField(
description="Status of the review: 'approved' or 'rejected'"
)
review_message: str = SchemaField(
description="Any message provided by the reviewer", default=""
)

def __init__(self):
super().__init__(
id="8b2a7b3c-6e9d-4a5f-8c1b-2e3f4a5b6c7d",
description="Pause execution and wait for human approval or modification of data",
categories={BlockCategory.BASIC},
input_schema=HumanInTheLoopBlock.Input,
output_schema=HumanInTheLoopBlock.Output,
test_input={
"data": {"name": "John Doe", "age": 30},
"message": "Please verify this user data",
"editable": True,
},
test_output=[
("reviewed_data", {"name": "John Doe", "age": 30}),
("status", "approved"),
("review_message", ""),
],
)

async def run(
self,
input_data: Input,
*,
user_id: str,
node_exec_id: str,
graph_exec_id: str,
graph_id: str,
graph_version: int,
**kwargs
) -> BlockOutput:
# Check if there's already an approved review for this node execution
existing_review = await PendingHumanReview.prisma().find_unique(
where={"nodeExecId": node_exec_id}
)

if existing_review and existing_review.status == "APPROVED":
# Return the approved data (which may have been modified by the reviewer)
# The data field now contains the approved/modified data from the review
if (
isinstance(existing_review.data, dict)
and "data" in existing_review.data
):
# Extract the actual data from the review data structure
approved_data = existing_review.data["data"]
else:
# Fallback to the stored data directly
approved_data = existing_review.data

approved_data = convert(approved_data, type(input_data.data))
yield "reviewed_data", approved_data
yield "status", "approved"
yield "review_message", existing_review.reviewMessage or ""

# Clean up the review record as it's been processed
await PendingHumanReview.prisma().delete(where={"id": existing_review.id})
return

elif existing_review and existing_review.status == "REJECTED":
# Return rejection status without data
yield "status", "rejected"
yield "review_message", existing_review.reviewMessage or ""

# Clean up the review record
await PendingHumanReview.prisma().delete(where={"id": existing_review.id})
return

# No existing approved review, create a pending review
review_data = {
"data": input_data.data,
"message": input_data.message,
"editable": input_data.editable,
}

await PendingHumanReview.prisma().upsert(
where={"nodeExecId": node_exec_id},
data={
"create": {
"userId": user_id,
"nodeExecId": node_exec_id,
"graphExecId": graph_exec_id,
"graphId": graph_id,
"graphVersion": graph_version,
"data": SafeJson(review_data),
"status": "WAITING",
},
"update": {"data": SafeJson(review_data), "status": "WAITING"},
},
)

# This will effectively pause the execution here
# The execution will be resumed when the review is approved
# The manager will detect the pending review and set the status to WAITING_FOR_REVIEW
return
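Note: the block above only writes the review record and pauses; the approval itself happens through the review API added elsewhere in this PR (the routes module is not shown in this hunk). Below is a minimal, hypothetical sketch of how an approval could be persisted so that the block's APPROVED branch picks it up on re-run. The helper name approve_review and its signature are assumptions by the editor, not code from this PR.

from prisma.models import PendingHumanReview

from backend.util.json import SafeJson


async def approve_review(node_exec_id: str, reviewed_data: dict, message: str = "") -> None:
    # Hypothetical sketch only: mark a WAITING review as APPROVED so that the
    # HumanInTheLoopBlock's re-run path can yield the reviewed data.
    review = await PendingHumanReview.prisma().find_unique(
        where={"nodeExecId": node_exec_id}
    )
    if review is None or review.status != "WAITING":
        raise ValueError(f"No review waiting for node execution {node_exec_id}")

    # Keep the same {"data": ...} envelope the block writes, so its
    # existing_review.data["data"] extraction still works on resume.
    await PendingHumanReview.prisma().update(
        where={"id": review.id},
        data={
            "data": SafeJson({"data": reviewed_data}),
            "status": "APPROVED",
            "reviewMessage": message,
        },
    )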
36 changes: 36 additions & 0 deletions autogpt_platform/backend/backend/data/execution.py
@@ -101,6 +101,7 @@ def error_rate(self) -> float:
ExecutionStatus.INCOMPLETE,
ExecutionStatus.QUEUED,
ExecutionStatus.TERMINATED, # For resuming halted execution
ExecutionStatus.WAITING_FOR_REVIEW, # For resuming after review
],
ExecutionStatus.COMPLETED: [
ExecutionStatus.RUNNING,
@@ -115,6 +116,9 @@ def error_rate(self) -> float:
ExecutionStatus.QUEUED,
ExecutionStatus.RUNNING,
],
ExecutionStatus.WAITING_FOR_REVIEW: [
ExecutionStatus.RUNNING,
],
}


@@ -1002,6 +1006,38 @@ async def get_node_executions(
return res


async def get_node_executions_count(
graph_exec_id: str | None = None,
node_id: str | None = None,
block_ids: list[str] | None = None,
statuses: list[ExecutionStatus] | None = None,
created_time_gte: datetime | None = None,
created_time_lte: datetime | None = None,
) -> int:
"""
Get count of node executions with optional filters.
⚠️ No `user_id` check: DO NOT USE without check in user-facing endpoints.
"""
where_clause: AgentNodeExecutionWhereInput = {}
if graph_exec_id:
where_clause["agentGraphExecutionId"] = graph_exec_id
if node_id:
where_clause["agentNodeId"] = node_id
if block_ids:
where_clause["Node"] = {"is": {"agentBlockId": {"in": block_ids}}}
if statuses:
where_clause["OR"] = [{"executionStatus": status} for status in statuses]

if created_time_gte or created_time_lte:
where_clause["addedTime"] = {
"gte": created_time_gte or datetime.min.replace(tzinfo=timezone.utc),
"lte": created_time_lte or datetime.max.replace(tzinfo=timezone.utc),
}

count = await AgentNodeExecution.prisma().count(where=where_clause)
return count


async def get_latest_node_execution(
node_id: str, graph_eid: str
) -> NodeExecutionResult | None:
4 changes: 4 additions & 0 deletions autogpt_platform/backend/backend/executor/database.py
@@ -15,6 +15,7 @@
get_latest_node_execution,
get_node_execution,
get_node_executions,
get_node_executions_count,
set_execution_kv_data,
update_graph_execution_start_time,
update_graph_execution_stats,
@@ -129,6 +130,7 @@ def _(
create_graph_execution = _(create_graph_execution)
get_node_execution = _(get_node_execution)
get_node_executions = _(get_node_executions)
get_node_executions_count = _(get_node_executions_count)
get_latest_node_execution = _(get_latest_node_execution)
update_node_execution_status = _(update_node_execution_status)
update_node_execution_status_batch = _(update_node_execution_status_batch)
@@ -200,6 +202,7 @@ def get_service_type(cls):
get_graph_executions_count = _(d.get_graph_executions_count)
get_graph_execution_meta = _(d.get_graph_execution_meta)
get_node_executions = _(d.get_node_executions)
get_node_executions_count = _(d.get_node_executions_count)
update_node_execution_status = _(d.update_node_execution_status)
update_graph_execution_start_time = _(d.update_graph_execution_start_time)
update_graph_execution_stats = _(d.update_graph_execution_stats)
@@ -245,6 +248,7 @@ def get_service_type(cls):
get_node = d.get_node
get_node_execution = d.get_node_execution
get_node_executions = d.get_node_executions
get_node_executions_count = d.get_node_executions_count
get_user_by_id = d.get_user_by_id
get_user_integrations = d.get_user_integrations
upsert_execution_input = d.upsert_execution_input
36 changes: 34 additions & 2 deletions autogpt_platform/backend/backend/executor/manager.py
@@ -573,7 +573,21 @@ async def persist_output(output_name: str, output_data: Any) -> None:
await persist_output(output_name, output_data)

log_metadata.info(f"Finished node execution {node_exec.node_exec_id}")
status = ExecutionStatus.COMPLETED

# Check if this node has pending reviews (Human In The Loop block)
try:
from prisma.models import PendingHumanReview

pending_review = await PendingHumanReview.prisma().find_first(
where={"nodeExecId": node_exec.node_exec_id, "status": "WAITING"}
)
if pending_review:
status = ExecutionStatus.WAITING_FOR_REVIEW
else:
status = ExecutionStatus.COMPLETED
except Exception:
# If there's any issue checking for pending reviews, default to COMPLETED
status = ExecutionStatus.COMPLETED

except BaseException as e:
stats.error = e
@@ -660,6 +674,16 @@ def on_graph_execution(
log_metadata.info(
f"⚙️ Graph execution #{graph_exec.graph_exec_id} is already running, continuing where it left off."
)
elif exec_meta.status == ExecutionStatus.WAITING_FOR_REVIEW:
exec_meta.status = ExecutionStatus.RUNNING
log_metadata.info(
f"⚙️ Graph execution #{graph_exec.graph_exec_id} was waiting for review, resuming execution."
)
update_graph_execution_state(
db_client=db_client,
graph_exec_id=graph_exec.graph_exec_id,
status=ExecutionStatus.RUNNING,
)
elif exec_meta.status == ExecutionStatus.FAILED:
exec_meta.status = ExecutionStatus.RUNNING
log_metadata.info(
@@ -1006,7 +1030,15 @@ def _on_graph_execution(
elif error is not None:
execution_status = ExecutionStatus.FAILED
else:
execution_status = ExecutionStatus.COMPLETED
# Check if there are any nodes waiting for review
waiting_nodes_count = db_client.get_node_executions_count(
graph_exec_id=graph_exec.graph_exec_id,
statuses=[ExecutionStatus.WAITING_FOR_REVIEW],
)
if waiting_nodes_count > 0:
execution_status = ExecutionStatus.WAITING_FOR_REVIEW
else:
execution_status = ExecutionStatus.COMPLETED

if error:
execution_stats.error = str(error) or type(error).__name__
6 changes: 6 additions & 0 deletions autogpt_platform/backend/backend/server/rest_api.py
@@ -29,6 +29,7 @@
import backend.server.v2.builder
import backend.server.v2.builder.routes
import backend.server.v2.chat.routes as chat_routes
import backend.server.v2.executions.review.routes
import backend.server.v2.library.db
import backend.server.v2.library.model
import backend.server.v2.library.routes
@@ -286,6 +287,11 @@ async def validation_error_handler(
tags=["v2", "turnstile"],
prefix="/api/turnstile",
)
app.include_router(
backend.server.v2.executions.review.routes.router,
tags=["v2", "executions"],
prefix="/api/executions",
)

app.include_router(
backend.server.routers.postmark.postmark.router,
Empty file.
Empty file.
@@ -0,0 +1,41 @@
from datetime import datetime
from typing import Any, Literal

from pydantic import BaseModel, Field


class PendingHumanReviewResponse(BaseModel):
"""Response model for pending human review."""

id: str = Field(description="Unique ID of the pending review")
user_id: str = Field(description="User ID associated with the review")
node_exec_id: str = Field(description="Node execution ID")
graph_exec_id: str = Field(description="Graph execution ID")
graph_id: str = Field(description="Graph ID")
graph_version: int = Field(description="Graph version")
data: Any = Field(description="Data waiting for review")
status: Literal["WAITING", "APPROVED", "REJECTED"] = Field(
description="Review status"
)
review_message: str | None = Field(
description="Optional message from the reviewer", default=None
)
created_at: datetime = Field(description="When the review was created")
updated_at: datetime | None = Field(
description="When the review was last updated", default=None
)
reviewed_at: datetime | None = Field(
description="When the review was completed", default=None
)


class ReviewActionRequest(BaseModel):
"""Request model for reviewing data."""

action: Literal["approve", "reject"] = Field(description="Action to take")
reviewed_data: Any | None = Field(
description="Modified data (only for approve action)", default=None
)
message: str | None = Field(
description="Optional message from the reviewer", default=None
)
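For illustration only (not part of this diff), an approve request whose body matches ReviewActionRequest could look like the sketch below. The concrete endpoint path under the /api/executions prefix is an assumption, since the routes module is not included in this page.

import json

# Illustrative approve payload; field names mirror ReviewActionRequest above.
approve_payload = {
    "action": "approve",
    "reviewed_data": {"name": "John Doe", "age": 31},  # reviewer-edited copy of the data
    "message": "Looks good after correcting the age.",
}
print(json.dumps(approve_payload))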