Skip to content

Commit b4ea2f4

Browse files
committed
feat: dataset analysis scripts
1 parent 2bf5c43 commit b4ea2f4

File tree

2 files changed

+690
-43
lines changed

2 files changed

+690
-43
lines changed
Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Extract human queries from LangSmith runs and export to JSON.
4+
5+
Retrieves runs from LangSmith within a specified time window, extracts human
6+
messages from various LangChain message formats, and outputs deduplicated
7+
queries grouped by run.
8+
9+
Environment variables:
10+
LANGSMITH_API_KEY: Required for authentication
11+
LANGSMITH_PROJECT: Optional project name (default: "default")
12+
13+
Example usage:
14+
python extract_langsmith.py
15+
python extract_langsmith.py --days 7 --output queries.json
16+
"""
17+
18+
from __future__ import annotations
19+
20+
import argparse
21+
import json
22+
import os
23+
from dataclasses import dataclass
24+
from datetime import datetime, timedelta, timezone
25+
from enum import Enum
26+
from pathlib import Path
27+
from typing import Any
28+
29+
from dotenv import load_dotenv
30+
from langsmith import Client
31+
32+
# Load LANGSMITH_* settings (API key, project) from the parent-directory .env
# at import time; python-dotenv silently no-ops if the file is absent.
# NOTE(review): the relative path assumes the script is run from the directory
# containing this file — confirm, or resolve it against __file__.
load_dotenv("../.env")
33+
34+
35+
class MessageType(Enum):
    """Message-type tags recognized in serialized LangChain payloads.

    Values are compared lowercased against either a message's ``type`` field
    or the last element of its class-path ``id`` list.
    """

    # Plain dict form: {"type": "human", ...}
    HUMAN = "human"
    # Class-based form: "id" list ending in "HumanMessage" (lowercased here)
    HUMANMESSAGE = "humanmessage"
    # Serialized-constructor envelope tag
    CONSTRUCTOR = "constructor"
40+
41+
42+
@dataclass
class RunQueries:
    """Human queries extracted from a single LangSmith run."""

    run_id: str       # run identifier (may arrive as a UUID-like object)
    queries: list[str]  # deduplicated human messages, in order of appearance

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable representation of this run."""
        # Coerce run_id to str so UUID objects serialize cleanly.
        return {
            "run_id": str(self.run_id),
            "queries": self.queries,
        }
50+
51+
52+
@dataclass
class Config:
    """Runtime configuration for the export script."""

    output_path: Path     # destination JSON file
    days_back: int        # size of the look-back window in days
    run_name_filter: str  # exact run name to match in LangSmith
    project_name: str     # LangSmith project to query

    @classmethod
    def from_args(cls, args: argparse.Namespace) -> Config:
        """Build a Config from parsed CLI arguments plus the environment.

        The project name is not a CLI flag; it comes from the
        LANGSMITH_PROJECT environment variable (default: "default").
        """
        return cls(
            output_path=Path(args.output),
            days_back=args.days,
            run_name_filter=args.name,
            project_name=os.getenv("LANGSMITH_PROJECT", "default"),
        )
69+
70+
71+
class HumanMessageExtractor:
    """Extracts human-authored message contents from LangChain input payloads.

    Two serialized message shapes are recognized:

    * plain dicts:       ``{"type": "human", "content": "..."}``
    * constructor dicts: ``{"id": [..., "HumanMessage"], "kwargs": {"content": "..."}}``
    """

    # Keys under which LangChain run inputs may carry a message list.
    HUMAN_MESSAGE_KEYS = ["chat_history", "messages", "inputs", "input"]

    @staticmethod
    def _is_human_message_by_id(message: dict[str, Any]) -> bool:
        """Check if message is human type based on its class-path ID field."""
        # BUG FIX: the original indexed message["id"] directly, raising
        # KeyError for plain-dict messages that carry no "id" field.
        msg_id = message.get("id")
        if isinstance(msg_id, list) and msg_id:
            return str(msg_id[-1]).lower() == MessageType.HUMANMESSAGE.value
        return False

    @staticmethod
    def _is_human_message_by_type(message: dict[str, Any]) -> bool:
        """Check if message is human type based on its "type" field."""
        # BUG FIX: the original indexed message["type"] directly, raising
        # KeyError for constructor-style messages that only carry an "id".
        msg_type = str(message.get("type", "")).lower()
        return msg_type in {MessageType.HUMAN.value, MessageType.HUMANMESSAGE.value}

    def _process_message_object(self, obj: Any) -> list[str]:
        """Return the content of *obj* if it is a human message, else []."""
        # Skip non-dict entries (e.g. raw strings) instead of crashing on
        # subscript access; only dict-shaped messages are understood here.
        if not isinstance(obj, dict):
            return []

        is_human = (
            self._is_human_message_by_id(obj)
            or self._is_human_message_by_type(obj)
        )
        if not is_human:
            return []

        # Constructor-style messages nest the text under "kwargs"; plain-dict
        # messages carry it at the top level.  (The original demanded "kwargs"
        # and raised ValueError for the common plain format.)
        if "kwargs" in obj and "content" in obj["kwargs"]:
            content = obj["kwargs"]["content"]
        elif "content" in obj:
            content = obj["content"]
        else:
            raise ValueError(f"Expected kwargs and content in message object: {obj}")

        # Drop empty strings/None so downstream output contains real queries.
        return [content] if content else []

    def _process_value(self, value: Any) -> list[str]:
        """Extract human contents from a message list; reject non-lists."""
        if not isinstance(value, list):
            raise ValueError(f"Expected list, got {type(value)}: {value}")
        results = []
        for item in value:
            results.extend(self._process_message_object(item))
        return results

    def extract(self, inputs: dict[str, Any]) -> list[str]:
        """
        Extract human messages from LangChain inputs.

        Args:
            inputs: Dictionary containing message data in various formats

        Returns:
            List of unique human message strings in order of appearance

        Raises:
            ValueError: if a recognized key holds a non-list value, or a
                human message carries no content field at all.
        """
        results = []
        for key in self.HUMAN_MESSAGE_KEYS:
            if key in inputs:
                results.extend(self._process_value(inputs[key]))

        # Deduplicate while preserving first-seen order.
        return list(dict.fromkeys(results))
