Skip to content

Commit d450d0d

Browse files
hasinaxpspandan_mondal
and
spandan_mondal
authored
Agentic flow integration2 (#1861)
* Agentic flow added to - whatsapp broadcast, mail channel, schedule action * Agentic flow added to - whatsapp broadcast, mail channel, schedule action * Agentic flow added to - whatsapp broadcast, mail channel, schedule action * Agentic flow global slots, tagging, context - --------- Co-authored-by: spandan_mondal <[email protected]>
1 parent 0c45176 commit d450d0d

29 files changed

+1143
-93
lines changed

kairon/__init__.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545

4646
def create_argument_parser():
4747
from kairon.cli import importer, training, testing, conversations_deletion, translator, delete_logs,\
48-
message_broadcast,content_importer, mail_channel_read
48+
message_broadcast,content_importer, mail_channel_read, agentic_flow
4949

5050
parser = ArgumentParser(
5151
prog="kairon",
@@ -64,6 +64,8 @@ def create_argument_parser():
6464
message_broadcast.add_subparser(subparsers, parents=parent_parsers)
6565
content_importer.add_subparser(subparsers, parents=parent_parsers)
6666
mail_channel_read.add_subparser(subparsers, parents=parent_parsers)
67+
agentic_flow.add_subparser(subparsers, parents=parent_parsers)
68+
6769
return parser
6870

6971

kairon/actions/definitions/schedule.py

+43-13
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
from calendar import timegm
23
from datetime import datetime
34
import pickle
@@ -20,11 +21,12 @@
2021
from kairon.actions.definitions.base import ActionsBase
2122
from kairon.events.executors.factory import ExecutorFactory
2223
from kairon.exceptions import AppException
23-
from kairon.shared.actions.data_objects import ActionServerLogs, ScheduleAction
24+
from kairon.shared.actions.data_objects import ActionServerLogs, ScheduleAction, ScheduleActionType
2425
from kairon.shared.actions.exception import ActionFailure
2526
from kairon.shared.actions.models import ActionType
2627
from kairon.shared.actions.utils import ActionUtility
2728
from kairon.shared.callback.data_objects import CallbackConfig
29+
from kairon.shared.constants import EventClass
2830
from kairon.shared.data.constant import TASK_TYPE
2931

3032

@@ -78,7 +80,8 @@ async def execute(self, dispatcher: CollectingDispatcher, tracker: Tracker, doma
7880
schedule_action = None
7981
schedule_time = None
8082
timezone = None
81-
pyscript_code = None
83+
execution_info = None
84+
event_data = None
8285
try:
8386
action_config = self.retrieve_config()
8487
dispatch_bot_response = action_config.get('dispatch_bot_response', True)
@@ -87,8 +90,6 @@ async def execute(self, dispatcher: CollectingDispatcher, tracker: Tracker, doma
8790
tracker_data.update({'bot': self.bot})
8891
schedule_action = action_config['schedule_action']
8992
timezone = action_config['timezone']
90-
callback = CallbackConfig.get_entry(name=schedule_action, bot=self.bot)
91-
pyscript_code = callback['pyscript_code']
9293
schedule_time, _ = ActionUtility.get_parameter_value(tracker_data,
9394
action_config['schedule_time'],
9495
self.bot)
@@ -98,12 +99,35 @@ async def execute(self, dispatcher: CollectingDispatcher, tracker: Tracker, doma
9899
action_config['params_list'],
99100
self.bot)
100101
logger.info("schedule_data: " + str(schedule_data_log))
101-
event_data = {'data': {'source_code': callback['pyscript_code'],
102-
'predefined_objects': schedule_data
103-
},
104-
'date_time': date_parser.parse(schedule_time),
105-
'timezone': action_config['timezone']
106-
}
102+
103+
if action_config['schedule_action_type'] == ScheduleActionType.PYSCRIPT.value:
104+
callback = CallbackConfig.get_entry(name=schedule_action, bot=self.bot)
105+
event_data = {'data': {'source_code': callback['pyscript_code'],
106+
'predefined_objects': schedule_data
107+
},
108+
'date_time': date_parser.parse(schedule_time),
109+
'timezone': action_config['timezone']
110+
}
111+
execution_info = {
112+
'pyscript_code': callback['pyscript_code'],
113+
'type': ScheduleActionType.PYSCRIPT.value
114+
}
115+
elif action_config['schedule_action_type'] == ScheduleActionType.FLOW.value:
116+
event_data = {
117+
'data': {
118+
'slot_data': json.dumps(schedule_data),
119+
'flow_name': schedule_action,
120+
'bot': self.bot,
121+
'user': tracker.sender_id
122+
},
123+
'date_time': date_parser.parse(schedule_time),
124+
'timezone': action_config['timezone'],
125+
'is_flow': True
126+
}
127+
execution_info = {
128+
'flow': schedule_action,
129+
'type': ScheduleActionType.FLOW.value
130+
}
107131
await self.add_schedule_job(**event_data)
108132
except Exception as e:
109133
exception = e
@@ -128,7 +152,7 @@ async def execute(self, dispatcher: CollectingDispatcher, tracker: Tracker, doma
128152
schedule_action=schedule_action,
129153
schedule_time=schedule_time,
130154
timezone=timezone,
131-
pyscript_code=pyscript_code,
155+
execution_info=execution_info,
132156
data=schedule_data_log
133157
).save()
134158
return {}
@@ -137,13 +161,19 @@ async def add_schedule_job(self,
137161
date_time: datetime,
138162
data: Dict,
139163
timezone: Text,
164+
is_flow=False,
140165
**kwargs):
141166
func = obj_to_ref(ExecutorFactory.get_executor().execute_task)
142167

143168
_id = uuid7().hex
144-
data['predefined_objects']['event'] = _id
145169
args = (func, "scheduler_evaluator", data,)
146-
kwargs.update({'task_type': TASK_TYPE.ACTION.value})
170+
if is_flow:
171+
args = (ExecutorFactory.get_executor(), EventClass.agentic_flow, data,)
172+
kwargs.update({'task_type': TASK_TYPE.EVENT.value})
173+
else:
174+
kwargs.update({'task_type': TASK_TYPE.ACTION.value})
175+
data['predefined_objects']['event'] = _id
176+
147177
trigger = DateTrigger(run_date=date_time, timezone=timezone)
148178

149179
next_run_time = trigger.get_next_fire_time(None, datetime.now(astimezone(timezone) or get_localzone()))

kairon/api/app/routers/bot/bot.py

+31
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from kairon.shared.data.audit.processor import AuditDataProcessor
3636
from kairon.shared.data.constant import ENDPOINT_TYPE, ModelTestType, \
3737
AuditlogActions
38+
from kairon.shared.data.data_models import FlowTagChangeRequest
3839
from kairon.shared.data.data_objects import TrainingExamples, ModelTraining, Rules
3940
from kairon.shared.data.model_processor import ModelProcessor
4041
from kairon.shared.data.processor import MongoProcessor
@@ -1684,3 +1685,33 @@ async def trigger_mail_channel_read(
16841685
event.validate()
16851686
event.enqueue()
16861687
return Response(message="mail channel read triggered")
1688+
1689+
1690+
@router.post("/change_flow_tag", response_model=Response)
1691+
async def change_flow_tag(
1692+
request: FlowTagChangeRequest,
1693+
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS)
1694+
):
1695+
"""
1696+
change tag or rule or multiflow
1697+
"""
1698+
data = request.dict()
1699+
mongo_processor.change_flow_tag(
1700+
bot = current_user.get_bot(),
1701+
flow_name=data['name'],
1702+
tag=data['tag'],
1703+
flow_type=data['type']
1704+
)
1705+
return Response(message=f"Flow tag changed to '{data['tag']}'")
1706+
1707+
@router.get("/flow_tag/{tag}", response_model=Response)
1708+
async def get_flow_tag(
1709+
tag: str = Path(description="flow tag"),
1710+
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=TESTER_ACCESS)
1711+
):
1712+
"""
1713+
Fetches the flows with the given tag
1714+
"""
1715+
flows = mongo_processor.get_flows_by_tag(current_user.get_bot(), tag)
1716+
return Response(data=flows)
1717+

kairon/cli/agentic_flow.py

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
2+
from typing import List
3+
from rasa.cli import SubParsersAction
4+
5+
from kairon.events.definitions.agentic_flow import AgenticFlowEvent
6+
7+
8+
def exec_agentic_flow(args):
9+
AgenticFlowEvent(args.bot, args.user).execute(flow_name=args.flow_name, slot_data=args.slot_data)
10+
11+
12+
def add_subparser(subparsers: SubParsersAction, parents: List[ArgumentParser]):
13+
agentic_fow_parser = subparsers.add_parser(
14+
"agentic-flow",
15+
conflict_handler="resolve",
16+
formatter_class=ArgumentDefaultsHelpFormatter,
17+
parents=parents,
18+
help="Mail channel initiate reading"
19+
)
20+
agentic_fow_parser.add_argument('bot',
21+
type=str,
22+
help="Bot id for which command is executed", action='store')
23+
24+
agentic_fow_parser.add_argument('user',
25+
type=str,
26+
help="Kairon user who is initiating the command", action='store')
27+
28+
agentic_fow_parser.add_argument('flow_name',
29+
type=str,
30+
help="Kairon flow name to execute", action='store')
31+
32+
agentic_fow_parser.add_argument('slot_data',
33+
type=str,
34+
help="json containing slot values dictionary", action='store')
35+
36+
agentic_fow_parser.set_defaults(func=exec_agentic_flow)
+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import asyncio
2+
import json
3+
from typing import Text
4+
from loguru import logger
5+
from kairon import Utility
6+
from kairon.events.definitions.base import EventsBase
7+
from kairon.exceptions import AppException
8+
from kairon.shared.chat.agent.agent_flow import AgenticFlow
9+
from kairon.shared.constants import EventClass
10+
11+
12+
class AgenticFlowEvent(EventsBase):
13+
"""
14+
Event to execute an agentic flow
15+
"""
16+
17+
def __init__(self, bot: Text, user: Text, **kwargs):
18+
"""
19+
Initialise event.
20+
"""
21+
self.bot = bot
22+
self.user = user
23+
self.flow_name = kwargs.get('flow_name')
24+
25+
def validate(self):
26+
"""
27+
validate mail channel exists and works properly
28+
"""
29+
if self.flow_name:
30+
return AgenticFlow.flow_exists(self.bot, self.flow_name)
31+
32+
def enqueue(self, **kwargs):
33+
"""
34+
Send event to event server.
35+
"""
36+
try:
37+
payload = {'bot': self.bot, 'user': self.user}
38+
if flow_name := kwargs.get('flow_name'):
39+
payload['flow_name'] = flow_name
40+
self.flow_name = flow_name
41+
if slot_data := kwargs.get('slot_data'):
42+
payload['slot_data'] = slot_data
43+
self.validate()
44+
Utility.request_event_server(EventClass.agentic_flow, payload)
45+
except Exception as e:
46+
logger.error(str(e))
47+
raise AppException(e)
48+
49+
def execute(self, **kwargs):
50+
"""
51+
Execute the event.
52+
"""
53+
try:
54+
if flow_name := kwargs.get('flow_name'):
55+
self.flow_name = flow_name
56+
57+
slot_vals = {}
58+
if slot_data := kwargs.get('slot_data'):
59+
if isinstance(slot_data, str):
60+
slot_data = json.loads(slot_data)
61+
slot_vals = slot_data
62+
flow = AgenticFlow(bot=self.bot, slot_vals=slot_vals)
63+
resp, errors = asyncio.run(flow.execute_rule(self.flow_name))
64+
logger.info(resp)
65+
if errors:
66+
logger.error(f"Failed to execute flow {self.flow_name}. Errors: {errors}")
67+
raise AppException(f"Failed to execute flow {self.flow_name}. Errors: {errors}")
68+
except Exception as e:
69+
logger.error(str(e))
70+
raise AppException(f"Failed to execute flow {self.flow_name} for bot {self.bot}. Error: {str(e)}")

kairon/events/definitions/factory.py

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from kairon.events.definitions.agentic_flow import AgenticFlowEvent
12
from kairon.events.definitions.content_importer import DocContentImporterEvent
23
from kairon.events.definitions.data_importer import TrainingDataImporterEvent
34
from kairon.events.definitions.faq_importer import FaqDataImporterEvent
@@ -23,6 +24,7 @@ class EventFactory:
2324
EventClass.message_broadcast: MessageBroadcastEvent,
2425
EventClass.content_importer: DocContentImporterEvent,
2526
EventClass.mail_channel_read_mails: MailReadEvent,
27+
EventClass.agentic_flow: AgenticFlowEvent
2628
}
2729

2830
@staticmethod

kairon/shared/actions/data_objects.py

+8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from datetime import datetime
2+
from enum import Enum
23

34
from mongoengine import (
45
EmbeddedDocument,
@@ -963,6 +964,10 @@ def clean(self):
963964
raise ValidationError("Action name cannot start with utter_")
964965

965966

967+
class ScheduleActionType(Enum):
968+
PYSCRIPT = "pyscript"
969+
FLOW = "flow"
970+
966971
@auditlogger.log
967972
@push_notification.apply
968973
class ScheduleAction(Auditlog):
@@ -972,6 +977,9 @@ class ScheduleAction(Auditlog):
972977
schedule_time = EmbeddedDocumentField(CustomActionDynamicParameters)
973978
timezone = StringField(default="UTC", required=True)
974979
schedule_action = StringField(required=True)
980+
schedule_action_type = StringField(
981+
default=ScheduleActionType.PYSCRIPT.value, choices=[type.value for type in ScheduleActionType]
982+
)
975983
response_text = StringField(required=False)
976984
params_list = ListField(
977985
EmbeddedDocumentField(CustomActionRequestParameters), required=False

kairon/shared/callback/data_objects.py

+7-7
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def get_all_names(bot) -> list[str]:
9999
return list(names)
100100

101101
@staticmethod
102-
def get_entry(bot, name) -> dict:
102+
def get_entry(bot :str, name :str) -> dict:
103103
entry = CallbackConfig.objects(bot=bot, name__iexact=name).first()
104104
if not entry:
105105
raise AppException(f"Callback Configuration with name '{name}' does not exist!")
@@ -147,7 +147,7 @@ def create_entry(bot: str,
147147
return config.to_mongo().to_dict()
148148

149149
@staticmethod
150-
def get_auth_token(bot, name) -> tuple[str, bool]:
150+
def get_auth_token(bot: str, name: str) -> tuple[str, bool]:
151151
entry = CallbackConfig.objects(bot=bot, name__iexact=name).first()
152152
if not entry:
153153
raise AppException(f"Callback Configuration with name '{name}' does not exist!")
@@ -281,7 +281,7 @@ def create_entry(name: str, callback_config_name: str, bot: str, sender_id: str,
281281
return callback_url, identifier, is_standalone
282282

283283
@staticmethod
284-
def get_value_from_json(json_obj, path):
284+
def get_value_from_json(json_obj: Any, path: str):
285285
keys = path.split('.')
286286
value = json_obj
287287
try:
@@ -295,7 +295,7 @@ def get_value_from_json(json_obj, path):
295295
return value
296296

297297
@staticmethod
298-
def validate_entry(token: str, identifier: Optional[str] = None, request_body: Any = None):
298+
def validate_entry(token: str, identifier: Optional[str] = None, request_body: Any = None) -> tuple[dict, dict]:
299299
check_nonempty_string(token)
300300
config_entry = CallbackConfig.verify_auth_token(token)
301301

@@ -363,7 +363,7 @@ def create_success_entry(name: str,
363363
request_data: Any,
364364
metadata: dict,
365365
callback_url: str,
366-
callback_source: str):
366+
callback_source: str) -> dict:
367367
check_nonempty_string(name)
368368
check_nonempty_string(bot)
369369
check_nonempty_string(identifier)
@@ -396,7 +396,7 @@ def create_failure_entry(name: str,
396396
request_data: Any,
397397
metadata: dict,
398398
callback_url: str,
399-
callback_source: str):
399+
callback_source: str) -> dict:
400400
check_nonempty_string(name)
401401
check_nonempty_string(bot)
402402
check_nonempty_string(identifier)
@@ -419,7 +419,7 @@ def create_failure_entry(name: str,
419419
return record.to_mongo().to_dict()
420420

421421
@staticmethod
422-
def get_logs(query: dict, offset: int, limit: int):
422+
def get_logs(query: dict, offset: int, limit: int) -> tuple[list[dict], int]:
423423
logs = CallbackLog.objects(**query).skip(offset).limit(limit).exclude('id').order_by('-timestamp').to_json()
424424
logs_dict_list = json.loads(logs)
425425
for log in logs_dict_list:

0 commit comments

Comments
 (0)