diff --git a/.gitignore b/.gitignore index f347d7b..0dd0d41 100644 --- a/.gitignore +++ b/.gitignore @@ -15,10 +15,8 @@ yarn-error.log # Environment variables .env -.env.local -.env.development.local -.env.test.local -.env.production.local +.env.* +!.env.example packages/ui/.env diff --git a/API_DOCUMENTATION.md b/API_DOCUMENTATION.md index 65a02c9..d995d9b 100644 --- a/API_DOCUMENTATION.md +++ b/API_DOCUMENTATION.md @@ -352,6 +352,44 @@ Common cases: - `404 Not Found` — unknown agent id. - `500 Internal Server Error` — unexpected backend issues. +## Query Insights + +The Query Insights API exposes raw interaction logs and lightweight analytics for downstream processing. + +### `GET /v1/insights/queries` + +Fetch paginated user queries. If no date range is provided, returns the most recent queries ordered by creation time. + +- `start_date` _(ISO 8601, optional)_ — inclusive lower bound for filtering by creation time. +- `end_date` _(ISO 8601, optional)_ — inclusive upper bound for filtering by creation time. +- `agent_id` _(optional)_ — filter by agent id when provided. +- `query_text` _(optional)_ — filter by text contained in the query (case-insensitive). +- `limit` _(default `100`)_ — maximum rows returned. +- `offset` _(default `0`)_ — pagination offset. + +**Response** `200 OK` + +```json +{ + "items": [ + { + "id": "ad0c2b34-04ab-4d0a-9855-47c19f0f2830", + "created_at": "2024-04-01T12:30:45.123456+00:00", + "agent_id": "cairo-coder", + "query": "How do I declare a storage variable in Cairo 1?", + "chat_history": [ + { "role": "user", "content": "What is Cairo?" }, + { "role": "assistant", "content": "Cairo is a programming language..." } + ], + "output": "To declare a storage variable in Cairo 1, you use the #[storage] attribute..." + } + ], + "total": 1, + "limit": 100, + "offset": 0 +} +``` + ## Versioning & Compatibility - Current API version: `1.0.0` (see FastAPI metadata). diff --git a/python/pyproject.toml b/python/pyproject.toml index 24715c9..2fb74a6 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -142,6 +142,9 @@ strict_optional = true testpaths = ["tests"] pythonpath = ["src"] asyncio_mode = "auto" +markers = [ + "db: marks tests that require a database (run by default, use -m 'not db' to skip)", +] filterwarnings = [ "ignore::DeprecationWarning", "ignore::PendingDeprecationWarning", diff --git a/python/src/cairo_coder/core/rag_pipeline.py b/python/src/cairo_coder/core/rag_pipeline.py index 2fc8d2c..91c3496 100644 --- a/python/src/cairo_coder/core/rag_pipeline.py +++ b/python/src/cairo_coder/core/rag_pipeline.py @@ -82,6 +82,11 @@ def __init__(self, config: RagPipelineConfig): self._current_processed_query: ProcessedQuery | None = None self._current_documents: list[Document] = [] + @property + def last_retrieved_documents(self) -> list[Document]: + """Documents retrieved during the most recent pipeline execution.""" + return self._current_documents + async def _aprocess_query_and_retrieve_docs( self, query: str, diff --git a/python/src/cairo_coder/db/__init__.py b/python/src/cairo_coder/db/__init__.py new file mode 100644 index 0000000..6ab11e2 --- /dev/null +++ b/python/src/cairo_coder/db/__init__.py @@ -0,0 +1,22 @@ +""" +Database utilities for the Cairo Coder server. + +This package exposes helpers for initializing the asyncpg connection pool and +provides Pydantic representations used when persisting query insights data. 
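For orientation, a minimal sketch of how the exports of `cairo_coder.db` fit together, assuming a reachable Postgres configured through the `POSTGRES_*` environment variables that `ConfigManager` reads; the server's lifespan hook performs the same initialization itself, so this is only an illustration:

```python
import asyncio

from cairo_coder.db import (
    UserInteraction,
    close_pool,
    create_user_interaction,
    execute_schema_scripts,
    get_interactions,
)


async def main() -> None:
    # Create the user_interactions table and indexes if they do not exist yet.
    await execute_schema_scripts()

    # Persist one interaction (values here are illustrative only).
    await create_user_interaction(
        UserInteraction(
            agent_id="cairo-coder",
            query="How do I declare a storage variable in Cairo 1?",
            generated_answer="Use the #[storage] attribute...",
        )
    )

    # With no date bounds this returns the most recent rows, newest first.
    items, total = await get_interactions(
        start_date=None, end_date=None, agent_id=None, limit=10, offset=0
    )
    print(total, [item["query"] for item in items])

    await close_pool()


asyncio.run(main())
```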
+""" + +from .models import UserInteraction +from .repository import ( + create_user_interaction, + get_interactions, +) +from .session import close_pool, execute_schema_scripts, get_pool + +__all__ = [ + "UserInteraction", + "create_user_interaction", + "get_interactions", + "close_pool", + "execute_schema_scripts", + "get_pool", +] diff --git a/python/src/cairo_coder/db/models.py b/python/src/cairo_coder/db/models.py new file mode 100644 index 0000000..fd87b7b --- /dev/null +++ b/python/src/cairo_coder/db/models.py @@ -0,0 +1,25 @@ +""" +Pydantic models representing rows stored in the query insights database tables. +""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timezone +from typing import Any, Optional + +from pydantic import BaseModel, Field + + +class UserInteraction(BaseModel): + """Represents a record in the user_interactions table.""" + + id: uuid.UUID = Field(default_factory=uuid.uuid4) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + agent_id: str + mcp_mode: bool = False + chat_history: Optional[list[dict[str, Any]]] = None + query: str + generated_answer: Optional[str] = None + retrieved_sources: Optional[list[dict[str, Any]]] = None + llm_usage: Optional[dict[str, Any]] = None diff --git a/python/src/cairo_coder/db/repository.py b/python/src/cairo_coder/db/repository.py new file mode 100644 index 0000000..bcf1e17 --- /dev/null +++ b/python/src/cairo_coder/db/repository.py @@ -0,0 +1,237 @@ +""" +Data access helpers for the Query Insights persistence layer. +""" + +from __future__ import annotations + +import json +from datetime import datetime +from typing import Any + +import structlog + +from cairo_coder.db.models import UserInteraction +from cairo_coder.db.session import get_pool + +logger = structlog.get_logger(__name__) + + +def _serialize_json_field(value: Any) -> str | None: + """ + Serialize a Python object to JSON string for database storage. + + Args: + value: Python object to serialize (dict, list, etc.) + + Returns: + JSON string or None if value is None/empty + """ + if value is None: + return None + return json.dumps(value) + + +def _normalize_json_field(value: Any, default: Any = None) -> Any: + """ + Normalize a JSON field from database (may be string or already parsed). + + Args: + value: Value from database (string, dict, list, or None) + default: Default value to use if parsing fails or value is None + + Returns: + Parsed JSON object or default value + """ + if value is None: + return default + if isinstance(value, str): + try: + return json.loads(value) + except (json.JSONDecodeError, TypeError): + return default + return value + + +def _normalize_row(row: dict | None, fields_with_defaults: dict[str, Any]) -> dict | None: + """ + Parse stringified JSON fields in a row dictionary and apply defaults for None values. 
+ + Args: + row: Dictionary from database row (or None) + fields_with_defaults: Mapping of field names to default values + + Returns: + Normalized dictionary with parsed JSON fields, or None if input row is None + """ + if row is None: + return None + + d = dict(row) + for field, default_val in fields_with_defaults.items(): + d[field] = _normalize_json_field(d.get(field), default_val) + return d + + +async def create_user_interaction(interaction: UserInteraction) -> None: + """Persist a user interaction in the database.""" + pool = await get_pool() + try: + async with pool.acquire() as connection: + await connection.execute( + """ + INSERT INTO user_interactions ( + id, + agent_id, + mcp_mode, + chat_history, + query, + generated_answer, + retrieved_sources, + llm_usage + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + """, + interaction.id, + interaction.agent_id, + interaction.mcp_mode, + _serialize_json_field(interaction.chat_history), + interaction.query, + interaction.generated_answer, + _serialize_json_field(interaction.retrieved_sources), + _serialize_json_field(interaction.llm_usage), + ) + logger.debug("User interaction logged successfully", interaction_id=str(interaction.id)) + except Exception as exc: # pragma: no cover - defensive logging + logger.error("Failed to log user interaction", error=str(exc), exc_info=True) + + +async def get_interactions( + start_date: datetime | None, + end_date: datetime | None, + agent_id: str | None, + limit: int, + offset: int, + query_text: str | None = None, +) -> tuple[list[dict[str, Any]], int]: + """Fetch paginated interactions matching the supplied filters. + + If start_date and end_date are not provided, returns the last N interactions + ordered by created_at DESC. + """ + pool = await get_pool() + async with pool.acquire() as connection: + params: list[Any] = [] + filters = [] + + if start_date is not None: + params.append(start_date) + filters.append(f"created_at >= ${len(params)}") + + if end_date is not None: + params.append(end_date) + filters.append(f"created_at <= ${len(params)}") + + if agent_id: + params.append(agent_id) + filters.append(f"agent_id = ${len(params)}") + + if query_text: + params.append(f"%{query_text}%") + filters.append(f"query ILIKE ${len(params)}") + + where_clause = "WHERE " + " AND ".join(filters) if filters else "" + + count_query = f""" + SELECT COUNT(*) + FROM user_interactions + {where_clause} + """ + total = await connection.fetchval(count_query, *params) + + params.extend([limit, offset]) + limit_placeholder = len(params) - 1 + offset_placeholder = len(params) + data_query = f""" + SELECT id, created_at, agent_id, query, chat_history, generated_answer + FROM user_interactions + {where_clause} + ORDER BY created_at DESC + LIMIT ${limit_placeholder} + OFFSET ${offset_placeholder} + """ + rows = await connection.fetch(data_query, *params) + + # Normalize JSON fields that may be returned as strings by asyncpg + items = [_normalize_row(dict(row), {"chat_history": []}) for row in rows] + return items, int(total) + + +async def migrate_user_interaction(interaction: UserInteraction) -> tuple[bool, bool]: + """ + Persist a user interaction for migration purposes with upsert behavior. + + Uses ON CONFLICT DO UPDATE to override existing entries based on the ID. + This allows re-running migrations to update data if needed. 
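A usage sketch of the upsert semantics described above, assuming the schema has already been created and Postgres is reachable; the run id and field values are illustrative:

```python
import asyncio
import uuid

from cairo_coder.db.models import UserInteraction
from cairo_coder.db.repository import migrate_user_interaction


async def main() -> None:
    run_id = uuid.UUID("ad0c2b34-04ab-4d0a-9855-47c19f0f2830")  # illustrative LangSmith run id

    # First call inserts the row: (was_modified=True, was_inserted=True).
    modified, inserted = await migrate_user_interaction(
        UserInteraction(id=run_id, agent_id="cairo-coder", query="What is Cairo?")
    )
    assert modified and inserted

    # Re-running with the same id updates it in place: (True, False).
    modified, inserted = await migrate_user_interaction(
        UserInteraction(
            id=run_id,
            agent_id="cairo-coder",
            query="What is Cairo?",
            generated_answer="Cairo is a programming language...",
        )
    )
    assert modified and not inserted


asyncio.run(main())
```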
+ + Args: + interaction: UserInteraction model with pre-set ID from LangSmith + + Returns: + Tuple of (was_modified, was_inserted) where: + - was_modified: True if any action was taken (insert or update) + - was_inserted: True if inserted, False if updated + """ + pool = await get_pool() + try: + async with pool.acquire() as connection: + # Single upsert round-trip; infer insert vs update via system column + row = await connection.fetchrow( + """ + INSERT INTO user_interactions ( + id, + created_at, + agent_id, + mcp_mode, + chat_history, + query, + generated_answer, + retrieved_sources, + llm_usage + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (id) DO UPDATE SET + created_at = EXCLUDED.created_at, + agent_id = EXCLUDED.agent_id, + mcp_mode = EXCLUDED.mcp_mode, + chat_history = EXCLUDED.chat_history, + query = EXCLUDED.query, + generated_answer = EXCLUDED.generated_answer, + retrieved_sources = EXCLUDED.retrieved_sources, + llm_usage = EXCLUDED.llm_usage + RETURNING (xmax = 0) AS inserted + """, + interaction.id, + interaction.created_at, + interaction.agent_id, + interaction.mcp_mode, + _serialize_json_field(interaction.chat_history), + interaction.query, + interaction.generated_answer, + _serialize_json_field(interaction.retrieved_sources), + _serialize_json_field(interaction.llm_usage), + ) + + if row is None: + logger.warning("Unexpected: no result from upsert", interaction_id=str(interaction.id)) + return False, False + + was_inserted = bool(row["inserted"]) if "inserted" in row else False + if was_inserted: + logger.debug("User interaction inserted", interaction_id=str(interaction.id)) + else: + logger.debug("User interaction updated", interaction_id=str(interaction.id)) + return True, was_inserted + except Exception as exc: # pragma: no cover - defensive logging + logger.error("Failed to migrate user interaction", error=str(exc), exc_info=True) + raise + diff --git a/python/src/cairo_coder/db/session.py b/python/src/cairo_coder/db/session.py new file mode 100644 index 0000000..dc3b8a9 --- /dev/null +++ b/python/src/cairo_coder/db/session.py @@ -0,0 +1,90 @@ +""" +Asyncpg session management for the Query Insights persistence layer. +""" + +from __future__ import annotations + +import asyncio + +import asyncpg +import structlog + +from cairo_coder.config.manager import ConfigManager + +logger = structlog.get_logger(__name__) + +# Maintain one pool per running event loop to avoid cross-loop usage issues +pools: dict[int, asyncpg.Pool] = {} + + +async def get_pool() -> asyncpg.Pool: + """Return an asyncpg connection pool bound to the current event loop. + + FastAPI's TestClient and AnyIO can run application code across different + event loops. Using a single cached pool may lead to cross-loop errors. + To prevent this, we maintain a pool per loop. 
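A small sketch of the caching behaviour this gives within a single event loop, assuming Postgres is reachable via the configured DSN:

```python
import asyncio

from cairo_coder.db.session import close_pool, get_pool


async def main() -> None:
    # Within one running event loop the pool is created once and then reused.
    first = await get_pool()
    second = await get_pool()
    assert first is second

    await close_pool()


asyncio.run(main())
```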
+ """ + loop = asyncio.get_running_loop() + key = id(loop) + pool = pools.get(key) + if pool is None: + config = ConfigManager.load_config() + try: + pool = await asyncpg.create_pool( + dsn=config.vector_store.dsn, + min_size=2, + max_size=10, + ) + pools[key] = pool + logger.info("Database connection pool created successfully.") + except Exception as exc: # pragma: no cover - defensive logging + logger.error("Failed to create database connection pool", error=str(exc)) + raise + return pool + + +async def close_pool() -> None: + """Close the asyncpg connection pool if it is active.""" + import contextlib + + # Close and clear all pools + global pools + for p in list(pools.values()): + with contextlib.suppress(Exception): + await p.close() + pools.clear() + logger.info("Database connection pool(s) closed.") + + +async def execute_schema_scripts() -> None: + """Ensure the tables required for the insights platform exist.""" + pool = await get_pool() + async with pool.acquire() as connection: + # Ensure pgcrypto is available for gen_random_uuid() + await connection.execute( + "CREATE EXTENSION IF NOT EXISTS pgcrypto;" + ) + await connection.execute( + """ + CREATE TABLE IF NOT EXISTS user_interactions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + agent_id VARCHAR(50) NOT NULL, + mcp_mode BOOLEAN NOT NULL DEFAULT FALSE, + chat_history JSONB, + query TEXT NOT NULL, + generated_answer TEXT, + retrieved_sources JSONB, + llm_usage JSONB + ); + """ + ) + await connection.execute( + """ + CREATE INDEX IF NOT EXISTS idx_interactions_created_at + ON user_interactions(created_at); + CREATE INDEX IF NOT EXISTS idx_interactions_agent_id + ON user_interactions(agent_id); + """ + ) + logger.info("Database schema initialized.") diff --git a/python/src/cairo_coder/server/app.py b/python/src/cairo_coder/server/app.py index 7c60f6b..9a9aa9c 100644 --- a/python/src/cairo_coder/server/app.py +++ b/python/src/cairo_coder/server/app.py @@ -18,7 +18,7 @@ import structlog import uvicorn from dspy.adapters import ChatAdapter, XMLAdapter -from fastapi import Depends, FastAPI, Header, HTTPException, Request +from fastapi import BackgroundTasks, Depends, FastAPI, Header, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field, field_validator @@ -28,8 +28,12 @@ from cairo_coder.core.config import VectorStoreConfig from cairo_coder.core.rag_pipeline import RagPipeline from cairo_coder.core.types import Message, Role, StreamEventType +from cairo_coder.db import session as db_session +from cairo_coder.db.models import UserInteraction +from cairo_coder.db.repository import create_user_interaction from cairo_coder.dspy.document_retriever import SourceFilteredPgVectorRM from cairo_coder.dspy.suggestion_program import SuggestionGeneration +from cairo_coder.server.insights_api import router as insights_router from cairo_coder.utils.logging import setup_logging # Configure structured logging @@ -140,6 +144,70 @@ class SuggestionResponse(BaseModel): suggestions: list[str] = Field(..., description="List of 4-5 follow-up suggestions") +async def log_interaction_task( + agent_id: str, + mcp_mode: bool, + query: str, + chat_history: list[Message], + response: ChatCompletionResponse, + agent: RagPipeline, +) -> None: + """Background task that persists a user interaction.""" + sources_data = [ + {"page_content": doc.page_content, "metadata": doc.metadata} + for doc in 
agent.last_retrieved_documents + ] + + # Convert Message objects to dicts for JSON serialization + chat_history_dicts = [ + {"role": msg.role.value if hasattr(msg.role, "value") else msg.role, "content": msg.content} + for msg in chat_history + ] if chat_history else None + + interaction = UserInteraction( + agent_id=agent_id, + mcp_mode=mcp_mode, + chat_history=chat_history_dicts, + query=query, + generated_answer=response.choices[0].message.content if response.choices else None, + retrieved_sources=sources_data, + # TODO: fix LLM usage metrics + llm_usage={} + ) + await create_user_interaction(interaction) + + +async def log_interaction_raw( + agent_id: str, + mcp_mode: bool, + query: str, + chat_history: list[Message], + generated_answer: str | None, + agent: RagPipeline, +) -> None: + """Persist a user interaction without constructing a full response object.""" + sources_data = [ + {"page_content": doc.page_content, "metadata": doc.metadata} + for doc in agent.last_retrieved_documents + ] + + chat_history_dicts = [ + {"role": msg.role.value if hasattr(msg.role, "value") else msg.role, "content": msg.content} + for msg in chat_history + ] if chat_history else None + + interaction = UserInteraction( + agent_id=agent_id, + mcp_mode=mcp_mode, + chat_history=chat_history_dicts, + query=query, + generated_answer=generated_answer, + retrieved_sources=sources_data, + llm_usage={}, + ) + await create_user_interaction(interaction) + + class CairoCoderServer: """ FastAPI server for Cairo Coder that replicates TypeScript backend functionality. @@ -177,6 +245,8 @@ def __init__( allow_headers=["*"], ) + self.app.include_router(insights_router) + # Setup routes self._setup_routes() @@ -241,6 +311,7 @@ async def agent_chat_completions( agent_id: str, request: ChatCompletionRequest, req: Request, + background_tasks: BackgroundTasks, mcp: str | None = Header(None), x_mcp_mode: str | None = Header(None, alias="x-mcp-mode"), vector_db: SourceFilteredPgVectorRM = Depends(get_vector_db), @@ -267,13 +338,14 @@ async def agent_chat_completions( mcp_mode = bool(mcp or x_mcp_mode) return await self._handle_chat_completion( - request, req, agent_factory, agent_id, mcp_mode, vector_db + request, req, background_tasks, agent_factory, agent_id, mcp_mode, vector_db ) @self.app.post("/v1/chat/completions") async def v1_chat_completions( request: ChatCompletionRequest, req: Request, + background_tasks: BackgroundTasks, mcp: str | None = Header(None), x_mcp_mode: str | None = Header(None, alias="x-mcp-mode"), vector_db: SourceFilteredPgVectorRM = Depends(get_vector_db), @@ -284,13 +356,14 @@ async def v1_chat_completions( mcp_mode = bool(mcp or x_mcp_mode) return await self._handle_chat_completion( - request, req, agent_factory, None, mcp_mode, vector_db + request, req, background_tasks, agent_factory, None, mcp_mode, vector_db ) @self.app.post("/chat/completions") async def chat_completions( request: ChatCompletionRequest, req: Request, + background_tasks: BackgroundTasks, mcp: str | None = Header(None), x_mcp_mode: str | None = Header(None, alias="x-mcp-mode"), vector_db: SourceFilteredPgVectorRM = Depends(get_vector_db), @@ -301,7 +374,7 @@ async def chat_completions( mcp_mode = bool(mcp or x_mcp_mode) return await self._handle_chat_completion( - request, req, agent_factory, None, mcp_mode, vector_db + request, req, background_tasks, agent_factory, None, mcp_mode, vector_db ) @self.app.post("/v1/suggestions", response_model=SuggestionResponse) @@ -334,6 +407,7 @@ async def _handle_chat_completion( self, request: 
ChatCompletionRequest, req: Request, + background_tasks: BackgroundTasks, agent_factory: AgentFactory, agent_id: str | None = None, mcp_mode: bool = False, @@ -349,23 +423,19 @@ async def _handle_chat_completion( # Get last user message as query query = request.messages[-1].content + # Determine agent ID (fallback to cairo-coder) + effective_agent_id = agent_id or "cairo-coder" + # Create agent - if agent_id: - agent = agent_factory.get_or_create_agent( - agent_id=agent_id, - mcp_mode=mcp_mode, - ) - else: - # In the default case, fallback to cairo-coder - agent = agent_factory.get_or_create_agent( - agent_id="cairo-coder", - mcp_mode=mcp_mode, - ) + agent = agent_factory.get_or_create_agent( + agent_id=effective_agent_id, + mcp_mode=mcp_mode, + ) # Handle streaming vs non-streaming if request.stream: return StreamingResponse( - self._stream_chat_completion(agent, query, messages[:-1], mcp_mode), + self._stream_chat_completion(agent, query, messages[:-1], mcp_mode, effective_agent_id), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", @@ -373,7 +443,20 @@ async def _handle_chat_completion( "X-Accel-Buffering": "no", }, ) - return await self._generate_chat_completion(agent, query, messages[:-1], mcp_mode) + chat_history = messages[:-1] + response = await self._generate_chat_completion(agent, query, chat_history, mcp_mode) + + background_tasks.add_task( + log_interaction_task, + agent_id=effective_agent_id, + mcp_mode=mcp_mode, + query=query, + chat_history=chat_history, + response=response, + agent=agent, + ) + + return response except ValueError as e: raise HTTPException( @@ -397,7 +480,7 @@ async def _handle_chat_completion( ) from e async def _stream_chat_completion( - self, agent: RagPipeline, query: str, history: list[Message], mcp_mode: bool + self, agent: RagPipeline, query: str, history: list[Message], mcp_mode: bool, agent_id: str ) -> AsyncGenerator[str, None]: """Stream chat completion response - replicates TypeScript streaming.""" response_id = str(uuid.uuid4()) @@ -500,6 +583,19 @@ async def _stream_chat_completion( ], } yield f"data: {json.dumps(error_chunk)}\n\n" + finally: + # Log interaction regardless of client disconnects or errors + try: + await log_interaction_raw( + agent_id=agent_id, + mcp_mode=mcp_mode, + query=query, + chat_history=history, + generated_answer=final_response, + agent=agent, + ) + except Exception as log_error: + logger.error("Failed to log streaming interaction", error=str(log_error), exc_info=True) # Send final chunk final_chunk = { @@ -512,6 +608,8 @@ async def _stream_chat_completion( yield f"data: {json.dumps(final_chunk)}\n\n" yield "data: [DONE]\n\n" + # Logging is handled in finally above + def _format_chat_history_for_suggestions(self, chat_history: list[ChatMessage]) -> str: """ Format chat history for suggestion generation. 
@@ -660,6 +758,10 @@ async def lifespan(app: FastAPI): logger.info("Starting Cairo Coder server - initializing resources") + # Initialize SQL persistence layer + await db_session.get_pool() + await db_session.execute_schema_scripts() + # Load config once config = ConfigManager.load_config() vector_store_config = config.vector_store @@ -687,6 +789,8 @@ async def lifespan(app: FastAPI): # Cleanup logger.info("Shutting down Cairo Coder server - cleaning up resources") + await db_session.close_pool() + if _vector_db and _vector_db.pool: await _vector_db.pool.close() _vector_db.pool = None diff --git a/python/src/cairo_coder/server/insights_api.py b/python/src/cairo_coder/server/insights_api.py new file mode 100644 index 0000000..e8ffc9d --- /dev/null +++ b/python/src/cairo_coder/server/insights_api.py @@ -0,0 +1,71 @@ +""" +Router exposing the Query Insights API endpoints. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any +from uuid import UUID + +import structlog +from fastapi import APIRouter +from pydantic import BaseModel + +from cairo_coder.db.repository import get_interactions + +logger = structlog.get_logger(__name__) + +router = APIRouter(prefix="/v1/insights", tags=["Insights"]) + + +class QueryResponse(BaseModel): + """Subset of `UserInteraction` returned through the API.""" + + id: UUID + created_at: datetime + agent_id: str + query: str + chat_history: list[dict[str, Any]] + output: str | None + + +class PaginatedQueryResponse(BaseModel): + """Paginated list of raw queries.""" + + items: list[QueryResponse] + total: int + limit: int + offset: int + + +@router.get("/queries", response_model=PaginatedQueryResponse) +async def get_raw_queries( + start_date: datetime | None = None, + end_date: datetime | None = None, + agent_id: str | None = None, + query_text: str | None = None, + limit: int = 100, + offset: int = 0, +) -> PaginatedQueryResponse: + """Return raw user queries. + + If start_date and end_date are not provided, returns the last N queries + ordered by creation time (where N is the limit parameter). + """ + items, total = await get_interactions( + start_date, end_date, agent_id, limit, offset, query_text + ) + # Map generated_answer to output for API response + responses = [ + QueryResponse( + id=item["id"], + created_at=item["created_at"], + agent_id=item["agent_id"], + query=item["query"], + chat_history=item["chat_history"] or [], + output=item.get("generated_answer"), + ) + for item in items + ] + return PaginatedQueryResponse(items=responses, total=total, limit=limit, offset=offset) diff --git a/python/src/cairo_coder_tools/datasets/analysis.py b/python/src/cairo_coder_tools/datasets/analysis.py index 4443d9e..195ac1e 100644 --- a/python/src/cairo_coder_tools/datasets/analysis.py +++ b/python/src/cairo_coder_tools/datasets/analysis.py @@ -22,6 +22,7 @@ import argparse import json import os +import re import sys from collections import Counter, defaultdict from collections.abc import Iterable @@ -218,6 +219,39 @@ def collect_counts( return category_counts, intent_counts, leaves_by_category, frameworks_counts +def run_analysis_from_queries(queries: list[str]) -> dict[str, Any]: + """ + Produce lightweight analytics from a list of user queries. + + Returns aggregate counts plus the most frequent normalized terms to surface + recurring topics. 
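A quick usage sketch of this helper; it is pure Python, so the example runs without any database (the queries are illustrative):

```python
from cairo_coder_tools.datasets.analysis import run_analysis_from_queries

queries = [
    "How do I declare a storage variable in Cairo?",
    "How do I deploy a Cairo contract to testnet?",
]

report = run_analysis_from_queries(queries)
# report["total_queries"] == 2
# report["average_word_count"] == 9.0 (mean token count per query)
# report["top_terms"] is a list of (term, count) pairs, e.g. ("how", 2), ("cairo", 2), ...
print(report)
```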
+ """ + total_queries = len(queries) + if total_queries == 0: + return { + "total_queries": 0, + "average_word_count": 0.0, + "top_terms": [], + } + + word_counts: list[int] = [] + term_counter: Counter[str] = Counter() + + for query in queries: + tokens = re.findall(r"[a-z0-9]+", query.lower()) + word_counts.append(len(tokens)) + term_counter.update(tokens) + + average_word_count = sum(word_counts) / total_queries if word_counts else 0.0 + top_terms = term_counter.most_common(10) + + return { + "total_queries": total_queries, + "average_word_count": average_word_count, + "top_terms": top_terms, + } + + def plot_barh(ax, labels: list[str], counts: list[int], title: str, xlabel: str, color: str): ax.barh(labels, counts, color=color) ax.set_title(title) diff --git a/python/src/cairo_coder_tools/datasets/extractors.py b/python/src/cairo_coder_tools/datasets/extractors.py index abcfca2..0fb3b42 100644 --- a/python/src/cairo_coder_tools/datasets/extractors.py +++ b/python/src/cairo_coder_tools/datasets/extractors.py @@ -94,54 +94,31 @@ def _extract_answer_from_prediction(output: str) -> str: @dataclass class RunQueries: - """Container for queries extracted from a single run.""" + """Container for a single run from LangSmith. + + This represents a single query with its chat history, matching the format + that LangSmith provides and the live server uses. + """ run_id: str - queries: list[str] + query: str + chat_history: list[dict[str, str]] output: str mcp_mode: bool + created_at: datetime + agent_id: str | None = None def to_dict(self) -> dict[str, Any]: - return { + result = { "run_id": self.run_id, - "queries": self.queries, + "query": self.query, + "chat_history": self.chat_history, "mcp_mode": self.mcp_mode, - "output": self.output + "output": self.output, + "created_at": self.created_at.isoformat(), } - - -class RunDeduplicator: - """Removes runs whose queries are prefixes of other runs.""" - - def deduplicate(self, runs: list[RunQueries]) -> list[RunQueries]: - """ - Remove runs that are prefixes of longer runs. - - Args: - runs: List of run queries to deduplicate - - Returns: - Filtered list with prefix runs removed - """ - if not runs: - return [] - - # Sort by query count (longest first) while tracking original indices - indexed_runs = list(enumerate(runs)) - indexed_runs.sort(key=lambda x: len(x[1].queries), reverse=True) - - kept_queries = [] - keep_indices = set() - - for idx, run in indexed_runs: - # Skip if this run's queries are a prefix of any kept run - if any(run.queries == kq[:len(run.queries)] for kq in kept_queries): - continue - - keep_indices.add(idx) - kept_queries.append(run.queries) - - # Restore original order - return [run for idx, run in enumerate(runs) if idx in keep_indices] + if self.agent_id: + result["agent_id"] = self.agent_id + return result def extract_cairocoder_pairs( @@ -150,10 +127,14 @@ def extract_cairocoder_pairs( run_name_filters: list[str] | None = None, project_name: str | None = None, ) -> tuple[list[dict[str, Any]], dict[str, int]]: - """Extract {query, answer} pairs from LangSmith for Cairo-Coder. + """Extract runs from LangSmith for Cairo-Coder. This function connects to LangSmith API and fetches runs from the specified - project, then deduplicates and formats them as query/answer pairs. + project. Each run is returned as-is with its query, chat_history, and output, + matching the format the live server uses. + + This replaces the old behavior of combining queries into arrays and deduplicating. 
+ Now every query creates a database entry, making all queries searchable. Args: days_back: Number of days to look back for runs (default: 14) @@ -161,8 +142,8 @@ def extract_cairocoder_pairs( project_name: LangSmith project name (default: from LANGSMITH_PROJECT env var or "default") Returns: - A tuple of (pairs, stats) where: - - pairs is a list of dicts, each containing run information + A tuple of (runs, stats) where: + - runs is a list of dicts, each containing {run_id, query, chat_history, output, mcp_mode, created_at} - stats contains total runs, matched runs, etc. """ # Load environment variables @@ -176,7 +157,6 @@ def extract_cairocoder_pairs( # Initialize LangSmith client client = Client() - deduplicator = RunDeduplicator() # Calculate time range end_time = datetime.now(timezone.utc) @@ -208,23 +188,24 @@ def extract_cairocoder_pairs( output = run_data["outputs"]["output"] mcp_mode = inputs.get("mcp_mode", False) - # Extract user queries from chat history - user_queries_in_history = [ - msg['content'] for msg in chat_history - if isinstance(msg, dict) and msg.get("role") == "user" - ] - - # Combine chat history queries with current query - full_query = user_queries_in_history + [query] - # Extract clean answer from Prediction output clean_output = _extract_answer_from_prediction(output) + # Get timestamp (prefer start_time, fallback to end_time or now) + run_timestamp = run_data.get("start_time") or run_data.get("end_time") + if isinstance(run_timestamp, str): + run_timestamp = datetime.fromisoformat(run_timestamp.replace('Z', '+00:00')) + elif not isinstance(run_timestamp, datetime): + run_timestamp = datetime.now(timezone.utc) + + # Pass through data as LangSmith provides it runs.append(RunQueries( run_id=str(run_data["id"]), - queries=full_query, + query=query, + chat_history=chat_history, # Keep full chat history with user+assistant messages mcp_mode=mcp_mode, - output=clean_output + output=clean_output, + created_at=run_timestamp, )) except (KeyError, TypeError): skipped += 1 @@ -234,9 +215,6 @@ def extract_cairocoder_pairs( stats = {"total": 0, "matched": 0, "skipped": 0, "error": str(e)} return [], stats - # Deduplicate runs - runs = deduplicator.deduplicate(runs) - # Convert to output format results = [run.to_dict() for run in runs] diff --git a/python/src/cairo_coder_tools/datasets/migrate_langsmith.py b/python/src/cairo_coder_tools/datasets/migrate_langsmith.py new file mode 100644 index 0000000..f0b7e3f --- /dev/null +++ b/python/src/cairo_coder_tools/datasets/migrate_langsmith.py @@ -0,0 +1,161 @@ +""" +Migration utilities for importing LangSmith runs into the PostgreSQL database. + +This module provides functions to transform LangSmith run data into the +UserInteraction format and persist it to the database with idempotent behavior. +""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timezone +from typing import Any + +import structlog + +from cairo_coder.agents.registry import AgentId +from cairo_coder.db.models import UserInteraction +from cairo_coder.db.repository import migrate_user_interaction + +logger = structlog.get_logger(__name__) + + +def transform_run_to_interaction(run: dict[str, Any]) -> UserInteraction: + """ + Transform a LangSmith run to a UserInteraction. 
+ + LangSmith data format: + { + "run_id": "UUID", + "query": "current query", + "chat_history": [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}], + "output": "generated response", + "mcp_mode": false, + "created_at": "2025-01-15T10:30:00Z" + } + + This directly maps to UserInteraction, no transformation needed. + + Args: + run: Dictionary containing run data from LangSmith + + Returns: + UserInteraction model ready for database insertion + + Raises: + ValueError: If run data is invalid or missing required fields + """ + # Validate required fields + if not run.get("run_id"): + raise ValueError("Run missing 'run_id' field") + + if "query" not in run: + raise ValueError(f"Run {run['run_id']} has no query field") + + # Convert run_id to UUID + try: + run_id = uuid.UUID(run["run_id"]) + except (ValueError, TypeError) as e: + raise ValueError(f"Invalid run_id format: {run['run_id']}") from e + + # Parse created_at timestamp if available + created_at = datetime.now(timezone.utc) + if "created_at" in run: + try: + created_at_value = run["created_at"] + if isinstance(created_at_value, str): + created_at = datetime.fromisoformat(created_at_value.replace('Z', '+00:00')) + elif isinstance(created_at_value, datetime): + created_at = created_at_value + except (ValueError, TypeError) as e: + logger.warning( + "Failed to parse created_at, using migration time", + run_id=run["run_id"], + created_at=run.get("created_at"), + error=str(e), + ) + + # Get chat_history, ensure it's a list + chat_history = run.get("chat_history") + if chat_history and not isinstance(chat_history, list): + logger.warning( + "Invalid chat_history format, using empty list", + run_id=run["run_id"], + chat_history_type=type(chat_history).__name__, + ) + chat_history = [] + + # Create UserInteraction directly from run data + is_mcp_mode = run.get("mcp_mode", False) + default_agent_id = AgentId.CAIRO_CODER.value if is_mcp_mode else AgentId.STARKNET.value + resolved_agent_id = run.get("agent_id", default_agent_id) + + return UserInteraction( + id=run_id, + created_at=created_at, + agent_id=resolved_agent_id, + mcp_mode=is_mcp_mode, + chat_history=chat_history if chat_history else None, + query=run["query"], + generated_answer=run.get("output", ""), + retrieved_sources=None, # Not available from LangSmith export + llm_usage=None, # Not available from LangSmith export + ) + + +async def migrate_runs(runs: list[dict[str, Any]]) -> dict[str, int]: + """ + Migrate a list of LangSmith runs to the database. + + Expected format: {run_id, query, chat_history, output, mcp_mode, created_at} + + This function uses upsert behavior - existing records will be updated + with new data, and new records will be inserted. 
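A sketch of feeding one extracted run through `migrate_runs`, assuming the database schema exists and Postgres is reachable; the run mirrors the shape `extract_cairocoder_pairs` emits but all values are illustrative:

```python
import asyncio

from cairo_coder_tools.datasets.migrate_langsmith import migrate_runs

runs = [
    {
        "run_id": "ad0c2b34-04ab-4d0a-9855-47c19f0f2830",  # must be a valid UUID string
        "query": "How do I declare a storage variable in Cairo 1?",
        "chat_history": [{"role": "user", "content": "What is Cairo?"}],
        "output": "To declare a storage variable in Cairo 1...",
        "mcp_mode": False,
        "created_at": "2025-01-15T10:30:00Z",
    }
]

stats = asyncio.run(migrate_runs(runs))
# First run:  {"runs_processed": 1, "inserted": 1, "updated": 0, "failed": 0}
# Re-run:     {"runs_processed": 1, "inserted": 0, "updated": 1, "failed": 0}
print(stats)
```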
+ + Args: + runs: List of run dictionaries from LangSmith + + Returns: + Dictionary with migration statistics: + { + "runs_processed": total number of runs processed, + "inserted": number of new records created, + "updated": number of existing records updated, + "failed": number of errors + } + """ + stats = { + "runs_processed": 0, + "inserted": 0, + "updated": 0, + "failed": 0, + } + + for run in runs: + try: + interaction = transform_run_to_interaction(run) + + was_modified, was_inserted = await migrate_user_interaction(interaction) + + if was_modified: + if was_inserted: + stats["inserted"] += 1 + else: + stats["updated"] += 1 + + stats["runs_processed"] += 1 + + except ValueError as e: + logger.warning("Skipping invalid run", error=str(e), run_id=run.get("run_id")) + stats["failed"] += 1 + except Exception as e: + logger.error( + "Failed to process run", + error=str(e), + run_id=run.get("run_id"), + exc_info=True, + ) + stats["failed"] += 1 + + logger.info("Migration completed", **stats) + return stats diff --git a/python/src/scripts/dataset.py b/python/src/scripts/dataset.py index 9694c10..3d7e70b 100644 --- a/python/src/scripts/dataset.py +++ b/python/src/scripts/dataset.py @@ -16,6 +16,7 @@ from cairo_coder_tools.datasets.extractors import ( extract_cairocoder_pairs, ) +from cairo_coder_tools.datasets.migrate_langsmith import migrate_runs class HelpOnInvalidCommand(TyperGroup): @@ -49,9 +50,15 @@ def get_command(self, ctx, cmd_name): # type: ignore[override] help="Generate synthetic datasets (no input required)", no_args_is_help=True, ) +migrate_app = typer.Typer( + cls=HelpOnInvalidCommand, + help="Migrate historical data to PostgreSQL database", + no_args_is_help=True, +) app.add_typer(extract_app, name="extract") app.add_typer(generate_app, name="generate") +app.add_typer(migrate_app, name="migrate") @@ -140,6 +147,91 @@ def generate_starklings( typer.echo(f"Generated {len(examples)} examples to {output}") +@migrate_app.command("langsmith") +def migrate_langsmith( + source: Path | None = typer.Option( + None, + "--source", + help="JSON file with LangSmith runs (if not provided, fetches from API)", + ), + days: int = typer.Option( + 14, + "--days", + help="Number of days to look back for runs (only used when fetching from API)", + ), + names: list[str] = typer.Option( + ["RagPipeline", "RagPipelineStreaming"], + "--names", + help="Filter runs by names (only used when fetching from API)", + ), + project: str = typer.Option( + None, + "--project", + help="LangSmith project name (only used when fetching from API)", + ), +) -> None: + """ + Migrate LangSmith runs to PostgreSQL database. + + This command can either: + 1. Load runs from a JSON file (--source) + 2. Fetch runs from LangSmith API (if no --source provided) + + The migration uses upsert behavior - existing records will be updated + with new data from the source, allowing you to refresh migrated data. 
+ """ + # Determine data source + if source: + # Load from file + source = Path(source).expanduser() + if not source.exists(): + typer.secho(f"Error: File not found: {source}", fg=typer.colors.RED, err=True) + raise typer.Exit(1) + + with source.open("r", encoding="utf-8") as f: + data = json.load(f) + + runs = data.get("runs", []) + typer.echo(f"Loaded {len(runs)} runs from {source}") + else: + # Fetch from LangSmith API + typer.echo("Fetching runs from LangSmith API...") + runs, stats = extract_cairocoder_pairs( + days_back=days, + run_name_filters=names, + project_name=project, + ) + typer.echo( + f"Fetched {len(runs)} runs (matched: {stats['matched']}, " + f"skipped: {stats['skipped']}, total: {stats['total']})" + ) + + if not runs: + typer.echo("No runs to migrate.") + return + + # Migrate runs to database + typer.echo("Migrating runs to database...") + migration_stats = asyncio.run(migrate_runs(runs)) + + # Display results + typer.echo( + json.dumps( + { + "status": "completed", + **migration_stats, + }, + indent=2, + ) + ) + + if migration_stats["failed"] > 0: + typer.secho( + f"Warning: {migration_stats['failed']} runs failed to migrate. Check logs for details.", + fg=typer.colors.YELLOW, + ) + + # Analyze command removed - analyze_dataset function doesn't exist in analysis.py # TODO: Re-implement or remove if needed diff --git a/python/tests/conftest.py b/python/tests/conftest.py index 7d2e8e3..42bf3ba 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -7,11 +7,13 @@ import os from unittest.mock import AsyncMock, Mock, patch +from urllib.parse import urlparse import dspy import pytest from fastapi.testclient import TestClient +from cairo_coder.agents.registry import AgentId from cairo_coder.core.agent_factory import AgentFactory from cairo_coder.core.config import VectorStoreConfig from cairo_coder.core.rag_pipeline import RagPipeline, RagPipelineConfig @@ -27,7 +29,7 @@ from cairo_coder.dspy.generation_program import GenerationProgram, McpGenerationProgram from cairo_coder.dspy.query_processor import QueryProcessorProgram from cairo_coder.dspy.retrieval_judge import RetrievalJudge -from cairo_coder.server.app import CairoCoderServer, get_agent_factory +from cairo_coder.server.app import CairoCoderServer, get_agent_factory, get_vector_db @pytest.fixture(scope="function") @@ -123,7 +125,7 @@ def mock_agent_factory(mock_agent: Mock): Returns a mock AgentFactory. """ factory = Mock(spec=AgentFactory) - factory.get_available_agents.return_value = ["cairo-coder", "scarb-assistant"] + factory.get_available_agents.return_value = ["cairo-coder", "starknet-agent"] def get_agent_info(agent_id, **kwargs): # Use the actual registry @@ -209,6 +211,7 @@ async def mock_aforward(query: str, chat_history: list[Message] | None = None, m mock_agent.forward = mock_forward mock_agent.aforward = mock_aforward mock_agent.aforward_streaming = mock_aforward_streaming + return mock_agent @@ -222,17 +225,68 @@ def server(mock_vector_store_config): # Low-level pipeline fixtures (for integration tests) # ============================================================================= +@pytest.fixture(scope="session") +def postgres_container(): + """Session-scoped Postgres container for DB-backed tests. + + Skips if testcontainers is unavailable (e.g., in CI without Docker). 
+ """ + try: + from testcontainers.postgres import PostgresContainer # type: ignore + except Exception as exc: # pragma: no cover - optional dependency + pytest.skip(f"testcontainers not available: {exc}") + + container = PostgresContainer("postgres:16-alpine") + container.start() + try: + yield container + finally: + container.stop() + @pytest.fixture -def client(server, mock_agent_factory, mock_vector_db): - """Unit-level client: uses mock_agent_factory and shared mock_vector_db.""" - from cairo_coder.server.app import get_vector_db +def client(server, postgres_container, real_pipeline, mock_vector_db, mock_agent_factory, monkeypatch, mock_vector_store_config): + """Integration-level client with pipeline injection. - async def mock_get_agent_factory(): - return mock_agent_factory + Overrides FastAPI dependencies: + - get_vector_db -> shared mock_vector_db + - get_agent_factory -> shared mock_agent_factory returning real_pipeline + """ + from unittest.mock import AsyncMock, Mock + + + # Configure the reusable mock factory to return the real pipeline + mock_agent_factory.get_or_create_agent.return_value = real_pipeline + mock_agent_factory.get_available_agents.return_value = [agent_id.value for agent_id in AgentId] + + # Ensure the app's lifespan can initialize vector DB using the ephemeral Postgres + raw_dsn = postgres_container.get_connection_url() + dsn = raw_dsn.replace("postgresql+psycopg2", "postgresql") + + parsed = urlparse(dsn) + host = parsed.hostname or "127.0.0.1" + port = str(parsed.port or 5432) + user = parsed.username or "postgres" + password = parsed.password or "postgres" + database = (parsed.path or "/postgres").lstrip("/") + + # Set env so ConfigManager.load_config() succeeds and the vector DB connects + monkeypatch.setenv("POSTGRES_HOST", host) + monkeypatch.setenv("POSTGRES_PORT", port) + monkeypatch.setenv("POSTGRES_DB", database) + monkeypatch.setenv("POSTGRES_USER", user) + monkeypatch.setenv("POSTGRES_PASSWORD", password) + # Keep default table name for vector store + monkeypatch.setenv("POSTGRES_TABLE_NAME", "documents") + + # Mock vector DB pool initialization + mock_vector_db._ensure_pool = AsyncMock() + if not hasattr(mock_vector_db, 'pool') or mock_vector_db.pool is None: + mock_vector_db.pool = Mock() + mock_vector_db.pool.close = AsyncMock() server.app.dependency_overrides[get_vector_db] = lambda: mock_vector_db - server.app.dependency_overrides[get_agent_factory] = mock_get_agent_factory + server.app.dependency_overrides[get_agent_factory] = lambda: mock_agent_factory return TestClient(server.app) # ============================================================================= diff --git a/python/tests/integration/conftest.py b/python/tests/integration/conftest.py index cba2057..9902fc5 100644 --- a/python/tests/integration/conftest.py +++ b/python/tests/integration/conftest.py @@ -9,10 +9,8 @@ import dspy import pytest -from fastapi.testclient import TestClient from cairo_coder.agents.registry import AgentId -from cairo_coder.server.app import get_agent_factory, get_vector_db @pytest.fixture @@ -149,18 +147,128 @@ def mock_predict_constructor(signature): monkeypatch.setattr("dspy.Predict", mock_predict_constructor) -@pytest.fixture -def client(server, real_pipeline, mock_vector_db, mock_agent_factory, patch_suggestion_program): - """Integration-level client with pipeline injection. +@pytest.fixture(scope="function") +async def test_db_pool(postgres_container): + """Asyncpg pool connected to the ephemeral Postgres. 
- Overrides FastAPI dependencies: - - get_vector_db -> shared mock_vector_db - - get_agent_factory -> shared mock_agent_factory returning real_pipeline + Creates schema directly to avoid cross-loop pool reuse with the app. """ - # Configure the reusable mock factory to return the real pipeline - mock_agent_factory.get_or_create_agent.return_value = real_pipeline - mock_agent_factory.get_available_agents.return_value = [agent_id.value for agent_id in AgentId] + import asyncpg # local import to avoid import at collection when skipped + + raw_dsn = postgres_container.get_connection_url() + # Convert SQLAlchemy-style DSN to asyncpg-compatible DSN + dsn = raw_dsn.replace("postgresql+psycopg2", "postgresql") + pool = await asyncpg.create_pool(dsn=dsn, min_size=1, max_size=5) + + async with pool.acquire() as connection: + await connection.execute( + """ + CREATE EXTENSION IF NOT EXISTS pgcrypto; + CREATE TABLE IF NOT EXISTS user_interactions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + agent_id VARCHAR(50) NOT NULL, + mcp_mode BOOLEAN NOT NULL DEFAULT FALSE, + chat_history JSONB, + query TEXT NOT NULL, + generated_answer TEXT, + retrieved_sources JSONB, + llm_usage JSONB + ); + """ + ) + await connection.execute( + """ + CREATE INDEX IF NOT EXISTS idx_interactions_created_at + ON user_interactions(created_at); + CREATE INDEX IF NOT EXISTS idx_interactions_agent_id + ON user_interactions(agent_id); + """ + ) + + try: + yield pool + finally: + await pool.close() + + +@pytest.fixture() +async def db_connection(test_db_pool): + """Function-scoped connection with a clean state per test. + + - Truncates tables before each test + - Wraps operations in a transaction and rolls back at teardown + """ + async with test_db_pool.acquire() as conn: + # Ensure clean tables and commit immediately so other connections can see it + await conn.execute("TRUNCATE TABLE user_interactions RESTART IDENTITY;") + # No explicit transaction to allow visibility from the app connection + yield conn + + +@pytest.fixture() +async def populated_db_connection(db_connection): + """Populate DB with sample interactions and analyses before yielding connection.""" + import uuid + from datetime import datetime, timedelta, timezone + + now = datetime.now(timezone.utc) + + # Insert sample interactions + import json as _json + + interactions = [ + { + "id": uuid.uuid4(), + "created_at": now - timedelta(hours=4), + "agent_id": "cairo-coder", + "mcp_mode": False, + "chat_history": _json.dumps([]), + "query": "What is Cairo?", + "generated_answer": "Cairo is a programming language", + "retrieved_sources": None, + "llm_usage": None, + }, + { + "id": uuid.uuid4(), + "created_at": now - timedelta(hours=2), + "agent_id": "starknet-agent", + "mcp_mode": True, + "chat_history": _json.dumps([]), + "query": "How to deploy a contract?", + "generated_answer": "Use Starknet CLI", + "retrieved_sources": None, + "llm_usage": None, + }, + { + "id": uuid.uuid4(), + "created_at": now - timedelta(minutes=30), + "agent_id": "cairo-coder", + "mcp_mode": False, + "chat_history": _json.dumps([]), + "query": "Storage variables?", + "generated_answer": "Use #[storage]", + "retrieved_sources": None, + "llm_usage": None, + }, + ] + + for it in interactions: + await db_connection.execute( + """ + INSERT INTO user_interactions ( + id, created_at, agent_id, mcp_mode, chat_history, query, generated_answer, retrieved_sources, llm_usage + ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) + """, + it["id"], + it["created_at"], + 
it["agent_id"], + it["mcp_mode"], + it["chat_history"], + it["query"], + it["generated_answer"], + it["retrieved_sources"], + it["llm_usage"], + ) - server.app.dependency_overrides[get_vector_db] = lambda: mock_vector_db - server.app.dependency_overrides[get_agent_factory] = lambda: mock_agent_factory - return TestClient(server.app) + return db_connection diff --git a/python/tests/integration/test_insights_api.py b/python/tests/integration/test_insights_api.py new file mode 100644 index 0000000..c49e212 --- /dev/null +++ b/python/tests/integration/test_insights_api.py @@ -0,0 +1,229 @@ +""" +Integration tests for the /v1/insights API endpoints. + +These tests rely on an ephemeral Postgres database via testcontainers and run +by default. They are automatically skipped if Docker is unavailable. + +To skip these tests explicitly, use: pytest -m "not db" +""" + +from __future__ import annotations + +import asyncio +from datetime import datetime, timedelta, timezone + +import pytest + +pytestmark = pytest.mark.db + + +class TestInsightsAPI: + def test_get_queries_success(self, client, populated_db_connection): + now = datetime.now(timezone.utc) + start = (now - timedelta(hours=6)).isoformat() + end = (now + timedelta(minutes=5)).isoformat() + + resp = client.get( + "/v1/insights/queries", + params={"start_date": start, "end_date": end, "limit": 10, "offset": 0}, + ) + assert resp.status_code == 200 + data = resp.json() + assert "items" in data and "total" in data + assert data["total"] >= 3 + assert len(data["items"]) <= 10 + assert all("id" in item and "query" in item for item in data["items"]) # shape check + + def test_get_queries_with_filters(self, client, populated_db_connection): + now = datetime.now(timezone.utc) + start = (now - timedelta(hours=1)).isoformat() + end = (now + timedelta(minutes=5)).isoformat() + + resp = client.get( + "/v1/insights/queries", + params={ + "start_date": start, + "end_date": end, + "agent_id": "cairo-coder", + "limit": 100, + "offset": 0, + }, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["total"] >= 1 + assert all(item["agent_id"] == "cairo-coder" for item in data["items"]) + + def test_get_queries_with_text_filter(self, client, populated_db_connection): + """Test that query_text parameter filters results by text contained in the query.""" + now = datetime.now(timezone.utc) + start = (now - timedelta(hours=6)).isoformat() + end = (now + timedelta(minutes=5)).isoformat() + + # First, get all queries to verify we have data + resp_all = client.get( + "/v1/insights/queries", + params={"start_date": start, "end_date": end, "limit": 100, "offset": 0}, + ) + assert resp_all.status_code == 200 + all_data = resp_all.json() + assert all_data["total"] >= 3 + + # Filter by text that should match at least one query + resp = client.get( + "/v1/insights/queries", + params={ + "start_date": start, + "end_date": end, + "query_text": "Hello", + "limit": 100, + "offset": 0, + }, + ) + assert resp.status_code == 200 + data = resp.json() + # Should have fewer results than unfiltered + assert data["total"] <= all_data["total"] + # All returned items should contain the search text (case-insensitive) + assert all("hello" in item["query"].lower() for item in data["items"]) + + def test_get_queries_with_text_filter_no_match(self, client, populated_db_connection): + """Test that query_text parameter returns empty results when no matches exist.""" + now = datetime.now(timezone.utc) + start = (now - timedelta(hours=6)).isoformat() + end = (now + 
timedelta(minutes=5)).isoformat() + + resp = client.get( + "/v1/insights/queries", + params={ + "start_date": start, + "end_date": end, + "query_text": "xyz_nonexistent_query_text_xyz", + "limit": 100, + "offset": 0, + }, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 0 + assert data["items"] == [] + + def test_get_queries_not_found(self, client, populated_db_connection): + now = datetime.now(timezone.utc) + start = (now - timedelta(days=10)).isoformat() + end = (now - timedelta(days=9)).isoformat() + + resp = client.get( + "/v1/insights/queries", + params={"start_date": start, "end_date": end, "limit": 100, "offset": 0}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["total"] == 0 + assert data["items"] == [] + + def test_get_queries_without_dates(self, client, populated_db_connection): + """Test that queries can be fetched without providing start/end dates.""" + resp = client.get("/v1/insights/queries", params={"limit": 100, "offset": 0}) + assert resp.status_code == 200 + data = resp.json() + assert data["total"] >= 3 + assert len(data["items"]) >= 3 + # Verify items are ordered by created_at DESC (most recent first) + if len(data["items"]) >= 2: + for i in range(len(data["items"]) - 1): + item_current = datetime.fromisoformat(data["items"][i]["created_at"]) + item_next = datetime.fromisoformat(data["items"][i + 1]["created_at"]) + assert item_current >= item_next + + def test_get_queries_without_dates_with_limit(self, client, populated_db_connection): + """Test that limit works correctly when dates are not provided.""" + resp = client.get("/v1/insights/queries", params={"limit": 2, "offset": 0}) + assert resp.status_code == 200 + data = resp.json() + assert data["total"] >= 3 + assert len(data["items"]) == 2 + + def test_get_queries_without_dates_with_filters(self, client, populated_db_connection): + """Test that other filters work when dates are not provided.""" + resp = client.get( + "/v1/insights/queries", + params={"agent_id": "cairo-coder", "limit": 100, "offset": 0}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["total"] >= 1 + assert all(item["agent_id"] == "cairo-coder" for item in data["items"]) + + +class TestDataIngestion: + async def test_chat_completion_logs_interaction_to_db(self, client, test_db_pool): + # Make a non-streaming chat completion request + payload = { + "messages": [{"role": "user", "content": "Hello"}], + "stream": False, + } + resp = client.post("/v1/chat/completions", json=payload) + assert resp.status_code == 200 + + # Poll DB until record appears + count = 0 + for _ in range(50): + async with test_db_pool.acquire() as conn: + count = await conn.fetchval("SELECT COUNT(*) FROM user_interactions") + if count >= 1: + break + await asyncio.sleep(0.05) + + assert count >= 1 + + # Verify content matches request/response shape + async with test_db_pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT agent_id, query, generated_answer + FROM user_interactions + ORDER BY created_at DESC LIMIT 1 + """ + ) + + assert row["agent_id"] == "cairo-coder" + assert row["query"] == "Hello" + assert isinstance(row["generated_answer"], str) and len(row["generated_answer"]) > 0 + + async def test_streaming_chat_completion_logs_interaction_to_db(self, client, test_db_pool): + """ + Verify that a user interaction is logged after a streaming response is fully consumed. 
+ """ + payload = { + "messages": [{"role": "user", "content": "Hello streaming"}], + "stream": True, + } + # The `with` statement ensures the full request/response cycle completes + with client.stream("POST", "/v1/chat/completions", json=payload) as response: + assert response.status_code == 200 + # Consume the stream to trigger the background task at the end + response.read() + + # Poll the database to wait for the background task to complete + count = 0 + for _ in range(50): # Wait up to ~2.5 seconds + async with test_db_pool.acquire() as conn: + count = await conn.fetchval("SELECT COUNT(*) FROM user_interactions") + if count >= 1: + break + await asyncio.sleep(0.05) + + assert count >= 1, "Interaction was not logged for streaming request" + + # Verify the logged data + async with test_db_pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT query, generated_answer + FROM user_interactions + ORDER BY created_at DESC LIMIT 1 + """ + ) + assert row["query"] == "Hello streaming" + assert "Hello! I'm Cairo Coder" in row["generated_answer"] diff --git a/python/tests/integration/test_server_integration.py b/python/tests/integration/test_server_integration.py index 0670cf0..72fee07 100644 --- a/python/tests/integration/test_server_integration.py +++ b/python/tests/integration/test_server_integration.py @@ -11,6 +11,7 @@ import uuid from unittest.mock import Mock, patch +import pytest from fastapi import FastAPI from fastapi.testclient import TestClient @@ -580,9 +581,27 @@ def test_openai_error_response_structure(self, client: TestClient, mock_agent_fa class TestSuggestionEndpoint: - """Test suite for the suggestion generation endpoint.""" + @pytest.fixture(autouse=True) + def setup(self, monkeypatch): + from unittest.mock import AsyncMock, Mock + + import dspy + # Create a mock program + mock_program = Mock(spec=dspy.Predict) + mock_program.aforward = AsyncMock( + return_value=dspy.Prediction(suggestions=["My custom suggestion"]) + ) + # Patch dspy.Predict constructor + original_predict = dspy.Predict + def mock_predict_constructor(signature): + from cairo_coder.dspy.suggestion_program import SuggestionGeneration + if signature is SuggestionGeneration: + return mock_program + return original_predict(signature) + monkeypatch.setattr("dspy.Predict", mock_predict_constructor) - def test_suggestion_generation_success(self, client: TestClient): + """Test suite for the suggestion generation endpoint.""" + def test_suggestion_generation_success(self, monkeypatch, client: TestClient): """Test successful suggestion generation with chat history.""" response = client.post( "/v1/suggestions", diff --git a/python/tests/unit/db/test_repository.py b/python/tests/unit/db/test_repository.py new file mode 100644 index 0000000..d50dcb7 --- /dev/null +++ b/python/tests/unit/db/test_repository.py @@ -0,0 +1,190 @@ +""" +Unit tests for cairo_coder.db.repository using an ephemeral Postgres DB. + +These tests use the shared DB fixtures from tests/integration/conftest.py and run +by default. They are automatically skipped if Docker is unavailable. 
+ +To skip these tests explicitly, use: pytest -m "not db" +""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timedelta, timezone +from urllib.parse import urlparse + +import pytest + +# Import shared fixtures from integration conftest +pytest_plugins = ["tests.integration.conftest"] + +pytestmark = pytest.mark.db + + +@pytest.fixture(autouse=True) +def configure_test_db_env(postgres_container, monkeypatch): + """Auto-configure environment variables for DB tests.""" + raw_dsn = postgres_container.get_connection_url() + dsn = raw_dsn.replace("postgresql+psycopg2", "postgresql") + parsed = urlparse(dsn) + + monkeypatch.setenv("POSTGRES_HOST", parsed.hostname or "127.0.0.1") + monkeypatch.setenv("POSTGRES_PORT", str(parsed.port or 5432)) + monkeypatch.setenv("POSTGRES_DB", (parsed.path or "/postgres").lstrip("/")) + monkeypatch.setenv("POSTGRES_USER", parsed.username or "postgres") + monkeypatch.setenv("POSTGRES_PASSWORD", parsed.password or "postgres") + monkeypatch.setenv("POSTGRES_TABLE_NAME", "documents") + + +@pytest.mark.asyncio +async def test_create_user_interaction(test_db_pool, db_connection): + from cairo_coder.db.models import UserInteraction + from cairo_coder.db.repository import create_user_interaction + + interaction = UserInteraction( + agent_id="cairo-coder", + mcp_mode=False, + chat_history=[{"role": "user", "content": "Hello"}], + query="Hello", + generated_answer="Hi", + retrieved_sources=[{"pageContent": "Cairo", "metadata": {"source": "cairo_book"}}], + llm_usage={"model": {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2}}, + ) + + await create_user_interaction(interaction) + + row = await db_connection.fetchrow("SELECT * FROM user_interactions WHERE id = $1", interaction.id) + assert row is not None + assert row["agent_id"] == "cairo-coder" + assert row["query"] == "Hello" + + +@pytest.mark.asyncio +async def test_get_interactions(test_db_pool, db_connection): + from cairo_coder.db.repository import get_interactions + + now = datetime.now(timezone.utc) + # Seed 3 records + await db_connection.execute( + """ + INSERT INTO user_interactions (id, created_at, agent_id, mcp_mode, query) + VALUES ($1, $2, $3, $4, $5), + ($6, $7, $8, $9, $10), + ($11, $12, $13, $14, $15) + """, + uuid.uuid4(), now - timedelta(hours=4), "cairo-coder", False, "How to deploy a contract", + uuid.uuid4(), now - timedelta(hours=2), "starknet-agent", True, "Write a test for my contract", + uuid.uuid4(), now - timedelta(minutes=30), "cairo-coder", False, "Deploy my contract to testnet", + ) + + # Fetch all + items, total = await get_interactions(now - timedelta(days=1), now + timedelta(minutes=1), None, 100, 0) + assert total == 3 + assert len(items) == 3 + + # Filter by agent + items, total = await get_interactions(now - timedelta(days=1), now + timedelta(minutes=1), "cairo-coder", 100, 0) + assert total == 2 + assert all(it["agent_id"] == "cairo-coder" for it in items) + + # Filter by query text (case-insensitive) + items, total = await get_interactions(now - timedelta(days=1), now + timedelta(minutes=1), None, 100, 0, query_text="deploy") + assert total == 2 + assert all("deploy" in it["query"].lower() for it in items) + + # Filter by query text with no matches + items, total = await get_interactions(now - timedelta(days=1), now + timedelta(minutes=1), None, 100, 0, query_text="nonexistent") + assert total == 0 + assert items == [] + + # Combine agent_id and query_text filters + items, total = await get_interactions(now - timedelta(days=1), now 
+ timedelta(minutes=1), "cairo-coder", 100, 0, query_text="deploy") + assert total == 2 + assert all(it["agent_id"] == "cairo-coder" and "deploy" in it["query"].lower() for it in items) + + # Pagination + items_page_1, _ = await get_interactions(now - timedelta(days=1), now + timedelta(minutes=1), None, 2, 0) + items_page_2, _ = await get_interactions(now - timedelta(days=1), now + timedelta(minutes=1), None, 2, 2) + assert len(items_page_1) == 2 + assert len(items_page_2) == 1 + + # No match with date range + items, total = await get_interactions(now - timedelta(days=10), now - timedelta(days=9), None, 100, 0) + assert total == 0 + assert items == [] + + # No dates provided - should return all items ordered by created_at DESC + items, total = await get_interactions(None, None, None, 100, 0) + assert total == 3 + assert len(items) == 3 + # Verify DESC order (most recent first) + assert items[0]["created_at"] > items[1]["created_at"] + assert items[1]["created_at"] > items[2]["created_at"] + + # No dates with limit + items, total = await get_interactions(None, None, None, 2, 0) + assert total == 3 + assert len(items) == 2 + + # No dates but with agent filter + items, total = await get_interactions(None, None, "cairo-coder", 100, 0) + assert total == 2 + assert all(it["agent_id"] == "cairo-coder" for it in items) + + +@pytest.mark.asyncio +async def test_migrate_user_interaction_upsert(test_db_pool, db_connection): + """Test that migrate_user_interaction performs upsert (insert or update).""" + from cairo_coder.db.models import UserInteraction + from cairo_coder.db.repository import migrate_user_interaction + + interaction_id = uuid.uuid4() + interaction = UserInteraction( + id=interaction_id, + agent_id="cairo-coder", + mcp_mode=False, + chat_history=[{"role": "user", "content": "First question"}], + query="Second question", + generated_answer="Answer here", + retrieved_sources=None, + llm_usage=None, + ) + + # First migration should insert + was_modified, was_inserted = await migrate_user_interaction(interaction) + assert was_modified is True + assert was_inserted is True + + # Verify it's in the database + row = await db_connection.fetchrow("SELECT * FROM user_interactions WHERE id = $1", interaction_id) + assert row is not None + assert row["query"] == "Second question" + assert row["generated_answer"] == "Answer here" + + # Second migration with same ID but different data should update + interaction_updated = UserInteraction( + id=interaction_id, + agent_id="cairo-coder", + mcp_mode=True, # Changed + chat_history=[{"role": "user", "content": "First question"}], + query="Updated question", # Changed + generated_answer="Updated answer", # Changed + retrieved_sources=None, + llm_usage=None, + ) + + was_modified_again, was_inserted_again = await migrate_user_interaction(interaction_updated) + assert was_modified_again is True + assert was_inserted_again is False # Updated, not inserted + + # Verify data was updated + row_updated = await db_connection.fetchrow("SELECT * FROM user_interactions WHERE id = $1", interaction_id) + assert row_updated is not None + assert row_updated["query"] == "Updated question" + assert row_updated["generated_answer"] == "Updated answer" + assert row_updated["mcp_mode"] is True + + # Verify still only one record + count = await db_connection.fetchval("SELECT COUNT(*) FROM user_interactions WHERE id = $1", interaction_id) + assert count == 1 + diff --git a/python/tests/unit/test_migrate_langsmith.py b/python/tests/unit/test_migrate_langsmith.py new file mode 
100644 index 0000000..04bcc17 --- /dev/null +++ b/python/tests/unit/test_migrate_langsmith.py @@ -0,0 +1,326 @@ +"""Tests for LangSmith migration utilities.""" + +from __future__ import annotations + +import uuid +from urllib.parse import urlparse + +import pytest + +from cairo_coder_tools.datasets.migrate_langsmith import transform_run_to_interaction + +# Import test fixtures from integration tests +pytest_plugins = ["tests.integration.conftest"] + +pytestmark = pytest.mark.db + + +@pytest.fixture(autouse=True) +def configure_test_db_env(postgres_container, monkeypatch): + """Auto-configure environment variables for DB tests.""" + raw_dsn = postgres_container.get_connection_url() + dsn = raw_dsn.replace("postgresql+psycopg2", "postgresql") + parsed = urlparse(dsn) + + monkeypatch.setenv("POSTGRES_HOST", parsed.hostname or "127.0.0.1") + monkeypatch.setenv("POSTGRES_PORT", str(parsed.port or 5432)) + monkeypatch.setenv("POSTGRES_DB", (parsed.path or "/postgres").lstrip("/")) + monkeypatch.setenv("POSTGRES_USER", parsed.username or "postgres") + monkeypatch.setenv("POSTGRES_PASSWORD", parsed.password or "postgres") + monkeypatch.setenv("POSTGRES_TABLE_NAME", "documents") + + +def test_transform_run_to_interaction_simple(): + """Test transformation with no chat history.""" + from datetime import datetime, timezone + + run_id_str = str(uuid.uuid4()) + run = { + "run_id": run_id_str, + "query": "What is Cairo?", + "chat_history": [], + "output": "Cairo is a programming language...", + "mcp_mode": False, + "created_at": "2025-01-15T10:30:00Z", + } + + interaction = transform_run_to_interaction(run) + + assert str(interaction.id) == run_id_str + assert interaction.query == "What is Cairo?" + assert interaction.chat_history is None # Empty list becomes None + assert interaction.generated_answer == "Cairo is a programming language..." + assert interaction.mcp_mode is False + assert interaction.created_at == datetime(2025, 1, 15, 10, 30, 0, tzinfo=timezone.utc) + + +def test_transform_run_to_interaction_with_history(): + """Test transformation with chat history.""" + run_id_str = str(uuid.uuid4()) + run = { + "run_id": run_id_str, + "query": "How do I deploy it?", + "chat_history": [ + {"role": "user", "content": "What is Cairo?"}, + {"role": "assistant", "content": "Cairo is a programming language..."}, + ], + "output": "You can deploy using...", + "mcp_mode": True, + } + + interaction = transform_run_to_interaction(run) + + assert str(interaction.id) == run_id_str + assert interaction.query == "How do I deploy it?" + assert interaction.chat_history == [ + {"role": "user", "content": "What is Cairo?"}, + {"role": "assistant", "content": "Cairo is a programming language..."}, + ] + assert interaction.generated_answer == "You can deploy using..." 
+ assert interaction.mcp_mode is True + + +def test_transform_run_to_interaction_missing_run_id(): + """Test that missing run_id raises ValueError.""" + run = { + "query": "Test query", + "chat_history": [], + "output": "Test output", + } + + with pytest.raises(ValueError, match="missing 'run_id'"): + transform_run_to_interaction(run) + + +def test_transform_run_to_interaction_missing_query(): + """Test that missing query field raises ValueError.""" + run = { + "run_id": str(uuid.uuid4()), + "chat_history": [], + "output": "Test output", + } + + with pytest.raises(ValueError, match="has no query field"): + transform_run_to_interaction(run) + + +def test_transform_run_to_interaction_invalid_uuid(): + """Test that invalid run_id raises ValueError.""" + run = { + "run_id": "not-a-uuid", + "query": "Test query", + "chat_history": [], + "output": "Test output", + } + + with pytest.raises(ValueError, match="Invalid run_id format"): + transform_run_to_interaction(run) + + +def test_transform_run_to_interaction_defaults(): + """Test that missing optional fields use defaults.""" + from datetime import datetime, timezone + + run_id = str(uuid.uuid4()) + run = { + "run_id": run_id, + "query": "Test query", + # mcp_mode, output, and created_at missing + } + + before_migration = datetime.now(timezone.utc) + interaction = transform_run_to_interaction(run) + after_migration = datetime.now(timezone.utc) + + assert str(interaction.id) == run_id + assert interaction.query == "Test query" + assert interaction.mcp_mode is False # Default + assert interaction.generated_answer == "" # Default + assert interaction.retrieved_sources is None + assert interaction.llm_usage is None + # Verify created_at defaults to migration time (within a reasonable window) + assert before_migration <= interaction.created_at <= after_migration + + +def test_transform_run_to_interaction_with_timestamp(): + """Test that created_at timestamp is preserved from LangSmith run.""" + from datetime import datetime, timezone + + run_id = str(uuid.uuid4()) + original_timestamp = datetime(2025, 1, 10, 15, 45, 30, tzinfo=timezone.utc) + + run = { + "run_id": run_id, + "query": "Test query", + "mcp_mode": False, + "output": "Test output", + "created_at": original_timestamp.isoformat(), + } + + interaction = transform_run_to_interaction(run) + + # Verify the original timestamp is preserved exactly + assert interaction.created_at == original_timestamp + + +def test_transform_run_to_interaction_with_datetime_object(): + """Test that created_at works with datetime objects.""" + from datetime import datetime, timezone + + run_id = str(uuid.uuid4()) + original_timestamp = datetime(2025, 1, 10, 15, 45, 30, tzinfo=timezone.utc) + + run = { + "run_id": run_id, + "query": "Test query", + "mcp_mode": False, + "output": "Test output", + "created_at": original_timestamp, # Pass as datetime object + } + + interaction = transform_run_to_interaction(run) + + # Verify the original timestamp is preserved + assert interaction.created_at == original_timestamp + + +@pytest.mark.asyncio +async def test_migrate_runs(test_db_pool, db_connection): + """Test migration with direct format (query + chat_history).""" + from datetime import datetime, timezone + + from cairo_coder_tools.datasets.migrate_langsmith import migrate_runs + + run_id = str(uuid.uuid4()) + timestamp = datetime(2025, 1, 15, 14, 30, 0, tzinfo=timezone.utc) + + # New format with query + chat_history + runs = [ + { + "run_id": run_id, + "query": "How do I deploy a contract?", + "chat_history": [ + 
{"role": "user", "content": "What is Cairo?"}, + {"role": "assistant", "content": "Cairo is a programming language..."}, + ], + "output": "You can deploy using Scarb...", + "mcp_mode": False, + "created_at": timestamp.isoformat(), + } + ] + + stats = await migrate_runs(runs) + + assert stats["runs_processed"] == 1 + assert stats["inserted"] == 1 + assert stats["updated"] == 0 + assert stats["failed"] == 0 + + # Verify data in database + row = await db_connection.fetchrow( + "SELECT * FROM user_interactions WHERE id = $1", uuid.UUID(run_id) + ) + assert row is not None + assert row["query"] == "How do I deploy a contract?" + assert row["generated_answer"] == "You can deploy using Scarb..." + assert row["created_at"] == timestamp + + # Verify chat_history is preserved + import json + chat_history = row["chat_history"] + if isinstance(chat_history, str): + chat_history = json.loads(chat_history) + assert len(chat_history) == 2 + assert chat_history[0]["role"] == "user" + assert chat_history[0]["content"] == "What is Cairo?" + + +@pytest.mark.asyncio +async def test_migrate_runs_with_invalid_data(test_db_pool): + """Test that invalid runs are counted as failed.""" + from datetime import datetime, timezone + + from cairo_coder_tools.datasets.migrate_langsmith import migrate_runs + + valid_run_id = str(uuid.uuid4()) + runs = [ + { + "run_id": valid_run_id, + "query": "Valid query", + "mcp_mode": False, + "output": "Valid output", + "created_at": datetime(2025, 1, 10, 10, 0, 0, tzinfo=timezone.utc).isoformat(), + }, + { + "run_id": "invalid-uuid", # Invalid UUID + "query": "Query", + "mcp_mode": False, + "output": "Output", + "created_at": datetime(2025, 1, 11, 10, 0, 0, tzinfo=timezone.utc).isoformat(), + }, + { + "run_id": str(uuid.uuid4()), + # Missing query field + "mcp_mode": False, + "output": "Output", + "created_at": datetime(2025, 1, 12, 10, 0, 0, tzinfo=timezone.utc).isoformat(), + }, + ] + + stats = await migrate_runs(runs) + + assert stats["runs_processed"] == 1 # Only the valid one processed + assert stats["inserted"] == 1 # Only the valid one inserted + assert stats["updated"] == 0 + assert stats["failed"] == 2 # Two invalid runs failed + + +@pytest.mark.asyncio +async def test_migrate_runs_upsert(test_db_pool, db_connection): + """Test that migrate_runs performs upsert (update on conflict).""" + from datetime import datetime, timezone + + from cairo_coder_tools.datasets.migrate_langsmith import migrate_runs + + run_id = str(uuid.uuid4()) + timestamp = datetime(2025, 1, 15, 14, 30, 0, tzinfo=timezone.utc) + + # First insert + runs = [ + { + "run_id": run_id, + "query": "Original query", + "chat_history": [], + "output": "Original answer", + "mcp_mode": False, + "created_at": timestamp.isoformat(), + } + ] + + stats1 = await migrate_runs(runs) + assert stats1["inserted"] == 1 + assert stats1["updated"] == 0 + + # Update the same run + runs_updated = [ + { + "run_id": run_id, + "query": "Updated query", + "chat_history": [{"role": "user", "content": "Previous context"}], + "output": "Updated answer", + "mcp_mode": True, + "created_at": timestamp.isoformat(), + } + ] + + stats2 = await migrate_runs(runs_updated) + assert stats2["inserted"] == 0 + assert stats2["updated"] == 1 + + # Verify updated data + row = await db_connection.fetchrow( + "SELECT * FROM user_interactions WHERE id = $1", uuid.UUID(run_id) + ) + assert row["query"] == "Updated query" + assert row["generated_answer"] == "Updated answer" + assert row["mcp_mode"] is True diff --git a/python/uv.lock b/python/uv.lock index 
16124f7..4f33b46 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -345,7 +345,7 @@ wheels = [ [[package]] name = "cairo-coder" -version = "0.2.2" +version = "0.3.0" source = { editable = "." } dependencies = [ { name = "aiohttp" }, @@ -1580,6 +1580,8 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7f/91/ae2eb6b7979e2f9b035a9f612cf70f1bf54aad4e1d125129bef1eae96f19/greenlet-3.2.4-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c2ca18a03a8cfb5b25bc1cbe20f3d9a4c80d8c3b13ba3df49ac3961af0b1018d", size = 584358, upload-time = "2025-08-07T13:18:23.708Z" }, { url = "https://files.pythonhosted.org/packages/f7/85/433de0c9c0252b22b16d413c9407e6cb3b41df7389afc366ca204dbc1393/greenlet-3.2.4-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:9fe0a28a7b952a21e2c062cd5756d34354117796c6d9215a87f55e38d15402c5", size = 1113550, upload-time = "2025-08-07T13:42:37.467Z" }, { url = "https://files.pythonhosted.org/packages/a1/8d/88f3ebd2bc96bf7747093696f4335a0a8a4c5acfcf1b757717c0d2474ba3/greenlet-3.2.4-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8854167e06950ca75b898b104b63cc646573aa5fef1353d4508ecdd1ee76254f", size = 1137126, upload-time = "2025-08-07T13:18:20.239Z" }, + { url = "https://files.pythonhosted.org/packages/f1/29/74242b7d72385e29bcc5563fba67dad94943d7cd03552bac320d597f29b2/greenlet-3.2.4-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:f47617f698838ba98f4ff4189aef02e7343952df3a615f847bb575c3feb177a7", size = 1544904, upload-time = "2025-11-04T12:42:04.763Z" }, + { url = "https://files.pythonhosted.org/packages/c8/e2/1572b8eeab0f77df5f6729d6ab6b141e4a84ee8eb9bc8c1e7918f94eda6d/greenlet-3.2.4-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:af41be48a4f60429d5cad9d22175217805098a9ef7c40bfef44f7669fb9d74d8", size = 1611228, upload-time = "2025-11-04T12:42:08.423Z" }, { url = "https://files.pythonhosted.org/packages/d6/6f/b60b0291d9623c496638c582297ead61f43c4b72eef5e9c926ef4565ec13/greenlet-3.2.4-cp310-cp310-win_amd64.whl", hash = "sha256:73f49b5368b5359d04e18d15828eecc1806033db5233397748f4ca813ff1056c", size = 298654, upload-time = "2025-08-07T13:50:00.469Z" }, { url = "https://files.pythonhosted.org/packages/a4/de/f28ced0a67749cac23fecb02b694f6473f47686dff6afaa211d186e2ef9c/greenlet-3.2.4-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:96378df1de302bc38e99c3a9aa311967b7dc80ced1dcc6f171e99842987882a2", size = 272305, upload-time = "2025-08-07T13:15:41.288Z" }, { url = "https://files.pythonhosted.org/packages/09/16/2c3792cba130000bf2a31c5272999113f4764fd9d874fb257ff588ac779a/greenlet-3.2.4-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:1ee8fae0519a337f2329cb78bd7a8e128ec0f881073d43f023c7b8d4831d5246", size = 632472, upload-time = "2025-08-07T13:42:55.044Z" }, @@ -1589,6 +1591,8 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1f/8e/abdd3f14d735b2929290a018ecf133c901be4874b858dd1c604b9319f064/greenlet-3.2.4-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2523e5246274f54fdadbce8494458a2ebdcdbc7b802318466ac5606d3cded1f8", size = 587684, upload-time = "2025-08-07T13:18:25.164Z" }, { url = "https://files.pythonhosted.org/packages/5d/65/deb2a69c3e5996439b0176f6651e0052542bb6c8f8ec2e3fba97c9768805/greenlet-3.2.4-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:1987de92fec508535687fb807a5cea1560f6196285a4cde35c100b8cd632cc52", size = 1116647, upload-time = "2025-08-07T13:42:38.655Z" }, { url = 
"https://files.pythonhosted.org/packages/3f/cc/b07000438a29ac5cfb2194bfc128151d52f333cee74dd7dfe3fb733fc16c/greenlet-3.2.4-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:55e9c5affaa6775e2c6b67659f3a71684de4c549b3dd9afca3bc773533d284fa", size = 1142073, upload-time = "2025-08-07T13:18:21.737Z" }, + { url = "https://files.pythonhosted.org/packages/67/24/28a5b2fa42d12b3d7e5614145f0bd89714c34c08be6aabe39c14dd52db34/greenlet-3.2.4-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:c9c6de1940a7d828635fbd254d69db79e54619f165ee7ce32fda763a9cb6a58c", size = 1548385, upload-time = "2025-11-04T12:42:11.067Z" }, + { url = "https://files.pythonhosted.org/packages/6a/05/03f2f0bdd0b0ff9a4f7b99333d57b53a7709c27723ec8123056b084e69cd/greenlet-3.2.4-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:03c5136e7be905045160b1b9fdca93dd6727b180feeafda6818e6496434ed8c5", size = 1613329, upload-time = "2025-11-04T12:42:12.928Z" }, { url = "https://files.pythonhosted.org/packages/d8/0f/30aef242fcab550b0b3520b8e3561156857c94288f0332a79928c31a52cf/greenlet-3.2.4-cp311-cp311-win_amd64.whl", hash = "sha256:9c40adce87eaa9ddb593ccb0fa6a07caf34015a29bf8d344811665b573138db9", size = 299100, upload-time = "2025-08-07T13:44:12.287Z" }, { url = "https://files.pythonhosted.org/packages/44/69/9b804adb5fd0671f367781560eb5eb586c4d495277c93bde4307b9e28068/greenlet-3.2.4-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:3b67ca49f54cede0186854a008109d6ee71f66bd57bb36abd6d0a0267b540cdd", size = 274079, upload-time = "2025-08-07T13:15:45.033Z" }, { url = "https://files.pythonhosted.org/packages/46/e9/d2a80c99f19a153eff70bc451ab78615583b8dac0754cfb942223d2c1a0d/greenlet-3.2.4-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:ddf9164e7a5b08e9d22511526865780a576f19ddd00d62f8a665949327fde8bb", size = 640997, upload-time = "2025-08-07T13:42:56.234Z" }, @@ -1598,6 +1602,8 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/19/0d/6660d55f7373b2ff8152401a83e02084956da23ae58cddbfb0b330978fe9/greenlet-3.2.4-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3b3812d8d0c9579967815af437d96623f45c0f2ae5f04e366de62a12d83a8fb0", size = 607586, upload-time = "2025-08-07T13:18:28.544Z" }, { url = "https://files.pythonhosted.org/packages/8e/1a/c953fdedd22d81ee4629afbb38d2f9d71e37d23caace44775a3a969147d4/greenlet-3.2.4-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:abbf57b5a870d30c4675928c37278493044d7c14378350b3aa5d484fa65575f0", size = 1123281, upload-time = "2025-08-07T13:42:39.858Z" }, { url = "https://files.pythonhosted.org/packages/3f/c7/12381b18e21aef2c6bd3a636da1088b888b97b7a0362fac2e4de92405f97/greenlet-3.2.4-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:20fb936b4652b6e307b8f347665e2c615540d4b42b3b4c8a321d8286da7e520f", size = 1151142, upload-time = "2025-08-07T13:18:22.981Z" }, + { url = "https://files.pythonhosted.org/packages/27/45/80935968b53cfd3f33cf99ea5f08227f2646e044568c9b1555b58ffd61c2/greenlet-3.2.4-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:ee7a6ec486883397d70eec05059353b8e83eca9168b9f3f9a361971e77e0bcd0", size = 1564846, upload-time = "2025-11-04T12:42:15.191Z" }, + { url = "https://files.pythonhosted.org/packages/69/02/b7c30e5e04752cb4db6202a3858b149c0710e5453b71a3b2aec5d78a1aab/greenlet-3.2.4-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:326d234cbf337c9c3def0676412eb7040a35a768efc92504b947b3e9cfc7543d", size = 1633814, upload-time = "2025-11-04T12:42:17.175Z" }, { url = 
"https://files.pythonhosted.org/packages/e9/08/b0814846b79399e585f974bbeebf5580fbe59e258ea7be64d9dfb253c84f/greenlet-3.2.4-cp312-cp312-win_amd64.whl", hash = "sha256:a7d4e128405eea3814a12cc2605e0e6aedb4035bf32697f72deca74de4105e02", size = 299899, upload-time = "2025-08-07T13:38:53.448Z" }, { url = "https://files.pythonhosted.org/packages/49/e8/58c7f85958bda41dafea50497cbd59738c5c43dbbea5ee83d651234398f4/greenlet-3.2.4-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:1a921e542453fe531144e91e1feedf12e07351b1cf6c9e8a3325ea600a715a31", size = 272814, upload-time = "2025-08-07T13:15:50.011Z" }, { url = "https://files.pythonhosted.org/packages/62/dd/b9f59862e9e257a16e4e610480cfffd29e3fae018a68c2332090b53aac3d/greenlet-3.2.4-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:cd3c8e693bff0fff6ba55f140bf390fa92c994083f838fece0f63be121334945", size = 641073, upload-time = "2025-08-07T13:42:57.23Z" }, @@ -1607,6 +1613,8 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ee/43/3cecdc0349359e1a527cbf2e3e28e5f8f06d3343aaf82ca13437a9aa290f/greenlet-3.2.4-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:23768528f2911bcd7e475210822ffb5254ed10d71f4028387e5a99b4c6699671", size = 610497, upload-time = "2025-08-07T13:18:31.636Z" }, { url = "https://files.pythonhosted.org/packages/b8/19/06b6cf5d604e2c382a6f31cafafd6f33d5dea706f4db7bdab184bad2b21d/greenlet-3.2.4-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:00fadb3fedccc447f517ee0d3fd8fe49eae949e1cd0f6a611818f4f6fb7dc83b", size = 1121662, upload-time = "2025-08-07T13:42:41.117Z" }, { url = "https://files.pythonhosted.org/packages/a2/15/0d5e4e1a66fab130d98168fe984c509249c833c1a3c16806b90f253ce7b9/greenlet-3.2.4-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:d25c5091190f2dc0eaa3f950252122edbbadbb682aa7b1ef2f8af0f8c0afefae", size = 1149210, upload-time = "2025-08-07T13:18:24.072Z" }, + { url = "https://files.pythonhosted.org/packages/1c/53/f9c440463b3057485b8594d7a638bed53ba531165ef0ca0e6c364b5cc807/greenlet-3.2.4-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:6e343822feb58ac4d0a1211bd9399de2b3a04963ddeec21530fc426cc121f19b", size = 1564759, upload-time = "2025-11-04T12:42:19.395Z" }, + { url = "https://files.pythonhosted.org/packages/47/e4/3bb4240abdd0a8d23f4f88adec746a3099f0d86bfedb623f063b2e3b4df0/greenlet-3.2.4-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:ca7f6f1f2649b89ce02f6f229d7c19f680a6238af656f61e0115b24857917929", size = 1634288, upload-time = "2025-11-04T12:42:21.174Z" }, { url = "https://files.pythonhosted.org/packages/0b/55/2321e43595e6801e105fcfdee02b34c0f996eb71e6ddffca6b10b7e1d771/greenlet-3.2.4-cp313-cp313-win_amd64.whl", hash = "sha256:554b03b6e73aaabec3745364d6239e9e012d64c68ccd0b8430c64ccc14939a8b", size = 299685, upload-time = "2025-08-07T13:24:38.824Z" }, { url = "https://files.pythonhosted.org/packages/22/5c/85273fd7cc388285632b0498dbbab97596e04b154933dfe0f3e68156c68c/greenlet-3.2.4-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:49a30d5fda2507ae77be16479bdb62a660fa51b1eb4928b524975b3bde77b3c0", size = 273586, upload-time = "2025-08-07T13:16:08.004Z" }, { url = "https://files.pythonhosted.org/packages/d1/75/10aeeaa3da9332c2e761e4c50d4c3556c21113ee3f0afa2cf5769946f7a3/greenlet-3.2.4-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:299fd615cd8fc86267b47597123e3f43ad79c9d8a22bebdce535e53550763e2f", size = 686346, upload-time = "2025-08-07T13:42:59.944Z" }, @@ -1614,6 +1622,8 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/dc/8b/29aae55436521f1d6f8ff4e12fb676f3400de7fcf27fccd1d4d17fd8fecd/greenlet-3.2.4-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:b4a1870c51720687af7fa3e7cda6d08d801dae660f75a76f3845b642b4da6ee1", size = 694659, upload-time = "2025-08-07T13:53:17.759Z" }, { url = "https://files.pythonhosted.org/packages/92/2e/ea25914b1ebfde93b6fc4ff46d6864564fba59024e928bdc7de475affc25/greenlet-3.2.4-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:061dc4cf2c34852b052a8620d40f36324554bc192be474b9e9770e8c042fd735", size = 695355, upload-time = "2025-08-07T13:18:34.517Z" }, { url = "https://files.pythonhosted.org/packages/72/60/fc56c62046ec17f6b0d3060564562c64c862948c9d4bc8aa807cf5bd74f4/greenlet-3.2.4-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:44358b9bf66c8576a9f57a590d5f5d6e72fa4228b763d0e43fee6d3b06d3a337", size = 657512, upload-time = "2025-08-07T13:18:33.969Z" }, + { url = "https://files.pythonhosted.org/packages/23/6e/74407aed965a4ab6ddd93a7ded3180b730d281c77b765788419484cdfeef/greenlet-3.2.4-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:2917bdf657f5859fbf3386b12d68ede4cf1f04c90c3a6bc1f013dd68a22e2269", size = 1612508, upload-time = "2025-11-04T12:42:23.427Z" }, + { url = "https://files.pythonhosted.org/packages/0d/da/343cd760ab2f92bac1845ca07ee3faea9fe52bee65f7bcb19f16ad7de08b/greenlet-3.2.4-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:015d48959d4add5d6c9f6c5210ee3803a830dce46356e3bc326d6776bde54681", size = 1680760, upload-time = "2025-11-04T12:42:25.341Z" }, { url = "https://files.pythonhosted.org/packages/e3/a5/6ddab2b4c112be95601c13428db1d8b6608a8b6039816f2ba09c346c08fc/greenlet-3.2.4-cp314-cp314-win_amd64.whl", hash = "sha256:e37ab26028f12dbb0ff65f29a8d3d44a765c61e729647bf2ddfbbed621726f01", size = 303425, upload-time = "2025-08-07T13:32:27.59Z" }, ]