From 02a9d8d37b50c226962cce9adb76bdfcc726310c Mon Sep 17 00:00:00 2001 From: Alfred Francis Date: Sat, 11 Jan 2025 09:56:12 +0100 Subject: [PATCH] async http client for api fulfillment --- app/admin/entities/store.py | 10 +++ app/bot/chat/routes.py | 2 +- app/bot/dialogue_manager/dialogue_manager.py | 26 +++---- app/bot/dialogue_manager/http_client.py | 72 ++++++++++++++++++++ app/bot/dialogue_manager/utils.py | 48 ------------- app/bot/nlu/training.py | 4 +- 6 files changed, 99 insertions(+), 63 deletions(-) create mode 100644 app/bot/dialogue_manager/http_client.py diff --git a/app/admin/entities/store.py b/app/admin/entities/store.py index aeff0fbf..ca2cdb8e 100644 --- a/app/admin/entities/store.py +++ b/app/admin/entities/store.py @@ -25,3 +25,13 @@ async def edit_entity(entity_id: str, entity_data: dict) -> dict: async def delete_entity(entity_id: str): await entity_collection.delete_one({"_id": ObjectId(entity_id)}) +async def list_synonyms(): + """ list all synonyms across the entities""" + synonyms = {} + + entities = await list_entities() + for entity in entities: + for value in entity.entity_values: + for synonym in value.synonyms: + synonyms[synonym] = value.value + return synonyms diff --git a/app/bot/chat/routes.py b/app/bot/chat/routes.py index e9013990..b6d436b0 100755 --- a/app/bot/chat/routes.py +++ b/app/bot/chat/routes.py @@ -14,7 +14,7 @@ async def chat(request: Request, body: dict): try: # Access the dialogue manager from the fast api application state. chat_request = ChatModel.from_json(body) - chat_response = request.app.state.dialogue_manager.process(chat_request) + chat_response = await request.app.state.dialogue_manager.process(chat_request) return chat_response.to_json() except Exception as e: raise HTTPException(status_code=500, detail=f"Error processing request: {e}") diff --git a/app/bot/dialogue_manager/dialogue_manager.py b/app/bot/dialogue_manager/dialogue_manager.py index 25653f4e..0336bb9a 100644 --- a/app/bot/dialogue_manager/dialogue_manager.py +++ b/app/bot/dialogue_manager/dialogue_manager.py @@ -8,8 +8,10 @@ from app.bot.nlu.featurizers import SpacyFeaturizer from app.bot.nlu.intent_classifiers import IntentClassifier from app.bot.nlu.entity_extractors import EntityExtractor -from app.bot.dialogue_manager.utils import SilentUndefined, call_api, get_synonyms, split_sentence +from app.bot.dialogue_manager.utils import SilentUndefined, split_sentence +from app.admin.entities.store import list_synonyms from app.bot.dialogue_manager.models import ChatModel, IntentModel, ParameterModel +from app.bot.dialogue_manager.http_client import call_api from app.config import app_config logger = logging.getLogger('dialogue_manager') @@ -32,7 +34,7 @@ async def from_config(cls): Initialize DialogueManager with all required dependencies """ - synonyms = await get_synonyms() + synonyms = await list_synonyms() # Initialize pipeline with components nlu_pipeline = NLUPipeline([ @@ -63,7 +65,7 @@ def update_model(self, models_dir): self.nlu_pipeline.load(models_dir) logger.info("NLU Pipeline models updated") - def process(self, chat_model: ChatModel) -> ChatModel: + async def process(self, chat_model: ChatModel) -> ChatModel: """ Single entry point to process the dialogue request. @@ -110,7 +112,7 @@ def process(self, chat_model: ChatModel) -> ChatModel: # Step 5: Handle API trigger if the intent is complete if chat_model_response.complete: - chat_model_response = self._handle_api_trigger(active_intent, chat_model_response, context) + chat_model_response = await self._handle_api_trigger(active_intent, chat_model_response, context) logger.info(f"Processed input: {chat_model_response.input_text}", extra=chat_model_response.to_json()) return chat_model_response @@ -233,27 +235,27 @@ def _handle_missing_parameters(self, parameters: List[ParameterModel], chat_mode return chat_model_response - def _handle_api_trigger(self, intent: IntentModel, chat_model_response: ChatModel, context: Dict) -> ChatModel: + async def _handle_api_trigger(self, intent: IntentModel, chat_model_response: ChatModel, context: Dict) -> ChatModel: """ Handle API trigger if the intent requires it. """ if intent.api_trigger and intent.api_details: try: - result = self._call_intent_api(intent, context) + result = await self._call_intent_api(intent, context) context["result"] = result - template = Template(intent.speech_response, undefined=SilentUndefined) - chat_model_response.speech_response = split_sentence(template.render(**context)) + template = Template(intent.speech_response, undefined=SilentUndefined, enable_async=True) + chat_model_response.speech_response = split_sentence(await template.render_async(**context)) except Exception as e: logger.warning(f"API call failed: {e}") chat_model_response.speech_response = ["Service is not available. Please try again later."] else: context["result"] = {} - template = Template(intent.speech_response, undefined=SilentUndefined) - chat_model_response.speech_response = split_sentence(template.render(**context)) + template = Template(intent.speech_response, undefined=SilentUndefined, enable_async=True) + chat_model_response.speech_response = split_sentence(await template.render_async(**context)) return chat_model_response - def _call_intent_api(self, intent: IntentModel, context: Dict): + async def _call_intent_api(self, intent: IntentModel, context: Dict): """ Call the API associated with the intent. """ @@ -268,4 +270,4 @@ def _call_intent_api(self, intent: IntentModel, context: Dict): else: parameters = context.get("parameters", {}) - return call_api(rendered_url, api_details.request_type, headers, parameters, api_details.is_json) + return await call_api(rendered_url, api_details.request_type, headers, parameters, api_details.is_json) diff --git a/app/bot/dialogue_manager/http_client.py b/app/bot/dialogue_manager/http_client.py new file mode 100644 index 00000000..b19c63d0 --- /dev/null +++ b/app/bot/dialogue_manager/http_client.py @@ -0,0 +1,72 @@ +import json +import logging +import aiohttp +import asyncio +from typing import Dict, Any, Optional +from aiohttp import ClientTimeout + +logger = logging.getLogger("http_client") + +async def call_api( + url: str, + method: str, + headers: Optional[Dict[str, str]] = None, + parameters: Optional[Dict[str, Any]] = None, + is_json: bool = False, + timeout: int = 30 +) -> Dict[str, Any]: + """ + Asynchronously call external API with improved error handling and timeout management + + Args: + url: The API endpoint URL + method: HTTP method (GET, POST, PUT, DELETE) + headers: Optional request headers + parameters: Optional request parameters or body + is_json: Whether to send parameters as JSON body + timeout: Request timeout in seconds + + Returns: + Dict containing the API response + + Raises: + aiohttp.ClientError: For HTTP-specific errors + asyncio.TimeoutError: When request times out + ValueError: For invalid method types + Exception: For other unexpected errors + """ + headers = headers or {} + parameters = parameters or {} + timeout_config = ClientTimeout(total=timeout) + + try: + async with aiohttp.ClientSession(timeout=timeout_config) as session: + method = method.upper() + logger.debug(f"Initiating async API Call: url={url} method={method} payload={parameters}") + + if method == "GET": + async with session.get(url, headers=headers, params=parameters) as response: + result = await response.json() + elif method in ["POST", "PUT"]: + kwargs = {"headers": headers, "json" if is_json else "params": parameters} + async with getattr(session, method.lower())(url, **kwargs) as response: + result = await response.json() + elif method == "DELETE": + async with session.delete(url, headers=headers, params=parameters) as response: + result = await response.json() + else: + raise ValueError(f"Unsupported request method: {method}") + + response.raise_for_status() + logger.debug(f"API response => {result}") + return result + + except aiohttp.ClientError as e: + logger.error(f"HTTP error occurred: {str(e)}") + raise + except asyncio.TimeoutError: + logger.error(f"Request timed out after {timeout} seconds") + raise + except Exception as e: + logger.error(f"Unexpected error during API call: {str(e)}") + raise \ No newline at end of file diff --git a/app/bot/dialogue_manager/utils.py b/app/bot/dialogue_manager/utils.py index 4af05b71..43641c3f 100644 --- a/app/bot/dialogue_manager/utils.py +++ b/app/bot/dialogue_manager/utils.py @@ -1,59 +1,11 @@ -import json import logging -import requests from jinja2 import Undefined -from app.admin.entities.store import list_entities logger = logging.getLogger("dialogue_manager") def split_sentence(sentence): return sentence.split("###") -async def get_synonyms(): - """ - Build synonyms dict from DB - :return: - """ - synonyms = {} - - entities = await list_entities() - for entity in entities: - for value in entity.entity_values: - for synonym in value.synonyms: - synonyms[synonym] = value.value - return synonyms - -def call_api(url, type, headers={}, parameters={}, is_json=False): - """ - Call external API - :param url: - :param type: - :param parameters: - :param is_json: - :return: - """ - logger.debug("Initiating API Call with following info: url => {} payload => {}".format(url, parameters)) - if "GET" in type: - response = requests.get(url, headers=headers, params=parameters, timeout=5) - elif "POST" in type: - if is_json: - response = requests.post(url, headers=headers, json=parameters, timeout=5) - else: - response = requests.post(url, headers=headers, params=parameters, timeout=5) - elif "PUT" in type: - if is_json: - response = requests.put(url, headers=headers, json=parameters, timeout=5) - else: - response = requests.put(url, headers=headers, params=parameters, timeout=5) - elif "DELETE" in type: - response = requests.delete(url, headers=headers, params=parameters, timeout=5) - else: - raise Exception("unsupported request method.") - result = json.loads(response.text) - logger.debug("API response => %s", result) - return result - - class SilentUndefined(Undefined): """ Class to suppress jinja2 errors and warnings diff --git a/app/bot/nlu/training.py b/app/bot/nlu/training.py index 4d01309c..ec3ea726 100644 --- a/app/bot/nlu/training.py +++ b/app/bot/nlu/training.py @@ -6,7 +6,7 @@ from app.bot.nlu.featurizers import SpacyFeaturizer from app.bot.nlu.intent_classifiers import IntentClassifier from app.bot.nlu.entity_extractors import EntityExtractor -from app.bot.dialogue_manager.utils import get_synonyms +from app.admin.entities.store import list_synonyms from app.config import app_config @@ -36,7 +36,7 @@ async def train_pipeline(app): training_data.append(example) # initialize and train pipeline - synonyms = await get_synonyms() + synonyms = await list_synonyms() pipeline = NLUPipeline([ SpacyFeaturizer(spacy_model_name), IntentClassifier(),