141+
142+
143+
class RunDeduplicator:
    """Filters out runs whose query lists are prefixes of other runs."""

    def deduplicate(self, runs: list[RunQueries]) -> list[RunQueries]:
        """Drop every run whose queries form a prefix of a longer kept run.

        Args:
            runs: Run/query containers to filter.

        Returns:
            The surviving runs, in their original input order.
        """
        if not runs:
            return []

        # Visit runs longest-first so any potential "superset" run is
        # registered before its prefixes are examined. sorted() is stable,
        # so ties keep their original relative order.
        by_length = sorted(
            enumerate(runs),
            key=lambda pair: len(pair[1].queries),
            reverse=True,
        )

        surviving: set[int] = set()   # original indices to keep
        retained_query_lists = []     # query lists of runs kept so far

        for original_idx, candidate in by_length:
            is_prefix_of_kept = any(
                candidate.queries == kept[: len(candidate.queries)]
                for kept in retained_query_lists
            )
            if is_prefix_of_kept:
                continue
            surviving.add(original_idx)
            retained_query_lists.append(candidate.queries)

        # Emit in the input order, not the sorted order.
        return [run for i, run in enumerate(runs) if i in surviving]
176+
177+
178+
class LangSmithExporter:
    """Fetches LangSmith runs, extracts human queries, writes them to JSON."""

    def __init__(self, config: Config):
        """
        Args:
            config: Script configuration (output path, window, filters).
        """
        self.config = config
        # Client() reads LANGSMITH_API_KEY from the environment.
        self.client = Client()
        self.extractor = HumanMessageExtractor()
        self.deduplicator = RunDeduplicator()

    def _get_time_range(self) -> tuple[datetime, datetime]:
        """Return the (start, end) UTC window covering the last N days."""
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(days=self.config.days_back)
        return start_time, end_time

    def fetch_runs(self) -> list[RunQueries]:
        """Fetch matching runs from LangSmith and extract their human queries.

        Returns:
            One RunQueries per run that produced at least one query.
        """
        start_time, end_time = self._get_time_range()

        # NOTE(review): run_name_filter is interpolated verbatim into the
        # filter expression; a name containing '"' would break the query.
        query_params = {
            "start_time": start_time,
            "end_time": end_time,
            "filter": f'eq(name, "{self.config.run_name_filter}")',
            "project_name": self.config.project_name,
        }

        runs: list[RunQueries] = []
        for run in self.client.list_runs(**query_params):
            run_data = run.dict()
            # BUG FIX: "inputs" may be absent or None on some runs; the
            # original run_data["inputs"] would raise or pass None onward.
            inputs = run_data.get("inputs") or {}

            queries = self.extractor.extract(inputs)
            if queries:
                runs.append(RunQueries(
                    run_id=run_data["id"],
                    queries=queries,
                ))

        return runs

    def export(self) -> None:
        """Execute the full pipeline: fetch, deduplicate, write JSON, report."""
        runs = self.deduplicator.deduplicate(self.fetch_runs())

        output = {"runs": [run.to_dict() for run in runs]}
        self.config.output_path.write_text(
            json.dumps(output, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

        total_queries = sum(len(run.queries) for run in runs)
        print(
            f"Exported {len(runs)} runs with {total_queries} queries "
            f"to {self.config.output_path}"
        )
235+
236+
237+
def parse_arguments() -> argparse.Namespace:
    """Build the CLI parser for the export script and parse sys.argv."""
    parser = argparse.ArgumentParser(
        description="Export human queries from LangSmith runs to JSON",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # (flag, add_argument options) — one row per CLI option.
    arg_specs = [
        ("--output", {
            "default": "langsmith_human_queries_last2w.json",
            "help": "Output JSON file path (default: %(default)s)",
        }),
        ("--days", {
            "type": int,
            "default": 14,
            "help": "Number of days to look back (default: %(default)s)",
        }),
        ("--name", {
            "default": "RunnableSequence",
            "help": "Filter runs by name (default: %(default)s)",
        }),
    ]
    for flag, options in arg_specs:
        parser.add_argument(flag, **options)

    return parser.parse_args()
262+
263+
264+
def main() -> None:
    """CLI entry point: parse args, build the config, run the export."""
    config = Config.from_args(parse_arguments())
    LangSmithExporter(config).export()
270+
271+
272+
# Script entry point: python <script>.py [--days N] [--name NAME] [--output F]
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)