Skip to content

Commit

Permalink
async http client for api fulfillment
Browse files Browse the repository at this point in the history
  • Loading branch information
alfredfrancis committed Jan 11, 2025
1 parent e9725d7 commit 02a9d8d
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 63 deletions.
10 changes: 10 additions & 0 deletions app/admin/entities/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,13 @@ async def edit_entity(entity_id: str, entity_data: dict) -> dict:
async def delete_entity(entity_id: str):
await entity_collection.delete_one({"_id": ObjectId(entity_id)})

async def list_synonyms():
""" list all synonyms across the entities"""
synonyms = {}

entities = await list_entities()
for entity in entities:
for value in entity.entity_values:
for synonym in value.synonyms:
synonyms[synonym] = value.value
return synonyms
2 changes: 1 addition & 1 deletion app/bot/chat/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async def chat(request: Request, body: dict):
try:
# Access the dialogue manager from the fast api application state.
chat_request = ChatModel.from_json(body)
chat_response = request.app.state.dialogue_manager.process(chat_request)
chat_response = await request.app.state.dialogue_manager.process(chat_request)
return chat_response.to_json()
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error processing request: {e}")
26 changes: 14 additions & 12 deletions app/bot/dialogue_manager/dialogue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
from app.bot.nlu.featurizers import SpacyFeaturizer
from app.bot.nlu.intent_classifiers import IntentClassifier
from app.bot.nlu.entity_extractors import EntityExtractor
from app.bot.dialogue_manager.utils import SilentUndefined, call_api, get_synonyms, split_sentence
from app.bot.dialogue_manager.utils import SilentUndefined, split_sentence
from app.admin.entities.store import list_synonyms
from app.bot.dialogue_manager.models import ChatModel, IntentModel, ParameterModel
from app.bot.dialogue_manager.http_client import call_api
from app.config import app_config

logger = logging.getLogger('dialogue_manager')
Expand All @@ -32,7 +34,7 @@ async def from_config(cls):
Initialize DialogueManager with all required dependencies
"""

synonyms = await get_synonyms()
synonyms = await list_synonyms()

# Initialize pipeline with components
nlu_pipeline = NLUPipeline([
Expand Down Expand Up @@ -63,7 +65,7 @@ def update_model(self, models_dir):
self.nlu_pipeline.load(models_dir)
logger.info("NLU Pipeline models updated")

def process(self, chat_model: ChatModel) -> ChatModel:
async def process(self, chat_model: ChatModel) -> ChatModel:
"""
Single entry point to process the dialogue request.
Expand Down Expand Up @@ -110,7 +112,7 @@ def process(self, chat_model: ChatModel) -> ChatModel:

# Step 5: Handle API trigger if the intent is complete
if chat_model_response.complete:
chat_model_response = self._handle_api_trigger(active_intent, chat_model_response, context)
chat_model_response = await self._handle_api_trigger(active_intent, chat_model_response, context)

