Skip to content

Commit a702ec0

Browse files
committed
feat: insights API
1 parent ec7e07f commit a702ec0

File tree

20 files changed

+1899
-102
lines changed

20 files changed

+1899
-102
lines changed

API_DOCUMENTATION.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,44 @@ Common cases:
352352
- `404 Not Found` — unknown agent id.
353353
- `500 Internal Server Error` — unexpected backend issues.
354354

355+
## Query Insights
356+
357+
The Query Insights API exposes raw interaction logs and lightweight analytics for downstream processing.
358+
359+
### `GET /v1/insights/queries`
360+
361+
Fetch paginated user queries. Results are always ordered by creation time, newest first; if no date range is provided, the most recent queries are returned.
362+
363+
- `start_date` _(ISO 8601, optional)_ — inclusive lower bound for filtering by creation time.
364+
- `end_date` _(ISO 8601, optional)_ — inclusive upper bound for filtering by creation time.
365+
- `agent_id` _(optional)_ — filter by agent id when provided.
366+
- `query_text` _(optional)_ — filter by text contained in the query (case-insensitive).
367+
- `limit` _(default `100`)_ — maximum rows returned.
368+
- `offset` _(default `0`)_ — pagination offset.
369+
370+
**Response** `200 OK`
371+
372+
```json
373+
{
374+
"items": [
375+
{
376+
"id": "ad0c2b34-04ab-4d0a-9855-47c19f0f2830",
377+
"created_at": "2024-04-01T12:30:45.123456+00:00",
378+
"agent_id": "cairo-coder",
379+
"query": "How do I declare a storage variable in Cairo 1?",
380+
"chat_history": [
381+
{"role": "user", "content": "What is Cairo?"},
382+
{"role": "assistant", "content": "Cairo is a programming language..."}
383+
],
384+
"output": "To declare a storage variable in Cairo 1, you use the #[storage] attribute..."
385+
}
386+
],
387+
"total": 1,
388+
"limit": 100,
389+
"offset": 0
390+
}
391+
```
392+
355393
## Versioning & Compatibility
356394

357395
- Current API version: `1.0.0` (see FastAPI metadata).

