From f5bb064dee7c9c7c28f4071051f7247dbc42a79f Mon Sep 17 00:00:00 2001
From: Timothy Carambat
Date: Tue, 16 Jan 2024 10:37:46 -0800
Subject: [PATCH] Implement streaming for workspace chats via API (#604)

---
 server/endpoints/api/workspace/index.js | 148 +++++++++++++++++++++++-
 server/endpoints/chat.js                |  15 +++
 server/swagger/openapi.json             |  99 ++++++++++++++++
 server/utils/chats/stream.js            |   2 +
 4 files changed, 263 insertions(+), 1 deletion(-)

diff --git a/server/endpoints/api/workspace/index.js b/server/endpoints/api/workspace/index.js
index ffead3adb4..365e8b014c 100644
--- a/server/endpoints/api/workspace/index.js
+++ b/server/endpoints/api/workspace/index.js
@@ -11,6 +11,11 @@ const {
 const { getVectorDbClass } = require("../../../utils/helpers");
 const { multiUserMode, reqBody } = require("../../../utils/http");
 const { validApiKey } = require("../../../utils/middleware/validApiKey");
+const {
+  streamChatWithWorkspace,
+  writeResponseChunk,
+  VALID_CHAT_MODE,
+} = require("../../../utils/chats/stream");
 
 function apiWorkspaceEndpoints(app) {
   if (!app) return;
@@ -483,7 +488,28 @@ function apiWorkspaceEndpoints(app) {
       const workspace = await Workspace.get({ slug });
 
       if (!workspace) {
-        response.sendStatus(400).end();
+        response.status(400).json({
+          id: uuidv4(),
+          type: "abort",
+          textResponse: null,
+          sources: [],
+          close: true,
+          error: `Workspace ${slug} is not a valid workspace.`,
+        });
+        return;
+      }
+
+      if (!message?.length || !VALID_CHAT_MODE.includes(mode)) {
+        response.status(400).json({
+          id: uuidv4(),
+          type: "abort",
+          textResponse: null,
+          sources: [],
+          close: true,
+          error: !message?.length
+            ? "message parameter cannot be empty."
+            : `${mode} is not a valid mode.`,
+        });
         return;
       }
 
@@ -506,6 +532,126 @@ function apiWorkspaceEndpoints(app) {
       }
     }
   );
+
+  app.post(
+    "/v1/workspace/:slug/stream-chat",
+    [validApiKey],
+    async (request, response) => {
+      /*
+      #swagger.tags = ['Workspaces']
+      #swagger.description = 'Execute a streamable chat with a workspace'
+      #swagger.requestBody = {
+        description: 'Send a prompt to the workspace and the type of conversation (query or chat).<br/>Query: Will not use LLM unless there are relevant sources from vectorDB & does not recall chat history.<br/>Chat: Uses LLM general knowledge w/custom embeddings to produce output, uses rolling chat history.',
+        required: true,
+        type: 'object',
+        content: {
+          "application/json": {
+            example: {
+              message: "What is AnythingLLM?",
+              mode: "query | chat"
+            }
+          }
+        }
+      }
+      #swagger.responses[200] = {
+        content: {
+          "text/event-stream": {
+            schema: {
+              type: 'array',
+              example: [
+                {
+                  id: 'uuid-123',
+                  type: "abort | textResponseChunk",
+                  textResponse: "First chunk",
+                  sources: [],
+                  close: false,
+                  error: "null | text string of the failure mode."
+                },
+                {
+                  id: 'uuid-123',
+                  type: "abort | textResponseChunk",
+                  textResponse: "chunk two",
+                  sources: [],
+                  close: false,
+                  error: "null | text string of the failure mode."
+                },
+                {
+                  id: 'uuid-123',
+                  type: "abort | textResponseChunk",
+                  textResponse: "final chunk of LLM output!",
+                  sources: [{title: "anythingllm.txt", chunk: "This is a context chunk used in the answer of the prompt by the LLM. This will only return in the final chunk."}],
+                  close: true,
+                  error: "null | text string of the failure mode."
+                }
+              ]
+            }
+          }
+        }
+      }
+      #swagger.responses[403] = {
+        schema: {
+          "$ref": "#/definitions/InvalidAPIKey"
+        }
+      }
+      */
+      try {
+        const { slug } = request.params;
+        const { message, mode = "query" } = reqBody(request);
+        const workspace = await Workspace.get({ slug });
+
+        if (!workspace) {
+          response.status(400).json({
+            id: uuidv4(),
+            type: "abort",
+            textResponse: null,
+            sources: [],
+            close: true,
+            error: `Workspace ${slug} is not a valid workspace.`,
+          });
+          return;
+        }
+
+        if (!message?.length || !VALID_CHAT_MODE.includes(mode)) {
+          response.status(400).json({
+            id: uuidv4(),
+            type: "abort",
+            textResponse: null,
+            sources: [],
+            close: true,
+            error: !message?.length
+              ? "Message is empty"
+              : `${mode} is not a valid mode.`,
+          });
+          return;
+        }
+
+        response.setHeader("Cache-Control", "no-cache");
+        response.setHeader("Content-Type", "text/event-stream");
+        response.setHeader("Access-Control-Allow-Origin", "*");
+        response.setHeader("Connection", "keep-alive");
+        response.flushHeaders();
+
+        await streamChatWithWorkspace(response, workspace, message, mode);
+        await Telemetry.sendTelemetry("sent_chat", {
+          LLMSelection: process.env.LLM_PROVIDER || "openai",
+          Embedder: process.env.EMBEDDING_ENGINE || "inherit",
+          VectorDbSelection: process.env.VECTOR_DB || "pinecone",
+        });
+        response.end();
+      } catch (e) {
+        console.error(e);
+        writeResponseChunk(response, {
+          id: uuidv4(),
+          type: "abort",
+          textResponse: null,
+          sources: [],
+          close: true,
+          error: e.message,
+        });
+        response.end();
+      }
+    }
+  );
 }
 
 module.exports = { apiWorkspaceEndpoints };
diff --git a/server/endpoints/chat.js b/server/endpoints/chat.js
index 79fc10132d..adfec0ec3f 100644
--- a/server/endpoints/chat.js
+++ b/server/endpoints/chat.js
@@ -8,6 +8,7 @@ const { Telemetry } = require("../models/telemetry");
 const {
   streamChatWithWorkspace,
   writeResponseChunk,
+  VALID_CHAT_MODE,
 } = require("../utils/chats/stream");
 
 function chatEndpoints(app) {
@@ -31,6 +32,20 @@ function chatEndpoints(app) {
         return;
       }
 
+      if (!message?.length || !VALID_CHAT_MODE.includes(mode)) {
+        response.status(400).json({
+          id: uuidv4(),
+          type: "abort",
+          textResponse: null,
+          sources: [],
+          close: true,
+          error: !message?.length
+            ? "Message is empty."
+            : `${mode} is not a valid mode.`,
+        });
+        return;
+      }
+
       response.setHeader("Cache-Control", "no-cache");
       response.setHeader("Content-Type", "text/event-stream");
       response.setHeader("Access-Control-Allow-Origin", "*");
diff --git a/server/swagger/openapi.json b/server/swagger/openapi.json
index 7b675c44b9..e7b07484a6 100644
--- a/server/swagger/openapi.json
+++ b/server/swagger/openapi.json
@@ -1612,6 +1612,105 @@
         }
       }
     },
+    "/v1/workspace/{slug}/stream-chat": {
+      "post": {
+        "tags": [
+          "Workspaces"
+        ],
+        "description": "Execute a streamable chat with a workspace",
+        "parameters": [
+          {
+            "name": "slug",
+            "in": "path",
+            "required": true,
+            "schema": {
+              "type": "string"
+            }
+          },
+          {
+            "name": "Authorization",
+            "in": "header",
+            "schema": {
+              "type": "string"
+            }
+          }
+        ],
+        "responses": {
+          "200": {
+            "content": {
+              "text/event-stream": {
+                "schema": {
+                  "type": "array",
+                  "example": [
+                    {
+                      "id": "uuid-123",
+                      "type": "abort | textResponseChunk",
+                      "textResponse": "First chunk",
+                      "sources": [],
+                      "close": false,
+                      "error": "null | text string of the failure mode."
+                    },
+                    {
+                      "id": "uuid-123",
+                      "type": "abort | textResponseChunk",
+                      "textResponse": "chunk two",
+                      "sources": [],
+                      "close": false,
+                      "error": "null | text string of the failure mode."
+                    },
+                    {
+                      "id": "uuid-123",
+                      "type": "abort | textResponseChunk",
+                      "textResponse": "final chunk of LLM output!",
+                      "sources": [
+                        {
+                          "title": "anythingllm.txt",
+                          "chunk": "This is a context chunk used in the answer of the prompt by the LLM. This will only return in the final chunk."
+                        }
+                      ],
+                      "close": true,
+                      "error": "null | text string of the failure mode."
+                    }
+                  ]
+                }
+              }
+            },
+            "description": "OK"
+          },
+          "400": {
+            "description": "Bad Request"
+          },
+          "403": {
+            "description": "Forbidden",
+            "content": {
+              "application/json": {
+                "schema": {
+                  "$ref": "#/components/schemas/InvalidAPIKey"
+                }
+              },
+              "application/xml": {
+                "schema": {
+                  "$ref": "#/components/schemas/InvalidAPIKey"
+                }
+              }
+            }
+          }
+        },
+        "requestBody": {
+          "description": "Send a prompt to the workspace and the type of conversation (query or chat).<br/>Query: Will not use LLM unless there are relevant sources from vectorDB & does not recall chat history.<br/>Chat: Uses LLM general knowledge w/custom embeddings to produce output, uses rolling chat history.",
+          "required": true,
+          "type": "object",
+          "content": {
+            "application/json": {
+              "example": {
+                "message": "What is AnythingLLM?",
+                "mode": "query | chat"
+              }
+            }
+          }
+        }
+      }
+    },
     "/v1/system/env-dump": {
       "get": {
         "tags": [
diff --git a/server/utils/chats/stream.js b/server/utils/chats/stream.js
index 11d4effd7c..04bb72b903 100644
--- a/server/utils/chats/stream.js
+++ b/server/utils/chats/stream.js
@@ -8,6 +8,7 @@ const {
   chatPrompt,
 } = require(".");
 
+const VALID_CHAT_MODE = ["chat", "query"];
 function writeResponseChunk(response, data) {
   response.write(`data: ${JSON.stringify(data)}\n\n`);
   return;
 }
@@ -503,6 +504,7 @@ function handleStreamResponses(response, stream, responseProps) {
 }
 
 module.exports = {
+  VALID_CHAT_MODE,
   streamChatWithWorkspace,
   writeResponseChunk,
 };
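
Usage sketch (JavaScript, Node 18+): a minimal client for the new /v1/workspace/{slug}/stream-chat endpoint added above. The base URL, bearer-token auth scheme, env var name, and workspace slug below are assumptions for illustration; only the route, request body, and chunk format come from the patch itself.

// stream-chat-client.js -- minimal sketch of a consumer for the new endpoint.
// Assumptions: Node 18+ (global fetch), server reachable at BASE_URL, and a
// bearer API key in the Authorization header declared in the swagger doc.
const BASE_URL = "http://localhost:3001/api"; // assumed dev address
const API_KEY = process.env.ANYTHING_LLM_API_KEY; // illustrative credential

async function streamChat(slug, message, mode = "query") {
  const res = await fetch(
    `${BASE_URL}/v1/workspace/${encodeURIComponent(slug)}/stream-chat`,
    {
      method: "POST",
      headers: {
        Authorization: `Bearer ${API_KEY}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({ message, mode }),
    }
  );

  // Validation failures (bad slug, empty message, invalid mode) return a
  // single JSON "abort" object with HTTP 400 instead of a stream.
  if (!res.ok) throw new Error((await res.json()).error ?? `HTTP ${res.status}`);

  // Each chunk written by writeResponseChunk() is one SSE event of the form
  // "data: {...}\n\n", so split the byte stream on blank lines and parse.
  const decoder = new TextDecoder();
  let buffer = "";
  for await (const bytes of res.body) {
    buffer += decoder.decode(bytes, { stream: true });
    let end;
    while ((end = buffer.indexOf("\n\n")) !== -1) {
      const event = buffer.slice(0, end);
      buffer = buffer.slice(end + 2);
      if (!event.startsWith("data: ")) continue;
      const chunk = JSON.parse(event.slice("data: ".length));
      if (chunk.type === "abort") throw new Error(chunk.error);
      process.stdout.write(chunk.textResponse ?? "");
      if (chunk.close) return chunk.sources; // sources ride on the final chunk
    }
  }
  return [];
}

streamChat("my-workspace", "What is AnythingLLM?")
  .then((sources) => console.log("\nSources:", sources))
  .catch(console.error);

Note that sources only arrive on the closing chunk (close: true), so the client prints textResponse fragments as they stream and resolves with the source list at the end. The same parsing applies to the session-authenticated /workspace/:slug/stream-chat route in chat.js, since both write chunks through the same writeResponseChunk helper.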