Skip to content

Commit 6b0109f

Browse files
authored
[BREAKING] feat: add Dapr integration (#3)
* feat: add Dapr integration refactor: stop callback handling in Team class to accept whole conversation [BREAKING] * update: README * fix: dapr actors * fix: test case causing issues add: docs for remote and dapr * fix: unit test
1 parent a6df8ea commit 6b0109f

32 files changed

+1018
-265
lines changed

.vscode/settings.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"editor.formatOnSave": true,
55
"[python]": {
66
"editor.codeActionsOnSave": {
7-
"source.organizeImports": true,
7+
"source.organizeImports": "explicit",
88
"source.unusedImports": "always"
99
}
1010
}

README.md

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ This project framework provides the following features:
4444
- Multiple strategies for agent to filter conversation messages (All, last N, top K and Last N, summarize, etc..)
4545
- LLMLingua (`extras` module) support to compress system prompts via strategies
4646
- LLM support for Structured Output
47+
- DAPR integration
48+
- Native submodule to host `Askable` and `Workflow` as Dapr Actors
49+
- Dapr _PubSub_ integration for `Workflow` to enable
50+
- Event sourcing
51+
- Decoupled communication between workflows
52+
- Multi-agent chat with multiple users
53+
- Dapr PubSub integrations allows to move from one-to-many to many-to-many conversations, with different `User` instances impersonating different user profiles
54+
- Demo Repo (_Coming soon_)
4755
- Remoting support ((`remote` module)), allowing agents to be run on a remote server and accessed elsewhere
4856
- REST and gRPC channels supported
4957
- Default implementation to run hosts with agent discovery and registration
@@ -56,8 +64,6 @@ This project framework provides the following features:
5664
- Azure AI Search plugin
5765
- DB plugin
5866
- API plugin
59-
- DAPR integration
60-
- Multi-agent chat with multiple users
6167

6268
## Getting Started
6369

@@ -131,6 +137,22 @@ result = workflow.run("Hello, I'd like to know more about your products.")
131137
print(workflow.conversation.messages)
132138
```
133139

140+
### Submodules
141+
142+
#### `remote`
143+
144+
This module provides a way to run agents on a remote server and access them elsewhere. It includes support for REST and gRPC channels, as well as a default implementation to run hosts with agent discovery and registration.
145+
146+
Additionally, it features Dapr integration, allowing for the hosting of `Askable` and `Workflow` as Dapr Actors, enabling event sourcing and decoupled communication between workflows.
147+
148+
See the [Remote Agents documentation](docs/remote.md) and [Actors documentation](docs/actors.md) for more information.
149+
150+
#### `extras`
151+
152+
This module provides additional features to enhance the functionality of the framework. It includes support for:
153+
154+
- `LLMLingua` to compress system prompts
155+
134156
## Demos
135157

136158
`notebooks` folder contains a few demo notebooks that demonstrate how to use the framework in various scenarios.

docs/actors.md

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# Workflows as Actors
2+
3+
Vanilla also offers a Virtual Actor pattern implementation via Dapr. This allows for the hosting of `Workflow` as Dapr Actors, enabling event sourcing and decoupled communication between workflows, along with at-most-one client-to-actor interactions - only one client can interact with an actor at a time.
4+
5+
Being more specific, accessing an actor is done through a unique ID, which is used to identify the actor instance, that in our case matches a `Workflow` instance with its `Conversation`.
6+
7+
## Dapr integration
8+
9+
Dapr is a portable, event-driven runtime that makes it easy for developers to build resilient, microservice stateful applications. It offers a Virtual Actor pattern implementation, which allows for the hosting of `Workflow` as [Actors](https://docs.dapr.io/developing-applications/building-blocks/actors/actors-overview/) enabling event sourcing and decoupled communication between workflows via [PubSub](https://docs.dapr.io/developing-applications/building-blocks/pubsub/).
10+
11+
Thanks to Dapr components, Vanilla is independent of the underlying infrastructure, and can run on any platform that supports Dapr, such as `Azure Cosmos DB` for state management and `Azure Service Bus` for PubSub.
12+
13+
## Comparing to Remote Agents
14+
15+
The key difference between is standard remoting simply offers **stateless** agents, while Dapr Actors offer **stateful** workflows. This means that Dapr Actors can maintain state across multiple interactions, while remote agents are simply APIs that are called and return a result.
16+
17+
See the [Remote Agents documentation](remote.md) for more information.
18+
19+
## Specification
20+
21+
`vanilla_aiagents.remote.actors` provides the `WorkflowActor` class, which is simple wrapper around a `Workflow` instance with these main methods:
22+
23+
- `run`: Runs the `Workflow` instance with the configured `Askable` over the actor's `Conversation` state.
24+
- `run_stream`: Runs the `Workflow` instance with the configured `Askable` over the actor's `Conversation` state, but in streaming outputs as PubSub events.
25+
- `get_conversation`: Gets the current state of the `Conversation` in the actor.
26+
27+
Additionally, `vanilla_aiagents.remote.run_actors` provides a simple entrypoint to run a Dapr actor host with the `WorkflowActor` instances, and implements a PubSub event handler to receive `input` events to relay to the actors.
28+
29+
## Diagram
30+
31+
When a client calls an actor method (for example, `WorkflowActor.run`), Dapr locates the correct `WorkflowActor` by its unique ID and forwards the request. The actor then runs the `Workflow` with the provided input and can publish interim events to PubSub (if using `run_stream`). Finally, the actor sends its result back to the Dapr runtime, which returns it to the client.
32+
33+
```mermaid
34+
sequenceDiagram
35+
participant Client
36+
participant DaprRuntime
37+
participant WorkflowActor
38+
participant PubSub
39+
40+
Client->>DaprRuntime: Invoke actor method (e.g. run, run_stream)
41+
DaprRuntime->>WorkflowActor: Dispatch request by actor ID
42+
WorkflowActor->>Workflow: Execute Askable logic
43+
WorkflowActor->>PubSub: Publish events (e.g. stream data or stop signal)
44+
WorkflowActor->>DaprRuntime: Return response
45+
DaprRuntime->>Client: Response from actor
46+
```
47+
48+
## Events format
49+
50+
All events are sent as JSON strings following Dapr's PubSub conventions, leveraging CloudEvent v1 format. `vanilla_aiagents.remote.actors` provides a `WorkflowEvent` class to help with event serialization and deserialization, along with proper subclasses for each event type.
51+
52+
All events have the following attributes:
53+
54+
- `id`: The conversation ID, which is unique to the `Workflow` instance.
55+
- `type`: The event type, which can be `input`, `stream`, `update` or `stop`.
56+
57+
Specific event types have additional attributes:
58+
59+
- `input`: The input event, which is the input to the `Workflow` instance.
60+
- `input`: The input data to the `Workflow` instance.
61+
- `stream`: The stream event, which is the partial output of the `Workflow` instance.
62+
- `mark`: The mark of the stream event.
63+
- `content`: The partial output of the `Workflow` instance.
64+
- `stop`: The stop event, which is the signal to stop the `Workflow` instance.
65+
- `source`: The source `Askable` of the stop signal.
66+
- `update`: The update event, which is the signal to update the `Conversation` state.

docs/remote.md

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Remote agents
2+
3+
The framework supports remote agents. This allows you to run agents on a remote host and interact with them from another one, fostering a more distributed architecture and enabling agent reuse across different applications and `Team`.
4+
5+
The `remote` submodule provides a default implementation to run hosts with agent discovery and registration. It supports REST and gRPC channels. Then, you can simply swap any `Agent` with a `RemoteAgent` and interact with it as if it were local.
6+
7+
## Comparing to Actors
8+
9+
Vanilla also offers a Virtual Actor pattern implementation via Dapr. This allows for the hosting of `Workflow` as Dapr Actors, enabling event sourcing and decoupled communication between workflows.
10+
11+
The key difference between is standard remoting simply offers **stateless** agents, while Dapr Actors offer **stateful** workflows. This means that Dapr Actors can maintain state across multiple interactions, while remote agents are simply APIs that are called and return a result.
12+
13+
See the [Actors documentation](actors.md) for more information.
14+
15+
## Example
16+
17+
```python
18+
# agent_entry.py
19+
20+
from vanilla_aiagents.agent import Agent
21+
22+
agent = Agent(id="agent", description="A remote agent", system_message="You are a remote agent.")
23+
```
24+
25+
```dockerfile
26+
# Dockerfile
27+
28+
FROM python:3.12-slim
29+
30+
# Do all the copying and installing
31+
# ...
32+
33+
# Expose the application port (must match the one specified below)
34+
EXPOSE 80
35+
36+
# This is the entrypoint, Vanilla will automatically look for "_entry.py" files and expose them as agents
37+
CMD ["python", "-m", "vanilla_aiagents.remote.run_host", "--source-dir", ".", "--type", "rest", "--host", "0.0.0.0", "--port", "80"]
38+
```
39+
40+
```python
41+
# client.py
42+
43+
from vanilla_aiagents.remote.remote import RemoteAskable, RESTConnection
44+
45+
# Create a remote connection with the given protocol and host
46+
# NOTE: Make sure to set the HOST_URL environment variable to the host URL
47+
remote_connection = RESTConnection(url=os.getenv("HOST_URL"))
48+
49+
# ID here must match the agent ID in the remote host to use
50+
# NOTE: remote must be running and reachable at this point, since the RemoteAskable will try to connect to it
51+
remote = RemoteAskable(id="agent", connection=remote_connection)
52+
53+
team = Team(id="team", description="Contoso team", members=[remote], llm=llm)
54+
```

notebooks/basic_user.ipynb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@
9494
"outputs": [],
9595
"source": [
9696
"# Assemble the team and workflow\n",
97-
"flow = Team(id=\"team\", description=\"\", members=[agent1, user], llm=llm, stop_callback=lambda msgs: len(msgs) > 6)\n",
97+
"flow = Team(id=\"team\", description=\"\", members=[agent1, user], llm=llm, stop_callback=lambda conv: len(conv.messages) > 6)\n",
9898
"workflow = Workflow(askable=flow)"
9999
]
100100
},

notebooks/coding_azure.ipynb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
"from vanilla_aiagents.azure_coding_agent import AzureCodingAgent\n",
7777
"agent1 = AzureCodingAgent(id=\"agent1\", description=\"Agent 1\", llm=llm)\n",
7878
"\n",
79-
"flow = Team(id=\"team\", description=\"An agent capable of writing and executing code\", members=[agent1], llm=llm, stop_callback=lambda msgs: len(msgs) > 2)\n",
79+
"flow = Team(id=\"team\", description=\"An agent capable of writing and executing code\", members=[agent1], llm=llm, stop_callback=lambda conv: len(conv.messages) > 2)\n",
8080
"workflow = Workflow(askable=flow)\n",
8181
"workflow.conversation.variables[\"conversation_id\"] = \"test\"\n",
8282
"\n",

notebooks/coding_local.ipynb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
"from vanilla_aiagents.coding_agent import LocalCodingAgent\n",
7171
"agent1 = LocalCodingAgent(id=\"agent1\", description=\"Agent 1\", llm=llm)\n",
7272
"\n",
73-
"flow = Team(id=\"team\", description=\"An agent capable of writing and executing code\", members=[agent1], llm=llm, stop_callback=lambda msgs: len(msgs) > 2)\n",
73+
"flow = Team(id=\"team\", description=\"An agent capable of writing and executing code\", members=[agent1], llm=llm, stop_callback=lambda conv: len(conv.messages) > 2)\n",
7474
"workflow = Workflow(askable=flow)\n",
7575
"\n",
7676
"workflow.run(\"What is latest quote for Apple Inc. stock? Generate code to get the quote.\")\n",

notebooks/include_tools.ipynb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@
9090
" return \"User balance: $100\"\n",
9191
"\n",
9292
"# include_tools_descriptions is set to True\n",
93-
"flow = Team(id=\"flow\", description=\"\", members=[first, second], llm=llm, stop_callback=lambda x: len(x) > 2, include_tools_descriptions=True, use_structured_output=False)\n"
93+
"flow = Team(id=\"flow\", description=\"\", members=[first, second], llm=llm, stop_callback=lambda conv: len(conv.messages) > 2, include_tools_descriptions=True, use_structured_output=False)\n"
9494
]
9595
},
9696
{

notebooks/nested.ipynb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,15 +146,15 @@
146146
},
147147
{
148148
"cell_type": "code",
149-
"execution_count": 7,
149+
"execution_count": null,
150150
"metadata": {},
151151
"outputs": [],
152152
"source": [
153153
"step1 = Team(id=\"step1\", description=\"Validates the input provided to ensure all required information are collected\", \n",
154154
" members=[data_collector, data_evaluator], \n",
155155
" system_prompt=\"\",\n",
156156
" llm=llm, \n",
157-
" stop_callback=lambda messages: messages[-1]['content'].lower().endswith(\"done\"))"
157+
" stop_callback=lambda conv: conv.messages[-1]['content'].lower().endswith(\"done\"))"
158158
]
159159
},
160160
{
@@ -185,12 +185,12 @@
185185
},
186186
{
187187
"cell_type": "code",
188-
"execution_count": 10,
188+
"execution_count": null,
189189
"metadata": {},
190190
"outputs": [],
191191
"source": [
192192
"step2 = Team(id=\"step2\", description=\"Processes the information gathered to provide the final answer\", members=[approver, approval_manager], llm=llm, \n",
193-
" stop_callback=lambda messages: messages[-1]['content'].lower().endswith(\"approve\") or messages[-1]['content'].lower().endswith(\"reject\"))"
193+
" stop_callback=lambda conv: conv.messages[-1]['content'].lower().endswith(\"approve\") or conv.messages[-1]['content'].lower().endswith(\"reject\"))"
194194
]
195195
},
196196
{

notebooks/streaming.ipynb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@
150150
"\n",
151151
"user = User(id=\"user\", mode=\"unattended\")\n",
152152
"\n",
153-
"flow = Team(id=\"team\", description=\"\", members=[agent1, user], llm=llm, stop_callback=lambda msgs: len(msgs) > 4)\n",
153+
"flow = Team(id=\"team\", description=\"\", members=[agent1, user], llm=llm, stop_callback=lambda conv: len(conv.messages) > 4)\n",
154154
"\n"
155155
]
156156
},

notebooks/telco_callcenter.ipynb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@
254254
"metadata": {},
255255
"outputs": [],
256256
"source": [
257-
"team = Team(id=\"team\", description=\"\", members=[user, greeter, sales, technical, legal], llm=llm, stop_callback=lambda msgs: len(msgs) > 6)\n",
257+
"team = Team(id=\"team\", description=\"\", members=[user, greeter, sales, technical, legal], llm=llm, stop_callback=lambda conv: len(conv.messages) > 6)\n",
258258
"workflow = Workflow(askable=team)"
259259
]
260260
},

notebooks/telco_callcenter_sequence.ipynb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@
195195
"metadata": {},
196196
"outputs": [],
197197
"source": [
198-
"team = Team(id=\"team\", description=\"\", members=[user, greeter, sales, technical], llm=llm, stop_callback=lambda msgs: len(msgs) > 6)\n"
198+
"team = Team(id=\"team\", description=\"\", members=[user, greeter, sales, technical], llm=llm, stop_callback=lambda conv: len(conv.messages) > 6)\n"
199199
]
200200
},
201201
{

requirements.txt

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
1-
openai==1.54.0
2-
httpx==0.27.2
3-
python-dotenv==1.0.1
4-
invoke==2.2.0
5-
pydantic==2.9.1
6-
azure-identity==1.19.0
7-
pytest==8.3.3
8-
coverage==7.6.1
9-
pytest-cov==5.0.0
10-
fastapi==0.115.5
11-
uvicorn==0.32.0
12-
grpcio-tools==1.66.2
13-
grpcio-reflection==1.67.0
14-
starlette_gzip_request==0.1.0
15-
pdoc==15.0.0
16-
llmlingua==0.2.2
17-
pydocstyle==6.3.0
18-
flake8==7.1.1
19-
flake8-docstrings==1.7.0
20-
docformatter==1.7.5
21-
black===24.10.0
1+
openai>=1.54.0
2+
httpx>=0.27.2
3+
python-dotenv>=1.0.1
4+
invoke>=2.2.0
5+
pydantic>=2.9.1
6+
azure-identity>=1.19.0
7+
pytest>=8.3.3
8+
coverage>=7.6.1
9+
pytest-cov>=5.0.0
10+
fastapi>=0.115.5
11+
uvicorn>=0.32.0
12+
grpcio-tools>=1.66.2
13+
grpcio-reflection>=1.67.0
14+
starlette_gzip_request>=0.1.0
15+
pdoc>=15.0.0
16+
llmlingua>=0.2.2
17+
pydocstyle>=6.3.0
18+
flake8>=7.1.1
19+
flake8-docstrings>=1.7.0
20+
docformatter>=1.7.5
21+
black>=24.10.0
22+
cloudevents>=1.11.0
23+
dapr-ext-fastapi>=1.14.0
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from typing import Annotated
2+
import os
3+
from vanilla_aiagents.agent import Agent
4+
from vanilla_aiagents.llm import AzureOpenAILLM
5+
6+
llm = AzureOpenAILLM(
7+
{
8+
"azure_deployment": os.getenv("AZURE_OPENAI_MODEL"),
9+
"azure_endpoint": os.getenv("AZURE_OPENAI_ENDPOINT"),
10+
"api_key": os.getenv("AZURE_OPENAI_KEY"),
11+
"api_version": os.getenv("AZURE_OPENAI_API_VERSION"),
12+
}
13+
)
14+
15+
_actor_askable = Agent(
16+
id="agent",
17+
llm=llm,
18+
description="Call this agent to play guess the number game with the use",
19+
system_message="""You are an AI assistant
20+
Your task is to play a game with the user.
21+
You first generate a random number between 1 and 100. Then save it as a conversation variable named "number".
22+
The user will try to guess the number.
23+
If the user's guess is too high, respond with "Too high".
24+
If the user's guess is too low, respond with "Too low".
25+
""",
26+
)
27+
28+
29+
@_actor_askable.register_tool(description="Generate a random number")
30+
def random() -> Annotated[str, "A random number"]:
31+
import random
32+
33+
return str(random.randint(1, 100))

samples/remote/actor/run.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import sys
2+
import os
3+
from dotenv import load_dotenv
4+
5+
load_dotenv(override=True)
6+
7+
sys.path.append(os.path.abspath(os.path.join("../../../vanilla_aiagents")))
8+
9+
from vanilla_aiagents.remote.dapr.run_actors import main
10+
11+
if __name__ == "__main__":
12+
main()

0 commit comments

Comments
 (0)