python/pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,9 @@ strict_optional = true
142142
testpaths = ["tests"]
143143
pythonpath = ["src"]
144144
asyncio_mode = "auto"
145+
markers = [
146+
"db: marks tests that require a database (run by default, use -m 'not db' to skip)",
147+
]
145148
filterwarnings = [
146149
"ignore::DeprecationWarning",
147150
"ignore::PendingDeprecationWarning",

python/src/cairo_coder/core/rag_pipeline.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@ def __init__(self, config: RagPipelineConfig):
8282
self._current_processed_query: ProcessedQuery | None = None
8383
self._current_documents: list[Document] = []
8484

85+
@property
def last_retrieved_documents(self) -> list[Document]:
    """Return the documents retrieved during the most recent pipeline execution.

    The returned list is the pipeline's own ``_current_documents`` list, not
    a copy, so callers should treat it as read-only.
    """
    documents = self._current_documents
    return documents
89+
8590
async def _aprocess_query_and_retrieve_docs(
8691
self,
8792
query: str,
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
"""
2+
Database utilities for the Cairo Coder server.
3+
4+
This package exposes helpers for initializing the asyncpg connection pool and
5+
provides Pydantic representations used when persisting query insights data.
6+
"""
7+
8+
from .models import UserInteraction
9+
from .repository import (
10+
create_user_interaction,
11+
get_interactions,
12+
)
13+
from .session import close_pool, execute_schema_scripts, get_pool
14+
15+
__all__ = [
16+
"UserInteraction",
17+
"create_user_interaction",
18+
"get_interactions",
19+
"close_pool",
20+
"execute_schema_scripts",
21+
"get_pool",
22+
]
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
"""
2+
Pydantic models representing rows stored in the query insights database tables.
3+
"""
4+
5+
from __future__ import annotations
6+
7+
import uuid
8+
from datetime import datetime, timezone
9+
from typing import Any, Optional
10+
11+
from pydantic import BaseModel, Field
12+
13+
14+
def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


class UserInteraction(BaseModel):
    """Model of a single record in the ``user_interactions`` table.

    ``agent_id`` and ``query`` are required. ``id`` and ``created_at`` are
    generated when omitted, and every other field falls back to ``False`` or
    ``None``.
    """

    # Unique identifier of the record; a random UUID4 unless supplied.
    id: uuid.UUID = Field(default_factory=uuid.uuid4)
    # Creation timestamp; defaults to "now" in UTC (timezone-aware).
    created_at: datetime = Field(default_factory=_utc_now)
    # Identifier of the agent that handled the query.
    agent_id: str
    # MCP mode flag for the request; off by default.
    mcp_mode: bool = False
    # Earlier conversation messages, if any (presumably role/content dicts,
    # as in the API documentation example -- confirm against callers).
    chat_history: Optional[list[dict[str, Any]]] = None
    # The user's query text.
    query: str
    # Answer produced for the query, when one is available.
    generated_answer: Optional[str] = None
    # Sources retrieved for the query; free-form dicts.
    retrieved_sources: Optional[list[dict[str, Any]]] = None
    # LLM usage data; free-form dict.
    llm_usage: Optional[dict[str, Any]] = None
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
"""
2+
Data access helpers for the Query Insights persistence layer.
3+
"""
4+
5+
from __future__ import annotations
6+
7+
import json
8+
import uuid
9+
from datetime import datetime
10+
from typing import Any
11+
12+
import structlog
13+
14+
from cairo_coder.db.models import UserInteraction
15+
from cairo_coder.db.session import get_pool
16+
17+
logger = structlog.get_logger(__name__)
18+
19+
20+
def _serialize_json_field(value: Any) -> str | None:
21+
"""
22+
Serialize a Python object to JSON string for database storage.
23+
24+
Args:
25+
value: Python object to serialize (dict, list, etc.)
26+
27+
Returns:
28+
JSON string or None if value is None/empty
29+
"""
30+
if value is None:
31+
return None
32+
return json.dumps(value)
33+
34+
35+
def _normalize_json_field(value: Any, default: Any = None) -> Any:
36+
"""
37+
Normalize a JSON field from database (may be string or already parsed).
38+
39+
Args:
40+
value: Value from database (string, dict, list, or None)
41+
default: Default value to use if parsing fails or value is None
42+
43+
Returns:
44+
Parsed JSON object or default value
45+
"""
46+
if value is None:
47+
return default
48+
if isinstance(value, str):
49+
try:
50+
return json.loads(value)
51+
except (json.JSONDecodeError, TypeError):
52+
return default
53+
return value
54+
55+
56+
def _normalize_row(row: dict | None, fields_with_defaults: dict[str, Any]) -> dict | None:
    """
    Return a copy of a database row with its JSON fields decoded.

    Args:
        row: Dictionary from database row (or None)
        fields_with_defaults: Mapping of JSON field names to the value used
            when the field is missing, None, or cannot be decoded

    Returns:
        New dictionary with every listed field normalized (listed fields
        absent from the row are added with their default), or None if the
        input row is None. The input row is not modified.
    """
    if row is None:
        return None

    source = dict(row)
    decoded = {
        name: _normalize_json_field(source.get(name), fallback)
        for name, fallback in fields_with_defaults.items()
    }
    # Existing keys keep their position; fields missing from the row are appended.
    return {**source, **decoded}
74+
75+
76+
async def create_user_interaction(interaction: UserInteraction) -> None:
    """Insert a user interaction row, logging (not raising) on failure.

    ``created_at`` is not part of the INSERT, so the stored timestamp comes
    from the table definition rather than from ``interaction.created_at``
    (presumably a column default -- confirm against the schema).

    Args:
        interaction: The interaction to persist. Its JSON-like fields are
            serialized to JSON strings before being sent to the database.
    """
    pool = await get_pool()
    try:
        async with pool.acquire() as conn:
            # Positional values, in the same order as the column list below.
            row_values = (
                interaction.id,
                interaction.agent_id,
                interaction.mcp_mode,
                _serialize_json_field(interaction.chat_history),
                interaction.query,
                interaction.generated_answer,
                _serialize_json_field(interaction.retrieved_sources),
                _serialize_json_field(interaction.llm_usage),
            )
            await conn.execute(
                """
                INSERT INTO user_interactions (
                    id,
                    agent_id,
                    mcp_mode,
                    chat_history,
                    query,
                    generated_answer,
                    retrieved_sources,
                    llm_usage
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                """,
                *row_values,
            )
        logger.debug("User interaction logged successfully", interaction_id=str(interaction.id))
    except Exception as err:  # pragma: no cover - defensive logging
        # Best-effort persistence: a failed insert is logged and swallowed.
        logger.error("Failed to log user interaction", error=str(err), exc_info=True)
107+
108+
109+
async def get_interactions(
    start_date: datetime | None,
    end_date: datetime | None,
    agent_id: str | None,
    limit: int,
    offset: int,
    query_text: str | None = None,
) -> tuple[list[dict[str, Any]], int]:
    """Fetch paginated interactions matching the supplied filters.

    Rows are always ordered by ``created_at`` descending (ties broken by
    ``id``), so with no date range the most recent interactions are returned.

    Args:
        start_date: Inclusive lower bound on ``created_at``; None disables it.
        end_date: Inclusive upper bound on ``created_at``; None disables it.
        agent_id: Only return rows for this agent; empty/None disables it.
        limit: Maximum number of rows to return.
        offset: Number of matching rows to skip.
        query_text: Only return rows whose ``query`` contains this text,
            case-insensitively. The text is matched literally: ``%``, ``_``
            and ``\\`` have no wildcard meaning.

    Returns:
        Tuple of (rows for the requested page as dictionaries with
        ``chat_history`` decoded and defaulting to ``[]``, total number of
        rows matching the filters).
    """
    params: list[Any] = []
    filters: list[str] = []

    if start_date is not None:
        params.append(start_date)
        filters.append(f"created_at >= ${len(params)}")

    if end_date is not None:
        params.append(end_date)
        filters.append(f"created_at <= ${len(params)}")

    if agent_id:
        params.append(agent_id)
        filters.append(f"agent_id = ${len(params)}")

    if query_text:
        # Escape LIKE metacharacters so user text is a substring, not a pattern.
        params.append(f"%{_escape_like_pattern(query_text)}%")
        filters.append(f"query ILIKE ${len(params)}")

    where_clause = f"WHERE {' AND '.join(filters)}" if filters else ""

    pool = await get_pool()
    async with pool.acquire() as connection:
        count_query = f"""
            SELECT COUNT(*)
            FROM user_interactions
            {where_clause}
        """
        total = await connection.fetchval(count_query, *params)

        # LIMIT/OFFSET are bound as the last two positional parameters.
        params.extend([limit, offset])
        limit_placeholder = len(params) - 1
        offset_placeholder = len(params)
        # The secondary sort on id keeps page boundaries stable when several
        # rows share the same created_at value.
        data_query = f"""
            SELECT id, created_at, agent_id, query, chat_history, generated_answer
            FROM user_interactions
            {where_clause}
            ORDER BY created_at DESC, id DESC
            LIMIT ${limit_placeholder}
            OFFSET ${offset_placeholder}
        """
        rows = await connection.fetch(data_query, *params)

    # Normalize JSON fields that may be returned as strings by asyncpg
    items = [_normalize_row(dict(row), {"chat_history": []}) for row in rows]
    return items, int(total)


def _escape_like_pattern(text: str) -> str:
    """Escape ``\\``, ``%`` and ``_`` so ``text`` matches literally in LIKE/ILIKE.

    Relies on backslash being the default LIKE escape character in PostgreSQL.
    The backslash itself is escaped first so later escapes are not doubled.
    """
    return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
168+
169+
170+
async def migrate_user_interaction(interaction: UserInteraction) -> tuple[bool, bool]:
    """
    Upsert a user interaction, keyed on its id, for migration runs.

    An existing row with the same id has every column overwritten
    (ON CONFLICT DO UPDATE), so a migration can be re-run to refresh data.
    Unlike a regular insert, ``created_at`` is taken from the model.

    Args:
        interaction: UserInteraction model with pre-set ID from LangSmith

    Returns:
        Tuple of (was_modified, was_inserted) where:
        - was_modified: True if any action was taken (insert or update)
        - was_inserted: True if inserted, False if updated

    Raises:
        Exception: Any error from the pool or the statement is logged and
            then re-raised unchanged.
    """
    pool = await get_pool()
    try:
        async with pool.acquire() as conn:
            # Positional values, in the same order as the column list below.
            values = (
                interaction.id,
                interaction.created_at,
                interaction.agent_id,
                interaction.mcp_mode,
                _serialize_json_field(interaction.chat_history),
                interaction.query,
                interaction.generated_answer,
                _serialize_json_field(interaction.retrieved_sources),
                _serialize_json_field(interaction.llm_usage),
            )
            # One round-trip: the RETURNING clause reports whether the row was
            # newly inserted, using the ``xmax = 0`` system-column idiom.
            result = await conn.fetchrow(
                """
                INSERT INTO user_interactions (
                    id,
                    created_at,
                    agent_id,
                    mcp_mode,
                    chat_history,
                    query,
                    generated_answer,
                    retrieved_sources,
                    llm_usage
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                ON CONFLICT (id) DO UPDATE SET
                    created_at = EXCLUDED.created_at,
                    agent_id = EXCLUDED.agent_id,
                    mcp_mode = EXCLUDED.mcp_mode,
                    chat_history = EXCLUDED.chat_history,
                    query = EXCLUDED.query,
                    generated_answer = EXCLUDED.generated_answer,
                    retrieved_sources = EXCLUDED.retrieved_sources,
                    llm_usage = EXCLUDED.llm_usage
                RETURNING (xmax = 0) AS inserted
                """,
                *values,
            )

            if result is None:
                logger.warning("Unexpected: no result from upsert", interaction_id=str(interaction.id))
                return False, False

            # A missing "inserted" field is treated as an update.
            was_inserted = "inserted" in result and bool(result["inserted"])
            outcome = "User interaction inserted" if was_inserted else "User interaction updated"
            logger.debug(outcome, interaction_id=str(interaction.id))
            return True, was_inserted
    except Exception as err:  # pragma: no cover - defensive logging
        logger.error("Failed to migrate user interaction", error=str(err), exc_info=True)
        raise
238+

0 commit comments

Comments
 (0)