Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions livekit-agents/livekit/agents/beta/workflows/task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,13 @@ async def on_enter(self) -> None:

# when a task is done, the chat_ctx is going to be merged with the "caller" chat_ctx
# enabling summarization will result on only one ChatMessage added.
# keep every item to allow summarization to be more action-aware.
summarized_chat_ctx = await self.chat_ctx.copy(
exclude_instructions=True,
exclude_handoff=True,
exclude_config_update=True,
exclude_empty_message=True,
exclude_function_call=True,
exclude_instructions=False,
exclude_handoff=False,
exclude_config_update=False,
exclude_empty_message=False,
exclude_function_call=False,
)._summarize(llm_v=self.session.llm, keep_last_turns=0)
await self.update_chat_ctx(summarized_chat_ctx)
except Exception as e:
Expand Down
174 changes: 138 additions & 36 deletions livekit-agents/livekit/agents/llm/chat_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from __future__ import annotations

import textwrap
import time
from collections.abc import Generator, Sequence
from typing import TYPE_CHECKING, Annotated, Any, Literal, TypeAlias, overload
Expand Down Expand Up @@ -678,40 +679,83 @@ async def _summarize(
*,
keep_last_turns: int = 2,
) -> ChatContext:
to_summarize: list[ChatMessage] = []
for msg in self.messages():
if msg.role not in ("user", "assistant"):
continue
if msg.extra.get("is_summary") is True: # avoid making summary of summaries
continue
# Split self.items into head/tail. Walk backward, counting only
# user/assistant ChatMessages toward the keep_last_turns budget (each
# turn = one user + one assistant message, so budget = keep_last_turns * 2).
# Everything from the split point onward — including any interleaved
# FunctionCall/FunctionCallOutput items — is preserved as-is in the tail.
msg_budget = keep_last_turns * 2
split_idx = len(self.items)

if msg_budget > 0:
msg_count = 0
for i in range(len(self.items) - 1, -1, -1):
item = self.items[i]
if isinstance(item, ChatMessage) and item.role in ("user", "assistant"):
msg_count += 1
if msg_count >= msg_budget:
split_idx = i
break
else:
# Not enough messages to fill the budget — nothing to summarize
return self

if split_idx == 0:
return self

head_items, tail_items = self.items[:split_idx], self.items[split_idx:]

# Build summarization input from head_items only.
to_summarize: list[ChatMessage | FunctionCall | FunctionCallOutput] = []
for item in head_items:
if isinstance(item, ChatMessage):
if item.role not in ("user", "assistant"):
continue
if item.extra.get("is_summary") is True: # avoid making summary of summaries
continue

text = (item.text_content or "").strip()
if text:
to_summarize.append(item)
elif isinstance(item, (FunctionCall, FunctionCallOutput)):
to_summarize.append(item)

text = (msg.text_content or "").strip()
if text:
to_summarize.append(msg)
if not to_summarize:
return self

tail_n = max(0, min(len(to_summarize), keep_last_turns * 2))
if tail_n == 0:
head, tail = to_summarize, []
else:
head, tail = to_summarize[:-tail_n], to_summarize[-tail_n:]
# Render items to XML format and collect the contents.
contents: list[str] = []
for m in to_summarize:
if isinstance(m, (FunctionCall, FunctionCallOutput)):
contents.append(_function_call_item_to_message(m).text_content or "")
else:
contents.append(to_xml(m.role, (m.text_content or "").strip()))

if not head:
return self
source_text = "\n".join(contents).strip()

source_text = "\n".join(f"{m.role}: {(m.text_content or '').strip()}" for m in head).strip()
if not source_text:
return self

chat_ctx = ChatContext()
chat_ctx.add_message(
role="system",
content=(
"Compress older chat history into a short, faithful summary.\n"
"Focus on user goals, constraints, decisions, key facts/preferences/entities, and pending tasks.\n"
"Exclude chit-chat and greetings. Be concise."
),
content=textwrap.dedent("""\
Compress older conversation history into a short, faithful summary.

The conversation is formatted as XML. Here is how to read it:
- <user>…</user> — something the user said.
- <assistant>…</assistant> — something the assistant said.
- <function_call name="…" call_id="…">…</function_call> — the assistant invoked an action.
- <function_call_output name="…" call_id="…">…</function_call_output> — the result of that \
action. May contain <error>…</error> if it failed.

Guidelines:
- Distill the *information learned* from function call outputs into the summary. \
Do not mention that a tool/function was called — just preserve the knowledge gained.
- Focus on: user goals, constraints, decisions, key facts, preferences, entities, \
and any pending or unresolved tasks.
- Omit greetings, filler, and chit-chat.
- Be concise."""),
)
chat_ctx.add_message(
role="user",
Expand All @@ -728,33 +772,31 @@ async def _summarize(
if not summary:
return self

tail_start_ts = tail[0].created_at if tail else float("inf")

# Rebuild self._items. From head_items, keep only structural
# items (system messages, agent handoffs, config updates, prior
# summaries) — everything summarizable is replaced by the summary.
# Tail items are appended as-is.
preserved: list[ChatItem] = []
for it in self.items:
if (
it.type in ("function_call", "function_call_output")
and it.created_at < tail_start_ts
):
for it in head_items:
if isinstance(it, ChatMessage) and it.role in ("user", "assistant"):
continue

if it.type == "message" and it.role in ("user", "assistant"):
if isinstance(it, (FunctionCall, FunctionCallOutput)):
continue

preserved.append(it)

self._items = preserved

created_at_hint = (tail[0].created_at - 1e-6) if tail else (head[-1].created_at + 1e-6)
created_at_hint = (
(tail_items[0].created_at - 1e-6) if tail_items else (head_items[-1].created_at + 1e-6)
)
self.add_message(
role="assistant",
content=f"[history summary]\n{summary}",
content=to_xml("chat_history_summary", summary),
created_at=created_at_hint,
extra={"is_summary": True},
)

for msg in tail:
self._items.append(msg)
self._items.extend(tail_items)

return self

Expand Down Expand Up @@ -836,3 +878,63 @@ def __init__(self, items: list[ChatItem]):
@property
def readonly(self) -> bool:
return True


def _to_attrs_str(attrs: dict[str, Any] | None = None) -> str | None:
if attrs:
return " ".join([f'{k}="{v}"' for k, v in attrs.items()])
return None


def to_xml(
tag_name: str,
content: str | None = None,
attrs: dict[str, Any] | None = None,
) -> str:
attrs_str = _to_attrs_str(attrs)

if content:
return "\n".join(
[
f"<{tag_name} {attrs_str}>" if attrs_str else f"<{tag_name}>",
content,
f"</{tag_name}>",
]
)
else:
return f"<{tag_name} {attrs_str} />" if attrs_str else f"<{tag_name} />"


def _function_call_item_to_message(item: FunctionCall | FunctionCallOutput) -> ChatMessage:
if isinstance(item, FunctionCall):
return ChatMessage(
role="user",
content=[
to_xml(
"function_call",
item.arguments,
attrs={
"name": item.name,
"call_id": item.call_id,
},
)
],
created_at=item.created_at,
extra={"is_function_call": True},
)
elif isinstance(item, FunctionCallOutput):
return ChatMessage(
role="assistant",
content=[
to_xml(
"function_call_output",
item.output if not item.is_error else to_xml("error", item.output),
attrs={
"call_id": item.call_id,
"name": item.name,
},
)
],
created_at=item.created_at,
extra={"is_function_call_output": True},
)
Loading
Loading