@@ -59,11 +59,13 @@ use super::model_selector::autopilot::AutoPilot;
5959use super :: platform_tools;
6060use super :: tool_execution:: { ToolCallResult , CHAT_MODE_TOOL_SKIPPED_RESPONSE , DECLINED_RESPONSE } ;
6161use crate :: agents:: subagent_task_config:: TaskConfig ;
62- use crate :: conversation:: message:: { Message , ToolRequest } ;
62+ use crate :: conversation:: message:: { Message , MessageContent , SystemNotificationType , ToolRequest } ;
6363use crate :: session:: extension_data:: { EnabledExtensionsState , ExtensionState } ;
6464use crate :: session:: SessionManager ;
6565
6666const DEFAULT_MAX_TURNS : u32 = 1000 ;
67+ const COMPACTION_THINKING_TEXT : & str = "goose is compacting the conversation..." ;
68+ const MANUAL_COMPACT_TRIGGER : & str = "Please compact this conversation" ;
6769
6870/// Context needed for the reply function
6971pub struct ReplyContext {
@@ -745,77 +747,99 @@ impl Agent {
745747 session : Option < SessionConfig > ,
746748 cancel_token : Option < CancellationToken > ,
747749 ) -> Result < BoxStream < ' _ , Result < AgentEvent > > > {
748- // Try to get session metadata for more accurate token counts
749- let session_metadata = if let Some ( session_config) = & session {
750- SessionManager :: get_session ( & session_config. id , false )
751- . await
752- . ok ( )
753- } else {
754- None
755- } ;
756-
757- let check_result = crate :: context_mgmt:: check_if_compaction_needed (
758- self ,
759- & unfixed_conversation,
760- None ,
761- session_metadata. as_ref ( ) ,
762- )
763- . await ;
750+ let is_manual_compact = unfixed_conversation. messages ( ) . last ( ) . is_some_and ( |msg| {
751+ msg. content . iter ( ) . any ( |c| {
752+ if let MessageContent :: Text ( text) = c {
753+ text. text . trim ( ) == MANUAL_COMPACT_TRIGGER
754+ } else {
755+ false
756+ }
757+ } )
758+ } ) ;
764759
765- let ( did_compact, compacted_conversation, compaction_error) = match check_result {
766- // TODO(dkatz): send a notification that we are starting compaction here.
767- Ok ( true ) => {
768- match crate :: context_mgmt:: compact_messages ( self , & unfixed_conversation, false )
760+ if !is_manual_compact {
761+ let session_metadata = if let Some ( session_config) = & session {
762+ SessionManager :: get_session ( & session_config. id , false )
769763 . await
770- {
771- Ok ( ( conversation, _token_counts, _summarization_usage) ) => {
772- ( true , conversation, None )
773- }
774- Err ( e) => ( false , unfixed_conversation. clone ( ) , Some ( e) ) ,
775- }
764+ . ok ( )
765+ } else {
766+ None
767+ } ;
768+
769+ let needs_auto_compact = crate :: context_mgmt:: check_if_compaction_needed (
770+ self ,
771+ & unfixed_conversation,
772+ None ,
773+ session_metadata. as_ref ( ) ,
774+ )
775+ . await ?;
776+
777+ if !needs_auto_compact {
778+ return self
779+ . reply_internal ( unfixed_conversation, session, cancel_token)
780+ . await ;
776781 }
777- Ok ( false ) => ( false , unfixed_conversation, None ) ,
778- Err ( e) => ( false , unfixed_conversation. clone ( ) , Some ( e) ) ,
779- } ;
782+ }
780783
781- if did_compact {
782- // Get threshold from config to include in message
783- let config = crate :: config:: Config :: global ( ) ;
784- let threshold = config
785- . get_param :: < f64 > ( "GOOSE_AUTO_COMPACT_THRESHOLD" )
786- . unwrap_or ( DEFAULT_COMPACTION_THRESHOLD ) ;
787- let threshold_percentage = ( threshold * 100.0 ) as u32 ;
788-
789- let compaction_msg = format ! (
790- "Exceeded auto-compact threshold of {}%. Context has been summarized and reduced.\n \n " ,
791- threshold_percentage
792- ) ;
784+ let conversation_to_compact = unfixed_conversation. clone ( ) ;
785+
786+ Ok ( Box :: pin ( async_stream:: try_stream! {
787+ if !is_manual_compact {
788+ let config = crate :: config:: Config :: global( ) ;
789+ let threshold = config
790+ . get_param:: <f64 >( "GOOSE_AUTO_COMPACT_THRESHOLD" )
791+ . unwrap_or( DEFAULT_COMPACTION_THRESHOLD ) ;
792+ let threshold_percentage = ( threshold * 100.0 ) as u32 ;
793+
794+ let inline_msg = format!(
795+ "Exceeded auto-compact threshold of {}%. Performing auto-compaction..." ,
796+ threshold_percentage
797+ ) ;
793798
794- Ok ( Box :: pin ( async_stream:: try_stream! {
795- // TODO(Douwe): send this before we actually compact:
796799 yield AgentEvent :: Message (
797- Message :: assistant( ) . with_conversation_compacted( compaction_msg)
800+ Message :: assistant( ) . with_system_notification(
801+ SystemNotificationType :: InlineMessage ,
802+ inline_msg,
803+ )
798804 ) ;
799- yield AgentEvent :: HistoryReplaced ( compacted_conversation. clone( ) ) ;
800- if let Some ( session_to_store) = & session {
801- SessionManager :: replace_conversation( & session_to_store. id, & compacted_conversation) . await ?
802- }
805+ }
806+
807+ yield AgentEvent :: Message (
808+ Message :: assistant( ) . with_system_notification(
809+ SystemNotificationType :: ThinkingMessage ,
810+ COMPACTION_THINKING_TEXT ,
811+ )
812+ ) ;
813+
814+ match crate :: context_mgmt:: compact_messages( self , & conversation_to_compact, false ) . await {
815+ Ok ( ( compacted_conversation, _token_counts, _summarization_usage) ) => {
816+ if let Some ( session_to_store) = & session {
817+ SessionManager :: replace_conversation( & session_to_store. id, & compacted_conversation) . await ?;
818+ }
819+
820+ yield AgentEvent :: HistoryReplaced ( compacted_conversation. clone( ) ) ;
803821
804- let mut reply_stream = self . reply_internal( compacted_conversation, session, cancel_token) . await ?;
805- while let Some ( event) = reply_stream. next( ) . await {
806- yield event?;
822+ yield AgentEvent :: Message (
823+ Message :: assistant( ) . with_system_notification(
824+ SystemNotificationType :: InlineMessage ,
825+ "Compaction complete" ,
826+ )
827+ ) ;
828+
829+ if !is_manual_compact {
830+ let mut reply_stream = self . reply_internal( compacted_conversation, session, cancel_token) . await ?;
831+ while let Some ( event) = reply_stream. next( ) . await {
832+ yield event?;
833+ }
834+ }
807835 }
808- } ) )
809- } else if let Some ( error) = compaction_error {
810- Ok ( Box :: pin ( async_stream:: try_stream! {
811- yield AgentEvent :: Message ( Message :: assistant( ) . with_text(
812- format!( "Ran into this error trying to auto-compact: {error}.\n \n Please try again or create a new session" )
813- ) ) ;
814- } ) )
815- } else {
816- self . reply_internal ( compacted_conversation, session, cancel_token)
817- . await
818- }
836+ Err ( e) => {
837+ yield AgentEvent :: Message ( Message :: assistant( ) . with_text(
838+ format!( "Ran into this error trying to compact: {e}.\n \n Please try again or create a new session" )
839+ ) ) ;
840+ }
841+ }
842+ } ) )
819843 }
820844
821845 /// Main reply method that handles the actual agent processing
@@ -1138,23 +1162,29 @@ impl Agent {
11381162 }
11391163 }
11401164 Err ( ProviderError :: ContextLengthExceeded ( _error_msg) ) => {
1141- info!( "Context length exceeded, attempting compaction" ) ;
1165+ yield AgentEvent :: Message (
1166+ Message :: assistant( ) . with_system_notification(
1167+ SystemNotificationType :: InlineMessage ,
1168+ "Context limit reached. Compacting to continue conversation..." ,
1169+ )
1170+ ) ;
1171+ yield AgentEvent :: Message (
1172+ Message :: assistant( ) . with_system_notification(
1173+ SystemNotificationType :: ThinkingMessage ,
1174+ COMPACTION_THINKING_TEXT ,
1175+ )
1176+ ) ;
11421177
1143- // TODO(dkatz): send a notification that we are starting compaction here.
11441178 match crate :: context_mgmt:: compact_messages( self , & conversation, true ) . await {
11451179 Ok ( ( compacted_conversation, _token_counts, _usage) ) => {
1180+ if let Some ( session_to_store) = & session {
1181+ SessionManager :: replace_conversation( & session_to_store. id, & compacted_conversation) . await ?
1182+ }
1183+
11461184 conversation = compacted_conversation;
11471185 did_recovery_compact_this_iteration = true ;
11481186
1149- yield AgentEvent :: Message (
1150- Message :: assistant( ) . with_conversation_compacted(
1151- "Context limit reached. Conversation has been automatically compacted to continue."
1152- )
1153- ) ;
11541187 yield AgentEvent :: HistoryReplaced ( conversation. clone( ) ) ;
1155- if let Some ( session_to_store) = & session {
1156- SessionManager :: replace_conversation( & session_to_store. id, & conversation) . await ?
1157- }
11581188 continue ;
11591189 }
11601190 Err ( e) => {
0 commit comments