logger.info(f"Processed input: {chat_model_response.input_text}", extra=chat_model_response.to_json())
return chat_model_response
Expand Down Expand Up @@ -233,27 +235,27 @@ def _handle_missing_parameters(self, parameters: List[ParameterModel], chat_mode

return chat_model_response

def _handle_api_trigger(self, intent: IntentModel, chat_model_response: ChatModel, context: Dict) -> ChatModel:
async def _handle_api_trigger(self, intent: IntentModel, chat_model_response: ChatModel, context: Dict) -> ChatModel:
"""
Handle API trigger if the intent requires it.
"""
if intent.api_trigger and intent.api_details:
try:
result = self._call_intent_api(intent, context)
result = await self._call_intent_api(intent, context)
context["result"] = result
template = Template(intent.speech_response, undefined=SilentUndefined)
chat_model_response.speech_response = split_sentence(template.render(**context))
template = Template(intent.speech_response, undefined=SilentUndefined, enable_async=True)
chat_model_response.speech_response = split_sentence(await template.render_async(**context))
except Exception as e:
logger.warning(f"API call failed: {e}")
chat_model_response.speech_response = ["Service is not available. Please try again later."]
else:
context["result"] = {}
template = Template(intent.speech_response, undefined=SilentUndefined)
chat_model_response.speech_response = split_sentence(template.render(**context))
template = Template(intent.speech_response, undefined=SilentUndefined, enable_async=True)
chat_model_response.speech_response = split_sentence(await template.render_async(**context))

return chat_model_response

def _call_intent_api(self, intent: IntentModel, context: Dict):
async def _call_intent_api(self, intent: IntentModel, context: Dict):
"""
Call the API associated with the intent.
"""
Expand All @@ -268,4 +270,4 @@ def _call_intent_api(self, intent: IntentModel, context: Dict):
else:
parameters = context.get("parameters", {})

return call_api(rendered_url, api_details.request_type, headers, parameters, api_details.is_json)
return await call_api(rendered_url, api_details.request_type, headers, parameters, api_details.is_json)
72 changes: 72 additions & 0 deletions app/bot/dialogue_manager/http_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import json
import logging
import aiohttp
import asyncio
from typing import Dict, Any, Optional
from aiohttp import ClientTimeout

logger = logging.getLogger("http_client")

async def call_api(
url: str,
method: str,
headers: Optional[Dict[str, str]] = None,
parameters: Optional[Dict[str, Any]] = None,
is_json: bool = False,
timeout: int = 30
) -> Dict[str, Any]:
"""
Asynchronously call external API with improved error handling and timeout management
Args:
url: The API endpoint URL
method: HTTP method (GET, POST, PUT, DELETE)
headers: Optional request headers
parameters: Optional request parameters or body
is_json: Whether to send parameters as JSON body
timeout: Request timeout in seconds
Returns:
Dict containing the API response
Raises:
aiohttp.ClientError: For HTTP-specific errors
asyncio.TimeoutError: When request times out
ValueError: For invalid method types
Exception: For other unexpected errors
"""
headers = headers or {}
parameters = parameters or {}
timeout_config = ClientTimeout(total=timeout)

try:
async with aiohttp.ClientSession(timeout=timeout_config) as session:
method = method.upper()
logger.debug(f"Initiating async API Call: url={url} method={method} payload={parameters}")

if method == "GET":
async with session.get(url, headers=headers, params=parameters) as response:
result = await response.json()
elif method in ["POST", "PUT"]:
kwargs = {"headers": headers, "json" if is_json else "params": parameters}
async with getattr(session, method.lower())(url, **kwargs) as response:
result = await response.json()
elif method == "DELETE":
async with session.delete(url, headers=headers, params=parameters) as response:
result = await response.json()
else:
raise ValueError(f"Unsupported request method: {method}")

response.raise_for_status()
logger.debug(f"API response => {result}")
return result

except aiohttp.ClientError as e:
logger.error(f"HTTP error occurred: {str(e)}")
raise
except asyncio.TimeoutError:
logger.error(f"Request timed out after {timeout} seconds")
raise
except Exception as e:
logger.error(f"Unexpected error during API call: {str(e)}")
raise
48 changes: 0 additions & 48 deletions app/bot/dialogue_manager/utils.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,11 @@
import json
import logging
import requests
from jinja2 import Undefined
from app.admin.entities.store import list_entities

logger = logging.getLogger("dialogue_manager")

def split_sentence(sentence):
return sentence.split("###")

async def get_synonyms():
"""
Build synonyms dict from DB
:return:
"""
synonyms = {}

entities = await list_entities()
for entity in entities:
for value in entity.entity_values:
for synonym in value.synonyms:
synonyms[synonym] = value.value
return synonyms

def call_api(url, type, headers={}, parameters={}, is_json=False):
"""
Call external API
:param url:
:param type:
:param parameters:
:param is_json:
:return:
"""
logger.debug("Initiating API Call with following info: url => {} payload => {}".format(url, parameters))
if "GET" in type:
response = requests.get(url, headers=headers, params=parameters, timeout=5)
elif "POST" in type:
if is_json:
response = requests.post(url, headers=headers, json=parameters, timeout=5)
else:
response = requests.post(url, headers=headers, params=parameters, timeout=5)
elif "PUT" in type:
if is_json:
response = requests.put(url, headers=headers, json=parameters, timeout=5)
else:
response = requests.put(url, headers=headers, params=parameters, timeout=5)
elif "DELETE" in type:
response = requests.delete(url, headers=headers, params=parameters, timeout=5)
else:
raise Exception("unsupported request method.")
result = json.loads(response.text)
logger.debug("API response => %s", result)
return result


class SilentUndefined(Undefined):
"""
Class to suppress jinja2 errors and warnings
Expand Down
4 changes: 2 additions & 2 deletions app/bot/nlu/training.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from app.bot.nlu.featurizers import SpacyFeaturizer
from app.bot.nlu.intent_classifiers import IntentClassifier
from app.bot.nlu.entity_extractors import EntityExtractor
from app.bot.dialogue_manager.utils import get_synonyms
from app.admin.entities.store import list_synonyms

from app.config import app_config

Expand Down Expand Up @@ -36,7 +36,7 @@ async def train_pipeline(app):
training_data.append(example)

# initialize and train pipeline
synonyms = await get_synonyms()
synonyms = await list_synonyms()
pipeline = NLUPipeline([
SpacyFeaturizer(spacy_model_name),
IntentClassifier(),
Expand Down

0 comments on commit 02a9d8d

Please sign in to comment.