@@ -220,6 +220,7 @@ async def step_stream_no_tokens(
            actor=self.actor,
        )
        stop_reason = None
+        job_update_metadata = None
        usage = LettaUsageStatistics()

        # span for request
@@ -367,6 +368,7 @@ async def step_stream_no_tokens(
        except Exception as e:
            # Handle any unexpected errors during step processing
            self.logger.error(f"Error during step processing: {e}")
+            job_update_metadata = {"error": str(e)}

            # This indicates we failed after we decided to stop stepping, which indicates a bug with our flow.
            if not stop_reason:
@@ -429,7 +431,7 @@ async def step_stream_no_tokens(
                    self.logger.error("Invalid StepProgression value")

                if settings.track_stop_reason:
-                    await self._log_request(request_start_timestamp_ns, request_span)
+                    await self._log_request(request_start_timestamp_ns, request_span, job_update_metadata, is_error=True)

            except Exception as e:
                self.logger.error("Failed to update step: %s", e)
@@ -447,7 +449,7 @@ async def step_stream_no_tokens(
                force=False,
            )

-        await self._log_request(request_start_timestamp_ns, request_span)
+        await self._log_request(request_start_timestamp_ns, request_span, job_update_metadata, is_error=False)

        # Return back usage
        for finish_chunk in self.get_finish_chunks_for_stream(usage, stop_reason):
@@ -485,6 +487,7 @@ async def _step(
        request_span.set_attributes({f"llm_config.{k}": v for k, v in agent_state.llm_config.model_dump().items() if v is not None})

        stop_reason = None
+        job_update_metadata = None
        usage = LettaUsageStatistics()
        for i in range(max_steps):
            # If dry run, build request data and return it without making LLM call
@@ -622,6 +625,7 @@ async def _step(
        except Exception as e:
            # Handle any unexpected errors during step processing
            self.logger.error(f"Error during step processing: {e}")
+            job_update_metadata = {"error": str(e)}

            # This indicates we failed after we decided to stop stepping, which indicates a bug with our flow.
            if not stop_reason:
@@ -680,7 +684,7 @@ async def _step(
                    self.logger.error("Invalid StepProgression value")

                if settings.track_stop_reason:
-                    await self._log_request(request_start_timestamp_ns, request_span)
+                    await self._log_request(request_start_timestamp_ns, request_span, job_update_metadata, is_error=True)

            except Exception as e:
                self.logger.error("Failed to update step: %s", e)
@@ -698,7 +702,7 @@ async def _step(
                force=False,
            )

-        await self._log_request(request_start_timestamp_ns, request_span)
+        await self._log_request(request_start_timestamp_ns, request_span, job_update_metadata, is_error=False)

        return current_in_context_messages, new_in_context_messages, stop_reason, usage

@@ -748,6 +752,7 @@ async def step_stream(
            actor=self.actor,
        )
        stop_reason = None
+        job_update_metadata = None
        usage = LettaUsageStatistics()
        first_chunk, request_span = True, None
        if request_start_timestamp_ns:
@@ -977,6 +982,7 @@ async def step_stream(
        except Exception as e:
            # Handle any unexpected errors during step processing
            self.logger.error(f"Error during step processing: {e}")
+            job_update_metadata = {"error": str(e)}

            # This indicates we failed after we decided to stop stepping, which indicates a bug with our flow.
            if not stop_reason:
@@ -1039,7 +1045,7 @@ async def step_stream(

                # Do tracking for failure cases. Can consolidate with success conditions later.
                if settings.track_stop_reason:
-                    await self._log_request(request_start_timestamp_ns, request_span)
+                    await self._log_request(request_start_timestamp_ns, request_span, job_update_metadata, is_error=True)

            except Exception as e:
                self.logger.error("Failed to update step: %s", e)
@@ -1056,20 +1062,28 @@ async def step_stream(
                force=False,
            )

-        await self._log_request(request_start_timestamp_ns, request_span)
+        await self._log_request(request_start_timestamp_ns, request_span, job_update_metadata, is_error=False)

        for finish_chunk in self.get_finish_chunks_for_stream(usage, stop_reason):
            yield f"data: {finish_chunk}\n\n"

-    async def _log_request(self, request_start_timestamp_ns: int, request_span: "Span | None"):
+    async def _log_request(
+        self, request_start_timestamp_ns: int, request_span: "Span | None", job_update_metadata: dict | None, is_error: bool
+    ):
        if request_start_timestamp_ns:
            now_ns, now = get_utc_timestamp_ns(), get_utc_time()
            duration_ns = now_ns - request_start_timestamp_ns
            if request_span:
                request_span.add_event(name="letta_request_ms", attributes={"duration_ms": ns_to_ms(duration_ns)})
            await self._update_agent_last_run_metrics(now, ns_to_ms(duration_ns))
-            if self.current_run_id:
+            if settings.track_agent_run and self.current_run_id:
                await self.job_manager.record_response_duration(self.current_run_id, duration_ns, self.actor)
+                await self.job_manager.safe_update_job_status_async(
+                    job_id=self.current_run_id,
+                    new_status=JobStatus.failed if is_error else JobStatus.completed,
+                    actor=self.actor,
+                    metadata=job_update_metadata,
+                )
        if request_span:
            request_span.end()

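Note (illustrative, not part of the commit): the widened _log_request signature above means each step path now threads the error metadata captured in its except block into the final log call, and the tracked run ends in a terminal status. A minimal sketch of the calling pattern, assuming the surrounding attributes (settings, job_manager, current_run_id, request_span) exist as in this class; do_step() is a hypothetical stand-in for the step body:

    job_update_metadata = None
    try:
        await do_step()  # hypothetical stand-in for the LLM step / streaming loop
    except Exception as e:
        # failure path: record the error and mark the tracked run JobStatus.failed
        job_update_metadata = {"error": str(e)}
        if settings.track_stop_reason:
            await self._log_request(request_start_timestamp_ns, request_span, job_update_metadata, is_error=True)
        raise
    # success path: metadata stays None and the tracked run is marked JobStatus.completed
    await self._log_request(request_start_timestamp_ns, request_span, job_update_metadata, is_error=False)
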
@@ -1507,8 +1521,6 @@ async def _execute_tool(
                status="error",
            )

-        print(target_tool)
-
        # TODO: This temp. Move this logic and code to executors

        if agent_step_span: