diff --git a/chatapi/README.md b/chatapi/README.md index ba0f1ce904..49f6ff69c3 100644 --- a/chatapi/README.md +++ b/chatapi/README.md @@ -8,6 +8,15 @@ For model choices view: - Deepseek: https://api-docs.deepseek.com/quick_start/pricing - Gemini: https://deepmind.google/technologies/gemini/ +### Assistants to Responses migration checklist + +Use `rg "beta.assistants"` to confirm the deprecated helpers have been removed. The migration to the Responses API touched: + +1. Backend utilities (`src/utils/chat-assistant.utils.ts`, `src/utils/chat-helpers.utils.ts`) where assistant threads were replaced with `client.responses.create` and streaming wrappers. +2. Express/websocket wiring (`src/index.ts`) so every event now carries Responses metadata alongside the textual delta. +3. Angular consumers (`src/app/shared/chat.service.ts`, `src/app/chat/**`) which now normalise structured stream payloads while remaining backward compatible with plain strings. +4. Automated coverage (`chatapi/src/utils/chat-assistant.utils.spec.ts`, `src/app/chat/chat-window/chat-window.component.spec.ts`) to lock the new contract in place. + ## Development Notes Run `cd chatapi` and add a .env file in the `chatapi` directory with the following configs in the .env file(ensure the username & password match your admin credentials): ``` @@ -15,6 +24,13 @@ Run `cd chatapi` and add a .env file in the `chatapi` directory with the followi COUCHDB_HOST=http://localhost:2200 COUCHDB_USER=planet COUCHDB_PASS=planet + # Optional assistant overrides for the Responses API + OPENAI_ASSISTANT_NAME="OLE Assistant" + OPENAI_ASSISTANT_INSTRUCTIONS="Keep answers short" + OPENAI_RESPONSE_FORMAT=text + OPENAI_PARALLEL_TOOL_CALLS=false + # JSON array describing default tool configuration + OPENAI_ASSISTANT_TOOLS='[{"type":"code_interpreter"}]' ``` By default(linux), the chatapi uses 5000 as the serve port. For *windows* and *macOS* users we recommend using `5400` as the serve port to avoid conflicts with other services. diff --git a/chatapi/jest.config.ts b/chatapi/jest.config.ts new file mode 100644 index 0000000000..9d32fa742e --- /dev/null +++ b/chatapi/jest.config.ts @@ -0,0 +1,11 @@ +import type { Config } from 'jest'; + +const config: Config = { + preset: 'ts-jest', + testEnvironment: 'node', + roots: [ '/src' ], + moduleFileExtensions: [ 'ts', 'js', 'json' ], + collectCoverageFrom: [ 'src/**/*.ts', '!src/index.ts' ], +}; + +export default config; diff --git a/chatapi/package.json b/chatapi/package.json index a97b6c83f9..a5b17690e0 100644 --- a/chatapi/package.json +++ b/chatapi/package.json @@ -8,7 +8,8 @@ "build": "tsc", "dev": "nodemon --exec ts-node src/index.ts", "lint": "eslint . --ext .ts", - "lint-fix": "eslint . --ext .ts --fix" + "lint-fix": "eslint . --ext .ts --fix", + "test": "jest" }, "repository": { "type": "git", @@ -43,10 +44,13 @@ "ws": "^8.14.2" }, "devDependencies": { + "@types/jest": "^29.5.12", "@types/node": "^20.3.1", "@typescript-eslint/eslint-plugin": "^5.60.0", "@typescript-eslint/parser": "^5.60.0", "eslint": "^8.43.0", - "nodemon": "^2.0.22" + "jest": "^29.7.0", + "nodemon": "^2.0.22", + "ts-jest": "^29.2.5" } } diff --git a/chatapi/src/config/ai-providers.config.ts b/chatapi/src/config/ai-providers.config.ts index fb32a30c5c..2db817cd5c 100644 --- a/chatapi/src/config/ai-providers.config.ts +++ b/chatapi/src/config/ai-providers.config.ts @@ -2,12 +2,61 @@ import OpenAI from 'openai'; import { configurationDB } from './nano.config'; -import { ModelsDocument } from '../models/chat.model'; +import { AssistantResponseFormat, AssistantToolConfig, ModelsDocument } from '../models/chat.model'; let keys: Record = {}; let models: Record = {}; let assistant: Record = {}; +const parseBoolean = (value: string | undefined): boolean | undefined => { + if (value === undefined) { + return undefined; + } + + const normalized = value.trim().toLowerCase(); + if ([ 'true', '1', 'yes', 'y' ].includes(normalized)) { + return true; + } + if ([ 'false', '0', 'no', 'n' ].includes(normalized)) { + return false; + } + return undefined; +}; + +const parseJSON = (value: string | undefined): T | undefined => { + if (!value) { + return undefined; + } + try { + return JSON.parse(value) as T; + } catch (error) { + console.error(`Failed to parse JSON value from environment: ${error}`); // eslint-disable-line no-console + return undefined; + } +}; + +const parseResponseFormat = (value: string | undefined): AssistantResponseFormat | undefined => { + if (!value) { + return undefined; + } + + if (value.trim().startsWith('{')) { + return parseJSON(value); + } + + return value; +}; + +const parseTools = (value: string | undefined): AssistantToolConfig[] | undefined => { + const parsed = parseJSON(value); + + if (!parsed) { + return undefined; + } + + return Array.isArray(parsed) ? parsed : undefined; +}; + async function getConfig(): Promise { try { const allDocs = await configurationDB.list({ 'include_docs': true }); @@ -54,10 +103,31 @@ const initialize = async () => { 'gemini': { 'ai': keys.gemini, 'defaultModel': doc?.models.gemini || '' }, }; + const envAssistantName = process.env.OPENAI_ASSISTANT_NAME; + const envAssistantInstructions = process.env.OPENAI_ASSISTANT_INSTRUCTIONS; + const envResponseFormat = parseResponseFormat(process.env.OPENAI_RESPONSE_FORMAT); + const envParallelToolCalls = parseBoolean(process.env.OPENAI_PARALLEL_TOOL_CALLS); + const envTools = parseTools(process.env.OPENAI_ASSISTANT_TOOLS); + + const computedTools = Array.isArray(doc?.assistant?.tools) + ? doc?.assistant?.tools + : envTools ?? [ { 'type': 'code_interpreter' } ]; + assistant = { - 'name': doc?.assistant?.name || '', - 'instructions': doc?.assistant?.instructions || '', + 'name': doc?.assistant?.name || envAssistantName || '', + 'instructions': doc?.assistant?.instructions || envAssistantInstructions || '', + 'tools': computedTools, }; + + const resolvedResponseFormat = doc?.assistant?.response_format ?? envResponseFormat; + if (resolvedResponseFormat !== undefined) { + assistant.response_format = resolvedResponseFormat; + } + + const resolvedParallelToolCalls = doc?.assistant?.parallel_tool_calls ?? envParallelToolCalls; + if (resolvedParallelToolCalls !== undefined) { + assistant.parallel_tool_calls = resolvedParallelToolCalls; + } } catch (error) { console.error(`Error initializing configs: ${error}`); } diff --git a/chatapi/src/index.ts b/chatapi/src/index.ts index 28e8e838a8..0be9986cf6 100644 --- a/chatapi/src/index.ts +++ b/chatapi/src/index.ts @@ -5,7 +5,7 @@ import http from 'http'; import WebSocket from 'ws'; import { chat, chatNoSave } from './services/chat.service'; -import { keys } from './config/ai-providers.config'; +import { assistant, keys } from './config/ai-providers.config'; dotenv.config(); @@ -37,14 +37,25 @@ wss.on('connection', (ws) => { } const chatResponse = await chat(data, true, (response) => { - ws.send(JSON.stringify({ 'type': 'partial', response })); + ws.send(JSON.stringify({ + 'type': 'partial', + 'response': response, + 'metadata': { + 'source': 'responses', + 'response_format': assistant?.response_format ?? 'text', + } + })); }); if (chatResponse) { ws.send(JSON.stringify({ 'type': 'final', 'completionText': chatResponse.completionText, - 'couchDBResponse': chatResponse.couchSaveResponse + 'couchDBResponse': chatResponse.couchSaveResponse, + 'metadata': { + 'source': 'responses', + 'response_format': assistant?.response_format ?? 'text', + } })); } } catch (error: any) { @@ -69,14 +80,22 @@ app.post('/', async (req: any, res: any) => { const response = await chatNoSave(data.content, data.aiProvider, data.context, data.assistant, false); return res.status(200).json({ 'status': 'Success', - 'chat': response + 'chat': response, + 'metadata': { + 'source': 'responses', + 'response_format': assistant?.response_format ?? 'text', + } }); } else { const response = await chat(data, false); return res.status(201).json({ 'status': 'Success', 'chat': response?.completionText, - 'couchDBResponse': response?.couchSaveResponse + 'couchDBResponse': response?.couchSaveResponse, + 'metadata': { + 'source': 'responses', + 'response_format': assistant?.response_format ?? 'text', + } }); } } catch (error: any) { diff --git a/chatapi/src/models/chat.model.ts b/chatapi/src/models/chat.model.ts index 23fde46663..9327062fab 100644 --- a/chatapi/src/models/chat.model.ts +++ b/chatapi/src/models/chat.model.ts @@ -12,9 +12,22 @@ interface Providers { gemini?: string; } +export type AssistantResponseFormat = string | { + type: string; + [key: string]: any; +}; + +export interface AssistantToolConfig { + type: string; + [key: string]: any; +} + interface Assistant { name: string; instructions: string; + response_format?: AssistantResponseFormat; + parallel_tool_calls?: boolean; + tools?: AssistantToolConfig[]; } export interface ModelsDocument { diff --git a/chatapi/src/services/chat.service.ts b/chatapi/src/services/chat.service.ts index 3295c21967..3875b3601c 100644 --- a/chatapi/src/services/chat.service.ts +++ b/chatapi/src/services/chat.service.ts @@ -68,8 +68,8 @@ export async function chat(data: any, stream?: boolean, callback?: (response: st export async function chatNoSave( content: any, aiProvider: AIProvider, - assistant: boolean, - context?: any, + assistantOrContext: boolean | any, + contextOrAssistant?: any, stream?: boolean, callback?: (response: string) => void ): Promise { @@ -77,6 +77,14 @@ export async function chatNoSave( messages.push({ 'role': 'user', content }); + const assistant = typeof assistantOrContext === 'boolean' + ? assistantOrContext + : typeof contextOrAssistant === 'boolean' + ? contextOrAssistant + : false; + + const context = typeof assistantOrContext === 'boolean' ? contextOrAssistant : assistantOrContext; + try { const completionText = await aiChat(messages, aiProvider, assistant, context, stream, callback); messages.push({ diff --git a/chatapi/src/utils/chat-assistant.utils.spec.ts b/chatapi/src/utils/chat-assistant.utils.spec.ts new file mode 100644 index 0000000000..869d6d9ceb --- /dev/null +++ b/chatapi/src/utils/chat-assistant.utils.spec.ts @@ -0,0 +1,104 @@ +import { buildAssistantResponseParams, createAssistantResponse, createAssistantResponseStream } from './chat-assistant.utils'; +import { ChatMessage } from '../models/chat.model'; + +const mockCreate = jest.fn(); +const mockStream = jest.fn(); +const mockDone = jest.fn().mockResolvedValue(undefined); + +class MockResponseStream { + private events: any[]; + finalResponse: jest.Mock; + done = mockDone; + + constructor(events: any[], finalResponse: any) { + this.events = events; + this.finalResponse = jest.fn().mockResolvedValue(finalResponse); + } + + async *[Symbol.asyncIterator]() { + for (const event of this.events) { + yield event; + } + } +} + +jest.mock('../config/ai-providers.config', () => ({ + assistant: { + instructions: 'Be concise.', + tools: [ { type: 'code_interpreter' } ], + response_format: 'text', + parallel_tool_calls: true, + }, + keys: { + openai: { + responses: { + create: (...args: unknown[]) => mockCreate(...args), + stream: (...args: unknown[]) => mockStream(...args), + }, + }, + }, +})); + +describe('chat-assistant utils', () => { + beforeEach(() => { + mockCreate.mockReset(); + mockStream.mockReset(); + mockDone.mockClear(); + }); + + it('buildAssistantResponseParams merges instructions and context', () => { + const messages: ChatMessage[] = [ { role: 'user', content: 'Hello there' } ]; + const params = buildAssistantResponseParams(messages, 'gpt-test', 'Context info'); + + expect(params.model).toBe('gpt-test'); + expect(params.instructions).toContain('Be concise.'); + expect(params.instructions).toContain('Context info'); + expect(params.tools).toEqual([ { type: 'code_interpreter' } ]); + expect(params.response_format).toBe('text'); + expect(params.parallel_tool_calls).toBe(true); + expect(params.input).toEqual([ + { + role: 'user', + content: [ { type: 'text', text: 'Hello there' } ] + } + ]); + }); + + it('createAssistantResponse returns aggregated text from the API response', async () => { + mockCreate.mockResolvedValue({ + output: [ + { + type: 'output_text', + text: 'Aggregated reply' + } + ] + }); + + const messages: ChatMessage[] = [ { role: 'user', content: 'Ping?' } ]; + const params = buildAssistantResponseParams(messages, 'gpt-test'); + const result = await createAssistantResponse(params); + + expect(result).toBe('Aggregated reply'); + expect(mockCreate).toHaveBeenCalledWith(expect.objectContaining({ model: 'gpt-test' })); + }); + + it('createAssistantResponseStream collects deltas and final response', async () => { + const events = [ + { type: 'response.output_text.delta', delta: 'partial ' }, + { type: 'response.output_text.delta', delta: 'message' }, + { type: 'response.completed', response: { output_text: 'partial message!' } } + ]; + const streamInstance = new MockResponseStream(events, { output_text: 'partial message!' }); + mockStream.mockResolvedValue(streamInstance); + + const callback = jest.fn(); + const messages: ChatMessage[] = [ { role: 'user', content: 'Summarize.' } ]; + const params = buildAssistantResponseParams(messages, 'gpt-test'); + const result = await createAssistantResponseStream(params, callback); + + expect(result).toBe('partial message!'); + expect(callback).toHaveBeenCalledWith('partial '); + expect(callback).toHaveBeenCalledWith('message'); + expect(mockStream).toHaveBeenCalled(); + }); +}); diff --git a/chatapi/src/utils/chat-assistant.utils.ts b/chatapi/src/utils/chat-assistant.utils.ts index d6895b37ec..8d7e4f43cb 100644 --- a/chatapi/src/utils/chat-assistant.utils.ts +++ b/chatapi/src/utils/chat-assistant.utils.ts @@ -1,103 +1,223 @@ -import { keys } from '../config/ai-providers.config'; -import { assistant } from '../config/ai-providers.config'; - -/** - * Creates an assistant with the specified model - * @param model - Model to use for assistant - * @returns Assistant object - */ -export async function createAssistant(model: string) { - return await keys.openai.beta.assistants.create({ - 'name': assistant?.name, - 'instructions': assistant?.instructions, - 'tools': [ { 'type': 'code_interpreter' } ], +import { assistant, keys } from '../config/ai-providers.config'; +import { ChatMessage } from '../models/chat.model'; + +type ResponseInputMessage = { + role: 'system' | 'user' | 'assistant' | 'developer'; + content: Array<{ type: 'text'; text: string }>; +}; + +type AssistantResponseParams = { + model: string; + input: ResponseInputMessage[]; + instructions?: string; + tools?: Array>; + response_format?: Record | string; + parallel_tool_calls?: boolean; +}; + +const DEFAULT_ASSISTANT_TOOLS: Array> = [ { 'type': 'code_interpreter' } ]; + +const buildInstructions = (contextData?: string): string | undefined => { + const instructions: string[] = []; + + if (assistant?.instructions) { + instructions.push(assistant.instructions); + } + + if (contextData) { + instructions.push(contextData); + } + + if (instructions.length === 0) { + return undefined; + } + + return instructions.join('\n\n'); +}; + +const toResponseMessage = (message: ChatMessage): ResponseInputMessage => ({ + 'role': message.role, + 'content': [ { 'type': 'text', 'text': message.content } ] +}); + +const extractTextFromOutput = (output: any): string => { + if (!output) { + return ''; + } + + if (typeof output === 'string') { + return output; + } + + if (Array.isArray(output)) { + return output.map(extractTextFromOutput).join(''); + } + + if (output.type === 'output_text' && typeof output.text === 'string') { + return output.text; + } + + if (Array.isArray(output.content)) { + return output.content.map(extractTextFromOutput).join(''); + } + + if (output.results) { + return extractTextFromOutput(output.results); + } + + if (output.logs) { + return String(output.logs); + } + + return ''; +}; + +const extractResponseText = (response: any): string => { + if (!response) { + return ''; + } + + if (typeof response.output_text === 'string') { + return response.output_text; + } + + if (Array.isArray(response.output_text)) { + return response.output_text.join(''); + } + + if (Array.isArray(response.output)) { + return response.output.map(extractTextFromOutput).join(''); + } + + if (response.response && typeof response.response === 'object') { + return extractResponseText(response.response); + } + + return ''; +}; + +const extractDeltaText = (event: any): string => { + if (!event) { + return ''; + } + + if (typeof event.delta === 'string') { + return event.delta; + } + + if (Array.isArray(event.delta)) { + return event.delta.map(extractTextFromOutput).join(''); + } + + if (event.type === 'response.code_interpreter_call.completed') { + return extractTextFromOutput(event.code_interpreter_call?.results); + } + + if (event.snapshot) { + return extractTextFromOutput(event.snapshot); + } + + if (event.response) { + return extractResponseText(event.response); + } + + return ''; +}; + +export const buildAssistantResponseParams = ( + messages: ChatMessage[], + model: string, + contextData?: string +): AssistantResponseParams => { + const instructions = buildInstructions(contextData); + const inputMessages = messages.map(toResponseMessage); + + const params: AssistantResponseParams = { model, - }); -} - -export async function createThread() { - return await keys.openai.beta.threads.create(); -} - -export async function addToThread(threadId: any, message: string) { - return await keys.openai.beta.threads.messages.create( - threadId, - { - 'role': 'user', - 'content': message - } - ); -} - -export async function createRun(threadID: any, assistantID: any, instructions?: string) { - return await keys.openai.beta.threads.runs.create( - threadID, - { - 'assistant_id': assistantID, - instructions - } - ); -} - -export async function waitForRunCompletion(threadId: any, runId: any) { - let runStatus = await keys.openai.beta.threads.runs.retrieve(threadId, runId); - while (runStatus.status !== 'completed') { - await new Promise((resolve) => setTimeout(resolve, 1000)); - runStatus = await keys.openai.beta.threads.runs.retrieve(threadId, runId); - } - return runStatus; -} - -export async function retrieveResponse(threadId: any): Promise { - const messages = await keys.openai.beta.threads.messages.list(threadId); - for (const msg of messages.data) { - if ('text' in msg.content[0] && msg.role === 'assistant') { - return msg.content[0].text.value; - } + input: inputMessages, + }; + + if (instructions) { + params.instructions = instructions; + } + + if (assistant?.tools?.length) { + params.tools = assistant.tools; + } else { + params.tools = DEFAULT_ASSISTANT_TOOLS; } - throw new Error('Unable to retrieve response from assistant'); -} -// Run with streaming enabled -export async function createAndHandleRunWithStreaming( - threadID: any, assistantID: any, instructions: string, callback?: (response: string) => void -): Promise { + if (assistant?.response_format) { + params.response_format = assistant.response_format; + } + + if (assistant?.parallel_tool_calls !== undefined) { + params.parallel_tool_calls = assistant.parallel_tool_calls; + } + + return params; +}; + +export const createAssistantResponse = async (params: AssistantResponseParams): Promise => { + const response = await keys.openai.responses.create(params); + const text = extractResponseText(response); + + if (!text) { + throw new Error('Unable to retrieve response from assistant'); + } + + return text; +}; + +export const createAssistantResponseStream = async ( + params: AssistantResponseParams, + callback?: (response: string) => void +): Promise => { + const stream = await keys.openai.responses.stream(params); let completionText = ''; + let finalResponseText = ''; - return new Promise((resolve, reject) => { - keys.openai.beta.threads.runs.stream(threadID, { - 'assistant_id': assistantID, - instructions - }) - .on('textDelta', (textDelta: { value: string }) => { - if (textDelta && textDelta.value) { - completionText += textDelta.value; - if (callback) { - callback(textDelta.value); - } + try { + for await (const event of stream) { + const deltaText = extractDeltaText(event); + if (deltaText) { + completionText += deltaText; + if (callback) { + callback(deltaText); } - }) - .on('toolCallDelta', (toolCallDelta: { type: string; code_interpreter: { input: string; outputs: any[] } }) => { - if (toolCallDelta.type === 'code_interpreter') { - if (toolCallDelta && toolCallDelta.code_interpreter && toolCallDelta.code_interpreter.input) { - completionText += toolCallDelta.code_interpreter.input; - if (callback) { - callback(toolCallDelta.code_interpreter.input); - } - } - if (toolCallDelta && toolCallDelta.code_interpreter && toolCallDelta.code_interpreter.outputs) { - toolCallDelta.code_interpreter.outputs.forEach((output) => { - if (output.type === 'logs' && output.logs) { - completionText += output.logs; - if (callback) { - callback(output.logs); - } - } - }); - } + } + + if (!finalResponseText && event.type === 'response.completed') { + finalResponseText = extractResponseText(event.response); + } + } + + if (!finalResponseText) { + const response = await stream.finalResponse(); + finalResponseText = extractResponseText(response); + } + + if (finalResponseText) { + if (!completionText) { + completionText = finalResponseText; + if (callback) { + callback(finalResponseText); + } + } else if (finalResponseText.startsWith(completionText)) { + const remainder = finalResponseText.slice(completionText.length); + if (remainder && callback) { + callback(remainder); } - }) - .on('end', () => resolve(completionText)) - .on('error', reject); - }); -} + completionText = finalResponseText; + } else { + completionText = finalResponseText; + } + } + + return completionText; + } finally { + await stream.done().catch(() => undefined); + } +}; + +export const parseAssistantResponseText = extractResponseText; diff --git a/chatapi/src/utils/chat-helpers.utils.ts b/chatapi/src/utils/chat-helpers.utils.ts index e6bda46965..73c63929fe 100644 --- a/chatapi/src/utils/chat-helpers.utils.ts +++ b/chatapi/src/utils/chat-helpers.utils.ts @@ -3,13 +3,9 @@ import { AIProvider, ChatMessage } from '../models/chat.model'; import { Attachment } from '../models/db-doc.model'; import { fetchFileFromCouchDB } from './db.utils'; import { - createAssistant, - createThread, - addToThread, - createRun, - waitForRunCompletion, - retrieveResponse, - createAndHandleRunWithStreaming, + buildAssistantResponseParams, + createAssistantResponse, + createAssistantResponseStream, } from './chat-assistant.utils'; import { extractTextFromDocument } from './text-extraction.utils'; @@ -32,17 +28,12 @@ export async function aiChatStream( } const model = aiProvider.model ?? provider.defaultModel; + const contextData = typeof context === 'string' ? context : context?.data; + if (assistant) { try { - const asst = await createAssistant(model); - const thread = await createThread(); - for (const message of messages) { - await addToThread(thread.id, message.content); - } - - const completionText = await createAndHandleRunWithStreaming(thread.id, asst.id, context.data, callback); - - return completionText; + const params = buildAssistantResponseParams(messages, model, contextData); + return await createAssistantResponseStream(params, callback); } catch (error) { throw new Error(`Error processing request ${error}`); } @@ -100,17 +91,12 @@ export async function aiChatNonStream( } } + const contextData = typeof context === 'string' ? context : context?.data; + if (assistant) { try { - const asst = await createAssistant(model); - const thread = await createThread(); - for (const message of messages) { - await addToThread(thread.id, message.content); - } - const run = await createRun(thread.id, asst.id, context.data); - await waitForRunCompletion(thread.id, run.id); - - return await retrieveResponse(thread.id); + const params = buildAssistantResponseParams(messages, model, contextData); + return await createAssistantResponse(params); } catch (error) { throw new Error(`Error processing request ${error}`); } diff --git a/src/app/chat/chat-window/chat-window.component.spec.ts b/src/app/chat/chat-window/chat-window.component.spec.ts new file mode 100644 index 0000000000..322b69d0ac --- /dev/null +++ b/src/app/chat/chat-window/chat-window.component.spec.ts @@ -0,0 +1,54 @@ +import { ChangeDetectorRef } from '@angular/core'; +import { FormBuilder } from '@angular/forms'; + +import { ChatWindowComponent } from './chat-window.component'; +import { ChatService, ChatStreamMessage } from '../../shared/chat.service'; + +describe('ChatWindowComponent stream helpers', () => { + let component: ChatWindowComponent; + + beforeEach(() => { + const chatServiceStub = { + listAIProviders: () => ({ subscribe: () => undefined }) + } as unknown as ChatService; + const userServiceStub = { + get: () => ({ name: 'tester' }) + } as any; + const stateServiceStub = { + configuration: { streaming: true } + } as any; + const cdrStub = { + markForCheck: () => undefined + } as unknown as ChangeDetectorRef; + + component = new ChatWindowComponent(cdrStub, chatServiceStub, new FormBuilder(), stateServiceStub, userServiceStub); + component.conversations = [ { id: '1', query: 'Hi', response: '' } ]; + }); + + it('normalizes nested stream payloads to text', () => { + const message: ChatStreamMessage = { + type: 'partial', + response: { + content: [ + { text: 'Hello' }, + { text: ' ' }, + { logs: 'world' } + ] + } + }; + + const text = (component as any).extractResponseSegment(message); + expect(text).toBe('Hello world'); + }); + + it('prefers completionText for final messages', () => { + const message: ChatStreamMessage = { + type: 'final', + completionText: 'Goodbye', + response: 'ignored' + }; + + const text = (component as any).extractResponseSegment(message); + expect(text).toBe('Goodbye'); + }); +}); diff --git a/src/app/chat/chat-window/chat-window.component.ts b/src/app/chat/chat-window/chat-window.component.ts index 7063084cb0..1b4d3c0b5b 100644 --- a/src/app/chat/chat-window/chat-window.component.ts +++ b/src/app/chat/chat-window/chat-window.component.ts @@ -4,7 +4,7 @@ import { Subject } from 'rxjs'; import { filter, takeUntil } from 'rxjs/operators'; import { CustomValidators } from '../../validators/custom-validators'; import { ConversationForm, AIProvider } from '../chat.model'; -import { ChatService } from '../../shared/chat.service'; +import { ChatService, ChatStreamMessage } from '../../shared/chat.service'; import { showFormErrors, trackByIdVal } from '../../shared/table-helpers'; import { UserService } from '../../shared/user.service'; import { StateService } from '../../shared/state.service'; @@ -203,12 +203,19 @@ export class ChatWindowComponent implements OnInit, OnDestroy, AfterViewInit { // Subscribe to WebSocket messages this.chatService.getChatStream().subscribe((message) => { // Handle incoming messages from the chat stream - this.handleIncomingMessage(JSON.parse(message)); + this.handleIncomingMessage(message); }); } - handleIncomingMessage(message: any) { + handleIncomingMessage(message: ChatStreamMessage) { if (message.type === 'final') { + const finalResponse = this.extractResponseSegment(message); + if (finalResponse) { + const lastConversation = this.conversations[this.conversations.length - 1]; + if (lastConversation) { + lastConversation.response = finalResponse; + } + } this.selectedConversationId = { '_id': message.couchDBResponse?.id, '_rev': message.couchDBResponse?.rev @@ -217,11 +224,77 @@ export class ChatWindowComponent implements OnInit, OnDestroy, AfterViewInit { } else { this.spinnerOn = false; const lastConversation = this.conversations[this.conversations.length - 1]; - lastConversation.response += message.response; + if (lastConversation) { + lastConversation.response += this.extractResponseSegment(message); + } this.scrollTo('bottom'); } } + private extractResponseSegment(message: ChatStreamMessage): string { + if (!message) { + return ''; + } + + const payload = message.type === 'final' + ? message.completionText ?? message.response + : message.response; + + return this.coerceToText(payload); + } + + private coerceToText(value: unknown): string { + if (value === null || value === undefined) { + return ''; + } + + if (typeof value === 'string') { + return value; + } + + if (Array.isArray(value)) { + return value.map((entry) => this.coerceToText(entry)).join(''); + } + + if (typeof value === 'object') { + const record = value as Record; + + if (typeof record.text === 'string') { + return record.text; + } + + if (typeof record.value === 'string') { + return record.value; + } + + if (typeof record.delta === 'string') { + return record.delta; + } + + if (typeof record.logs === 'string') { + return record.logs; + } + + if (Array.isArray(record.content)) { + return this.coerceToText(record.content); + } + + if (Array.isArray(record.output)) { + return this.coerceToText(record.output); + } + + if (Array.isArray(record.results)) { + return this.coerceToText(record.results); + } + + if (typeof record.output_text === 'string') { + return record.output_text; + } + } + + return ''; + } + postSubmit() { this.spinnerOn = true; this.promptForm.controls.prompt.setValue(''); diff --git a/src/app/shared/chat.service.ts b/src/app/shared/chat.service.ts index 5ed604161b..90ce6c7719 100644 --- a/src/app/shared/chat.service.ts +++ b/src/app/shared/chat.service.ts @@ -8,6 +8,15 @@ import { findDocuments, inSelector } from '../shared/mangoQueries'; import { CouchService } from '../shared/couchdb.service'; import { AIServices, AIProvider } from '../chat/chat.model'; +export interface ChatStreamMessage { + type: string; + response?: unknown; + completionText?: string; + couchDBResponse?: any; + metadata?: Record; + error?: string; +} + @Injectable({ providedIn: 'root' }) export class ChatService { @@ -16,7 +25,7 @@ import { AIServices, AIProvider } from '../chat/chat.model'; private baseUrl = environment.chatAddress; private socket: WebSocket; - private chatStreamSubject: Subject = new Subject(); + private chatStreamSubject: Subject = new Subject(); private errorSubject: Subject = new Subject(); private newChatAdded: Subject = new Subject(); private newChatSelected: Subject = new Subject(); @@ -47,11 +56,11 @@ import { AIServices, AIProvider } from '../chat/chat.model'; }; this.socket.addEventListener('message', (event) => { try { - const message = JSON.parse(event.data); + const message: ChatStreamMessage = JSON.parse(event.data); if (message.type === 'error') { - this.errorSubject.next(message.error); + this.errorSubject.next(message.error || 'Unknown error'); } else { - this.chatStreamSubject.next(event.data); + this.chatStreamSubject.next(message); } } catch (error) { this.errorSubject.next('Invalid message format'); @@ -95,7 +104,7 @@ import { AIServices, AIProvider } from '../chat/chat.model'; } // Subscribe to stream updates - getChatStream(): Observable { + getChatStream(): Observable { return this.chatStreamSubject.asObservable(); } @@ -105,7 +114,7 @@ import { AIServices, AIProvider } from '../chat/chat.model'; // Method to send user input via WebSocket sendUserInput(data: any): void { - if (this.socket.readyState === WebSocket.OPEN) { + if (this.socket && this.socket.readyState === WebSocket.OPEN) { this.socket.send(JSON.stringify(data)); } }