Skip to content

Commit e654ed4

Browse files
authored
Merge pull request #1437 from trheyi/main
Enhance Execution Control and Error Handling in Robot Manager
2 parents 6886d15 + 387e254 commit e654ed4

File tree

17 files changed

+587
-250
lines changed

17 files changed

+587
-250
lines changed

agent/robot/api/execution.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,12 @@ func PauseExecution(ctx *types.Context, execID string) error {
130130
return err
131131
}
132132

133-
return mgr.PauseExecution(ctx, execID)
133+
if err := mgr.PauseExecution(ctx, execID); err != nil {
134+
return err
135+
}
136+
137+
// Update database status to paused
138+
return getExecutionStore().UpdateStatus(context.Background(), execID, types.ExecPaused, "")
134139
}
135140

136141
// ResumeExecution resumes a paused execution
@@ -144,7 +149,12 @@ func ResumeExecution(ctx *types.Context, execID string) error {
144149
return err
145150
}
146151

147-
return mgr.ResumeExecution(ctx, execID)
152+
if err := mgr.ResumeExecution(ctx, execID); err != nil {
153+
return err
154+
}
155+
156+
// Update database status back to running
157+
return getExecutionStore().UpdateStatus(context.Background(), execID, types.ExecRunning, "")
148158
}
149159

150160
// StopExecution stops a running execution
@@ -158,7 +168,12 @@ func StopExecution(ctx *types.Context, execID string) error {
158168
return err
159169
}
160170

161-
return mgr.StopExecution(ctx, execID)
171+
if err := mgr.StopExecution(ctx, execID); err != nil {
172+
return err
173+
}
174+
175+
// Update database status to cancelled
176+
return getExecutionStore().UpdateStatus(context.Background(), execID, types.ExecCancelled, "User cancelled")
162177
}
163178

164179
// ==================== Execution Status API ====================

agent/robot/executor/dryrun/executor.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,18 @@ func NewWithConfig(config types.DryRunConfig) *Executor {
4242
}
4343
}
4444

