-
Notifications
You must be signed in to change notification settings - Fork 13
feat: introduce plan and route with supervisor #140
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
* change it to /conversations/{conversation_id}/messages * Refactor some model code
* Build plan & route logic with langgraph for k8s and kyma agents
767247e
to
296474c
Compare
* Implement a langgraph node which summarizes the agent conversation as a final response * Upgrade langgraph
Main Kyma graph will have all the workflow of the companion. It'll have init node too.
* Write unit tests * Improve supervisor prompt to consider subtask statuses * Implement script to generate langgraph diagramm
* Create tests for all agents * Create tests for util functions * Create tests for kyma graph * Update the existing tests
* As we have only kyma and k8s questions deny irrelevant questions * Add coversation/id/messages route rest tests
* Improve plan & route with k8s and kyma terminology to route properly * Add common node to answer general questions * Improve error handling
2aab769
to
71feb75
Compare
71feb75
to
ec98aee
Compare
|
||
from agents.graph import KymaGraph | ||
|
||
load_dotenv() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you write comments on how to run this script and what env variables are needed to run this script?
|
||
messages: Annotated[Sequence[BaseMessage], operator.add] | ||
next: str | None | ||
subtasks: list[SubTask] | None = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: sub_tasks
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's leave as it is as the word subtask
really exists :)
class Plan(BaseModel): | ||
"""Plan to follow in future""" | ||
|
||
subtasks: list[SubTask] = Field( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: sub_tasks?
supervisor_agent.py
Outdated
@@ -0,0 +1,233 @@ | |||
import functools |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this file supposed to be merged to main?
If not, please delete it.
If yes, please move to /examples
or /hack
directory.
src/agents/supervisor/agent.py
Outdated
|
||
model: Model | ||
_name: str = SUPERVISOR | ||
parser = PydanticOutputParser(pydantic_object=Plan) # type: ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this parser used anywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, below, you can search for it with state.subtasks = self.parser.parse(plan.content).subtasks
], | ||
} | ||
|
||
def common_node(self, state: AgentState) -> dict[str, Any]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain what is the purpose of this node?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it answer the general questions. We'll move this to planner if possible in the next PR.
src/agents/graph.py
Outdated
PLANNER = "Planner" | ||
|
||
|
||
def should_continue(state: MessagesState) -> Literal["action", "__end__"]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we have should_exit in the utils, and should_continue here? Shouldn't it be moved to utils as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll delete it
class CustomJSONEncoder(json.JSONEncoder): | ||
"""Custom JSON encoder.""" | ||
|
||
def default(self, obj): # noqa D102 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Intermediate steps have to be streamed correctly according to an agreement in our API documentation. Currently, we receive everything in our output
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes. There are mainly two reasons I haven't done it yet:
- first, didn't have enough time
- second, we need to add status code (error or success) for the chunks as anytime error may occur during streaming. The status code in the begining is almost always 200 due asynchronous behavior of FastAPI routes.
For that reason, I'd suggest to have follow-up ticket for this? We can think how to add the code and implement it in the follow-up ticket.
I'll also document this in the issue.
def common_node(self, state: AgentState) -> dict[str, Any]: | ||
"""Breaks down the given user query into sub-tasks.""" | ||
|
||
prompt = ChatPromptTemplate.from_messages( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same thing as the commend above about the common node. Consider using history during the planning. Consider merging those two nodes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need think whether we should consider the conversation or not for the planning. I am currently not sure.
I will document the points in the issue and improve them further in the next sprints.
Let's keep it simple for the start?
I thought we discussed this already, I've avoided this re-planning by separating out the.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if common questions, don't plan and directly send to Finalizer!
from langfuse.callback import CallbackHandler | ||
|
||
handler = CallbackHandler( | ||
secret_key=os.getenv("LANGFUSE_SECRET_KEY"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's move to some constant variables
|
||
prompt = ChatPromptTemplate.from_messages( | ||
[ | ||
MessagesPlaceholder(variable_name="messages"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Supervisor agent should have an ability to fail properly in case other agents can't provide relevant information. Therefore it should probably analyze provided information and make a decision whether this information is sufficient and it can generate a comprehensive response or it should fail. I suspect, additional prompting should be done in the finalizer. Here is an example where an agent fails in doing that:
request_response1.txt
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case there 4 subtasks are created how to analyse the failing pod. We should discuss and optimize the planner accordingly in the next sprints. For that we need to first discuss whether we need to have detailed plan or keep the original sentences.
With this ticket, we know how we can do the planning and routing now. I'd suggest to optimize it step-by-step.
I'll document this too.
|
||
result = agent.invoke(state) | ||
return {"messages": [HumanMessage(content=result["output"], name=name)]} | ||
logger = get_logger(__name__) | ||
|
||
|
||
class SupervisorAgent: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider renaming to task executor. Then we will merge planner, task executor, replan, and finalizer into the subgraph called SupervisorAgent in the future
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would leave this as SupervisorAgent as it eventually will have more functionality. Planning is just temporarily separated out for the start. It will be part of SupervisorAgent too soon.
It is currently not executing any task, it is looking at the conversation and subtasks and managing the communication to all other members of the supervisor team.
fdf49a6
to
fd842aa
Compare
} | ||
|
||
|
||
DEFAULT_NUMBER_OF_MESSAGES = 10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets move this to the common file?
|
||
logger = get_logger(__name__) | ||
|
||
REDIS_URL = f"{os.getenv('REDIS_URL')}/0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally, this should be part of a separate module Config
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I already have this in mind for the next PR. I will create a TODO for this too.
class SingletonMeta(type): | ||
"""Singleton metaclass.""" | ||
|
||
_instances: dict[type, object] = {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why it needs to be a dict? Can it be a single _instance: object
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this meta class have class field _instances which can be used by the other classes too. It means it stores single instances for multiple classes.
Description
Introduces first implementation of Plan and Route features of Supervisor Agent.
Changes proposed in this pull request:
Related issue(s)
#112