-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpolar.py
70 lines (53 loc) · 1.61 KB
/
polar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import asyncio
from pathlib import Path
import polars as pl
from logfire.experimental.query_client import AsyncLogfireQueryClient
from loguru import logger
FILE_NAME = "knd_evals_traces.parquet"
query = """
WITH agent_traces AS (
SELECT DISTINCT trace_id
FROM records
WHERE attributes->>'agent_name' = 'knd_evals'
AND end_timestamp >= NOW() - INTERVAL '10 minutes'
)
SELECT
r.trace_id,
r.span_id,
r.start_timestamp,
r.end_timestamp,
r.duration,
r.level,
r.message,
r.tags,
r.attributes->>'agent_name' as agent_name
FROM records r
JOIN agent_traces at ON r.trace_id = at.trace_id
ORDER BY r.trace_id, r.start_timestamp;
"""
async def get_df() -> pl.DataFrame:
async with AsyncLogfireQueryClient(read_token="H0CTvcy0WCrl6xjxm8r8ZjWxP3LPSq5Mzdv81GvXXRPz") as client:
result = await client.query_arrow(sql=query)
return pl.DataFrame(result)
async def save_df():
df = await get_df()
df.write_parquet(FILE_NAME)
async def main():
if not Path(FILE_NAME).exists():
await save_df()
df = pl.read_parquet(FILE_NAME)
logger.info(df)
total_runs = df["trace_id"].n_unique()
logger.info(f"Total runs: {total_runs}")
res = (
df.filter(pl.col("tags").list.contains("yoo_check"))
.group_by("trace_id", maintain_order=True)
.agg(pl.len().alias("num_check_errors"))
)
logger.info(res)
check_accuracy = (total_runs - res["num_check_errors"].sum()) / total_runs
logger.info(f"Check accuracy: {check_accuracy}")
res = df.filter(pl.col("level") == 17)
logger.info(res)
if __name__ == "__main__":
asyncio.run(main())