45-
// Execute simulates robot execution without real Agent calls
45+
// Execute simulates robot execution without real Agent calls (auto-generates ID)
4646
func (e *Executor) Execute(ctx *robottypes.Context, robot *robottypes.Robot, trigger robottypes.TriggerType, data interface{}) (*robottypes.Execution, error) {
47+
return e.ExecuteWithControl(ctx, robot, trigger, data, "", nil)
48+
}
49+
50+
// ExecuteWithID simulates robot execution with a pre-generated execution ID (no control)
51+
func (e *Executor) ExecuteWithID(ctx *robottypes.Context, robot *robottypes.Robot, trigger robottypes.TriggerType, data interface{}, execID string) (*robottypes.Execution, error) {
52+
return e.ExecuteWithControl(ctx, robot, trigger, data, execID, nil)
53+
}
54+
55+
// ExecuteWithControl simulates robot execution with execution control
56+
func (e *Executor) ExecuteWithControl(ctx *robottypes.Context, robot *robottypes.Robot, trigger robottypes.TriggerType, data interface{}, execID string, control robottypes.ExecutionControl) (*robottypes.Execution, error) {
4757
if robot == nil {
4858
return nil, fmt.Errorf("robot cannot be nil")
4959
}
@@ -54,9 +64,14 @@ func (e *Executor) Execute(ctx *robottypes.Context, robot *robottypes.Robot, tri
5464
startPhaseIndex = 1 // Skip P0
5565
}
5666

67+
// Use provided execID or generate new one
68+
if execID == "" {
69+
execID = fmt.Sprintf("dryrun_%d", time.Now().UnixNano())
70+
}
71+
5772
// Create execution record
5873
exec := &robottypes.Execution{
59-
ID: fmt.Sprintf("dryrun_%d", time.Now().UnixNano()),
74+
ID: execID,
6075
MemberID: robot.MemberID,
6176
TeamID: robot.TeamID,
6277
TriggerType: trigger,
@@ -106,6 +121,24 @@ func (e *Executor) Execute(ctx *robottypes.Context, robot *robottypes.Robot, tri
106121
// Execute phases with mock data
107122
phases := robottypes.AllPhases[startPhaseIndex:]
108123
for _, phase := range phases {
124+
// Check if cancelled
125+
select {
126+
case <-ctx.Context.Done():
127+
exec.Status = robottypes.ExecCancelled
128+
exec.Error = "execution cancelled"
129+
return exec, nil
130+
default:
131+
}
132+
133+
// Wait if paused
134+
if control != nil {
135+
if err := control.WaitIfPaused(); err != nil {
136+
exec.Status = robottypes.ExecCancelled
137+
exec.Error = "execution cancelled while paused"
138+
return exec, nil
139+
}
140+
}
141+
109142
exec.Phase = phase
110143

111144
// Phase start callback

agent/robot/executor/sandbox/executor.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,18 @@ func NewWithConfig(config types.SandboxConfig) *Executor {
4848
}
4949
}
5050

51-
// Execute runs robot execution within sandbox constraints
51+
// Execute runs robot execution within sandbox constraints (auto-generates ID)
5252
func (e *Executor) Execute(ctx *robottypes.Context, robot *robottypes.Robot, trigger robottypes.TriggerType, data interface{}) (*robottypes.Execution, error) {
53+
return e.ExecuteWithControl(ctx, robot, trigger, data, "", nil)
54+
}
55+
56+
// ExecuteWithID runs robot execution within sandbox constraints with a pre-generated execution ID (no control)
57+
func (e *Executor) ExecuteWithID(ctx *robottypes.Context, robot *robottypes.Robot, trigger robottypes.TriggerType, data interface{}, execID string) (*robottypes.Execution, error) {
58+
return e.ExecuteWithControl(ctx, robot, trigger, data, execID, nil)
59+
}
60+
61+
// ExecuteWithControl runs robot execution within sandbox constraints with execution control
62+
func (e *Executor) ExecuteWithControl(ctx *robottypes.Context, robot *robottypes.Robot, trigger robottypes.TriggerType, data interface{}, execID string, control robottypes.ExecutionControl) (*robottypes.Execution, error) {
5363
if robot == nil {
5464
return nil, fmt.Errorf("robot cannot be nil")
5565
}
@@ -67,9 +77,14 @@ func (e *Executor) Execute(ctx *robottypes.Context, robot *robottypes.Robot, tri
6777
startPhaseIndex = 1
6878
}
6979

80+
// Use provided execID or generate new one
81+
if execID == "" {
82+
execID = fmt.Sprintf("sandbox_%d", time.Now().UnixNano())
83+
}
84+
7085
// Create execution record
7186
exec := &robottypes.Execution{
72-
ID: fmt.Sprintf("sandbox_%d", time.Now().UnixNano()),
87+
ID: execID,
7388
MemberID: robot.MemberID,
7489
TeamID: robot.TeamID,
7590
TriggerType: trigger,
@@ -99,7 +114,7 @@ func (e *Executor) Execute(ctx *robottypes.Context, robot *robottypes.Robot, tri
99114
// Execute phases with sandbox constraints
100115
phases := robottypes.AllPhases[startPhaseIndex:]
101116
for _, phase := range phases {
102-
// Check timeout
117+
// Check timeout or cancellation
103118
select {
104119
case <-execCtx.Done():
105120
exec.Status = robottypes.ExecFailed
@@ -108,6 +123,15 @@ func (e *Executor) Execute(ctx *robottypes.Context, robot *robottypes.Robot, tri
108123
default:
109124
}
110125

126+
// Wait if paused
127+
if control != nil {
128+
if err := control.WaitIfPaused(); err != nil {
129+
exec.Status = robottypes.ExecCancelled
130+
exec.Error = "execution cancelled while paused"
131+
return exec, nil
132+
}
133+
}
134+
111135
exec.Phase = phase
112136

113137
if e.config.OnPhaseStart != nil {

agent/robot/executor/standard/executor.go

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,19 @@ func NewWithConfig(config types.Config) *Executor {
4545
}
4646
}
4747

48-
// Execute runs a robot through all applicable phases with real Agent calls
48+
// Execute runs a robot through all applicable phases with real Agent calls (auto-generates ID)
4949
func (e *Executor) Execute(ctx *robottypes.Context, robot *robottypes.Robot, trigger robottypes.TriggerType, data interface{}) (*robottypes.Execution, error) {
50+
return e.ExecuteWithControl(ctx, robot, trigger, data, "", nil)
51+
}
52+
53+
// ExecuteWithID runs a robot through all applicable phases with a pre-generated execution ID (no control)
54+
func (e *Executor) ExecuteWithID(ctx *robottypes.Context, robot *robottypes.Robot, trigger robottypes.TriggerType, data interface{}, execID string) (*robottypes.Execution, error) {
55+
return e.ExecuteWithControl(ctx, robot, trigger, data, execID, nil)
56+
}
57+
58+
// ExecuteWithControl runs a robot through all applicable phases with execution control
59+
// control: optional, allows pause/resume functionality during execution
60+
func (e *Executor) ExecuteWithControl(ctx *robottypes.Context, robot *robottypes.Robot, trigger robottypes.TriggerType, data interface{}, execID string, control robottypes.ExecutionControl) (*robottypes.Execution, error) {
5061
if robot == nil {
5162
return nil, fmt.Errorf("robot cannot be nil")
5263
}
@@ -57,10 +68,15 @@ func (e *Executor) Execute(ctx *robottypes.Context, robot *robottypes.Robot, tri
5768
startPhaseIndex = 1 // Skip P0 (Inspiration)
5869
}
5970

71+
// Use provided execID or generate new one
72+
if execID == "" {
73+
execID = utils.NewID()
74+
}
75+
6076
// Create execution (Job system removed, using ExecutionStore only)
6177
input := types.BuildTriggerInput(trigger, data)
6278
exec := &robottypes.Execution{
63-
ID: utils.NewID(),
79+
ID: execID,
6480
MemberID: robot.MemberID,
6581
TeamID: robot.TeamID,
6682
TriggerType: trigger,
@@ -174,7 +190,31 @@ func (e *Executor) Execute(ctx *robottypes.Context, robot *robottypes.Robot, tri
174190
// Execute phases
175191
phases := robottypes.AllPhases[startPhaseIndex:]
176192
for _, phase := range phases {
177-
if err := e.runPhase(ctx, exec, phase, data); err != nil {
193+
if err := e.runPhase(ctx, exec, phase, data, control); err != nil {
194+
// Check if execution was cancelled
195+
if err == robottypes.ErrExecutionCancelled {
196+
exec.Status = robottypes.ExecCancelled
197+
exec.Error = "execution cancelled by user"
198+
now := time.Now()
199+
exec.EndTime = &now
200+
201+
// Update UI field for cancellation with i18n
202+
e.updateUIFields(ctx, exec, "", getLocalizedMessage(locale, "cancelled"))
203+
204+
log.With(log.F{
205+
"execution_id": exec.ID,
206+
"member_id": exec.MemberID,
207+
"phase": string(phase),
208+
}).Info("Execution cancelled by user")
209+
210+
// Persist cancelled status
211+
if !e.config.SkipPersistence && e.store != nil {
212+
_ = e.store.UpdateStatus(ctx.Context, exec.ID, robottypes.ExecCancelled, "execution cancelled by user")
213+
}
214+
return exec, nil
215+
}
216+
217+
// Normal failure case
178218
exec.Status = robottypes.ExecFailed
179219
exec.Error = err.Error()
180220

@@ -228,7 +268,21 @@ func (e *Executor) Execute(ctx *robottypes.Context, robot *robottypes.Robot, tri
228268
}
229269

230270
// runPhase executes a single phase
231-
func (e *Executor) runPhase(ctx *robottypes.Context, exec *robottypes.Execution, phase robottypes.Phase, data interface{}) error {
271+
func (e *Executor) runPhase(ctx *robottypes.Context, exec *robottypes.Execution, phase robottypes.Phase, data interface{}, control robottypes.ExecutionControl) error {
272+
// Check if context is cancelled before starting this phase
273+
select {
274+
case <-ctx.Context.Done():
275+
return robottypes.ErrExecutionCancelled
276+
default:
277+
}
278+
279+
// Wait if execution is paused (blocks until resumed or cancelled)
280+
if control != nil {
281+
if err := control.WaitIfPaused(); err != nil {
282+
return err // Returns ErrExecutionCancelled if cancelled while paused
283+
}
284+
}
285+
232286
exec.Phase = phase
233287

234288
log.With(log.F{
@@ -422,6 +476,7 @@ var uiMessages = map[string]map[string]string{
422476
"sending_delivery": "Sending delivery...",
423477
"learning_from_exec": "Learning from execution...",
424478
"completed": "Completed",
479+
"cancelled": "Cancelled",
425480
"failed_prefix": "Failed at ",
426481
"task_prefix": "Task",
427482
// Phase names for failure messages
@@ -445,6 +500,7 @@ var uiMessages = map[string]map[string]string{
445500
"sending_delivery": "正在发送...",
446501
"learning_from_exec": "学习执行经验...",
447502
"completed": "已完成",
503+
"cancelled": "已取消",
448504
"failed_prefix": "失败于",
449505
"task_prefix": "任务",
450506
// Phase names for failure messages

agent/robot/executor/types/types.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,22 @@ import (
1212
// - DryRun: Plan-only mode, simulates execution without Agent calls
1313
// - Sandbox: Isolated execution with resource limits and safety controls
1414
type Executor interface {
15-
// Execute runs a robot through all applicable phases
15+
// ExecuteWithControl runs a robot through all applicable phases with execution control
1616
// ctx: Execution context with auth and logging
1717
// robot: Robot configuration and state
1818
// trigger: What triggered this execution (clock, human, event)
1919
// data: Trigger-specific data (human input, event payload, etc.)
20+
// execID: Pre-generated execution ID (empty string to auto-generate)
21+
// control: Optional execution control for pause/resume functionality
2022
// Returns: Execution record with all phase outputs
23+
ExecuteWithControl(ctx *robottypes.Context, robot *robottypes.Robot, trigger robottypes.TriggerType, data interface{}, execID string, control robottypes.ExecutionControl) (*robottypes.Execution, error)
24+
25+
// ExecuteWithID runs a robot through all applicable phases with a pre-generated execution ID
26+
// This is a convenience wrapper around ExecuteWithControl without control
27+
ExecuteWithID(ctx *robottypes.Context, robot *robottypes.Robot, trigger robottypes.TriggerType, data interface{}, execID string) (*robottypes.Execution, error)
28+
29+
// Execute runs a robot through all applicable phases (auto-generates execution ID)
30+
// This is a convenience wrapper around ExecuteWithControl
2131
Execute(ctx *robottypes.Context, robot *robottypes.Robot, trigger robottypes.TriggerType, data interface{}) (*robottypes.Execution, error)
2232

2333
// Metrics and control

agent/robot/manager/integration_concurrent_test.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -545,12 +545,22 @@ type trackingExecutor struct {
545545
}
546546

547547
func (e *trackingExecutor) Execute(ctx *types.Context, robot *types.Robot, trigger types.TriggerType, data interface{}) (*types.Execution, error) {
548+
return e.ExecuteWithControl(ctx, robot, trigger, data, "", nil)
549+
}
550+
551+
func (e *trackingExecutor) ExecuteWithID(ctx *types.Context, robot *types.Robot, trigger types.TriggerType, data interface{}, execID string) (*types.Execution, error) {
552+
return e.ExecuteWithControl(ctx, robot, trigger, data, execID, nil)
553+
}
554+
555+
func (e *trackingExecutor) ExecuteWithControl(ctx *types.Context, robot *types.Robot, trigger types.TriggerType, data interface{}, execID string, control types.ExecutionControl) (*types.Execution, error) {
548556
if robot == nil {
549557
return nil, types.ErrRobotNotFound
550558
}
551559

552-
// Use unique ID for each execution to properly track quota
553-
execID := fmt.Sprintf("exec_%d", time.Now().UnixNano())
560+
// Use provided execID or generate unique ID for each execution to properly track quota
561+
if execID == "" {
562+
execID = fmt.Sprintf("exec_%d", time.Now().UnixNano())
563+
}
554564
exec := &types.Execution{
555565
ID: execID,
556566
MemberID: robot.MemberID,
@@ -604,12 +614,22 @@ type triggerTrackingExecutor struct {
604614
}
605615

606616
func (e *triggerTrackingExecutor) Execute(ctx *types.Context, robot *types.Robot, trigger types.TriggerType, data interface{}) (*types.Execution, error) {
617+
return e.ExecuteWithControl(ctx, robot, trigger, data, "", nil)
618+
}
619+
620+
func (e *triggerTrackingExecutor) ExecuteWithID(ctx *types.Context, robot *types.Robot, trigger types.TriggerType, data interface{}, execID string) (*types.Execution, error) {
621+
return e.ExecuteWithControl(ctx, robot, trigger, data, execID, nil)
622+
}
623+
624+
func (e *triggerTrackingExecutor) ExecuteWithControl(ctx *types.Context, robot *types.Robot, trigger types.TriggerType, data interface{}, execID string, control types.ExecutionControl) (*types.Execution, error) {
607625
if robot == nil {
608626
return nil, types.ErrRobotNotFound
609627
}
610628

611-
// Use unique ID for each execution to properly track quota
612-
execID := fmt.Sprintf("exec_trigger_%s_%d", string(trigger), time.Now().UnixNano())
629+
// Use provided execID or generate unique ID for each execution to properly track quota
630+
if execID == "" {
631+
execID = fmt.Sprintf("exec_trigger_%s_%d", string(trigger), time.Now().UnixNano())
632+
}
613633
exec := &types.Execution{
614634
ID: execID,
615635
MemberID: robot.MemberID,

agent/robot/manager/integration_control_test.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,12 +495,24 @@ type slowExecutor struct {
495495
}
496496

497497
func (e *slowExecutor) Execute(ctx *types.Context, robot *types.Robot, trigger types.TriggerType, data interface{}) (*types.Execution, error) {
498+
return e.ExecuteWithControl(ctx, robot, trigger, data, "", nil)
499+
}
500+
501+
func (e *slowExecutor) ExecuteWithID(ctx *types.Context, robot *types.Robot, trigger types.TriggerType, data interface{}, execID string) (*types.Execution, error) {
502+
return e.ExecuteWithControl(ctx, robot, trigger, data, execID, nil)
503+
}
504+
505+
func (e *slowExecutor) ExecuteWithControl(ctx *types.Context, robot *types.Robot, trigger types.TriggerType, data interface{}, execID string, control types.ExecutionControl) (*types.Execution, error) {
498506
if robot == nil {
499507
return nil, types.ErrRobotNotFound
500508
}
501509

510+
// Use provided execID or generate one
511+
if execID == "" {
512+
execID = "exec_slow_" + robot.MemberID
513+
}
502514
exec := &types.Execution{
503-
ID: "exec_slow_" + robot.MemberID,
515+
ID: execID,
504516
MemberID: robot.MemberID,
505517
TeamID: robot.TeamID,
506518
TriggerType: trigger,

0 commit comments

Comments
 (0)