Skip to content

Commit

Permalink
Fix sample stats for step invariant and subquery (#506)
Browse files Browse the repository at this point in the history
  • Loading branch information
harry671003 authored Jan 10, 2025
1 parent 2414c3d commit 14f9955
Show file tree
Hide file tree
Showing 29 changed files with 409 additions and 184 deletions.
23 changes: 22 additions & 1 deletion engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sort"
"time"

"github.com/thanos-io/promql-engine/execution/telemetry"

"github.com/efficientgo/core/errors"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -305,6 +307,9 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts
t: InstantQuery,
resultSort: resultSort,
scanners: scanners,
start: ts,
end: ts,
step: 0,
}, nil
}

Expand Down Expand Up @@ -352,6 +357,9 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab
// TODO(fpetkovski): Infer the sort order from the plan, ideally without copying the newResultSort function.
resultSort: noSortResultSort{},
scanners: scnrs,
start: ts,
end: ts,
step: 0,
}, nil
}

Expand Down Expand Up @@ -404,6 +412,9 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *
warns: warns,
t: RangeQuery,
scanners: scnrs,
start: start,
end: end,
step: step,
}, nil
}

Expand Down Expand Up @@ -446,6 +457,9 @@ func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable
warns: warns,
t: RangeQuery,
scanners: scnrs,
start: start,
end: end,
step: step,
}, nil
}

Expand Down Expand Up @@ -522,7 +536,7 @@ func (q *Query) Explain() *ExplainOutputNode {
}

func (q *Query) Analyze() *AnalyzeOutputNode {
if observableRoot, ok := q.exec.(model.ObservableVectorOperator); ok {
if observableRoot, ok := q.exec.(telemetry.ObservableVectorOperator); ok {
return analyzeQuery(observableRoot)
}
return nil
Expand All @@ -534,6 +548,9 @@ type compatibilityQuery struct {
plan logicalplan.Plan
ts time.Time // Empty for range queries.
warns annotations.Annotations
start time.Time
end time.Time
step time.Duration

t QueryType
resultSort resultSorter
Expand Down Expand Up @@ -707,6 +724,10 @@ func (q *compatibilityQuery) Stats() *stats.Statistics {

analysis := q.Analyze()
samples := stats.NewQuerySamples(enablePerStepStats)
if enablePerStepStats {
samples.InitStepTracking(q.start.UnixMilli(), q.end.UnixMilli(), telemetry.StepTrackingInterval(q.step))
}

if analysis != nil {
samples.PeakSamples = int(analysis.PeakSamples())
samples.TotalSamples = analysis.TotalSamples()
Expand Down
71 changes: 67 additions & 4 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4814,6 +4814,47 @@ func TestQueryStats(t *testing.T) {
end: time.Unix(1800, 0),
step: time.Second * 30,
},
{
name: "step invariant with samples",
load: `load 5m
http_requests_total{pod="nginx-1"} 1+1x5
http_requests_total{pod="nginx-2"} 1+2x5`,
query: `sum without (__name__) (http_requests_total @ end())`,
start: time.Unix(1, 0),
end: time.Unix(600, 0),
step: time.Second * 34,
},
{
name: "step invariant without samples",
load: `load 30s
http_requests_total{pod="nginx-1"} 1.00+1.00x15
http_requests_total{pod="nginx-2"} 1+2.00x21`,
query: `pi()`,
start: time.UnixMilli(0),
end: time.UnixMilli(120000),
step: time.Second * 30,
},
{
name: "fuzz subquery without enough samples",
load: `load 30s
http_requests_total{pod="nginx-1"} 1.00+1.00x15
http_requests_total{pod="nginx-2"} 1+2.00x21`,
query: `rate({__name__="http_requests_total"} offset -6s[1h:1m] offset 1m29s)`,
start: time.UnixMilli(0),
end: time.UnixMilli(120000),
step: time.Second * 30,
},
// TODO (harry671003): This is a known case which needs to be fixed upstream.
//{
// name: "fuzz aggregation with scalar param",
// load: `load 30s
// http_requests_total{pod="nginx-1"} -77.00+1.00x15
// http_requests_total{pod="nginx-2"} 1+0.67x21`,
// query: `quantile without (pod) (scalar({__name__="http_requests_total"} offset 2m58s), {__name__="http_requests_total"})`,
// start: time.UnixMilli(0),
// end: time.UnixMilli(221000),
// step: time.Second * 30,
//},
}

for _, tc := range cases {
Expand All @@ -4825,6 +4866,8 @@ func TestQueryStats(t *testing.T) {
Timeout: 300 * time.Second,
MaxSamples: math.MaxInt64,
EnablePerStepStats: true,
EnableAtModifier: true,
EnableNegativeOffset: true,
NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 { return 30 * time.Second.Milliseconds() },
}
qOpts := promql.NewPrometheusQueryOpts(true, 5*time.Minute)
Expand All @@ -4851,8 +4894,9 @@ func TestQueryStats(t *testing.T) {
stats.NewQueryStats(newStats)

testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult)
testutil.Equals(t, oldStats.Samples.TotalSamples, newStats.Samples.TotalSamples)
testutil.Equals(t, oldStats.Samples.TotalSamplesPerStep, newStats.Samples.TotalSamplesPerStep)
if oldResult.Err == nil {
testutil.WithGoCmp(samplesComparer).Equals(t, oldStats.Samples, newStats.Samples)
}

// Range query
oldQ, err = oldEngine.NewRangeQuery(ctx, storage, qOpts, tc.query, tc.start, tc.end, tc.step)
Expand All @@ -4868,8 +4912,9 @@ func TestQueryStats(t *testing.T) {
stats.NewQueryStats(newStats)

testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult)
testutil.Equals(t, oldStats.Samples.TotalSamples, newStats.Samples.TotalSamples)
testutil.Equals(t, oldStats.Samples.TotalSamplesPerStep, newStats.Samples.TotalSamplesPerStep)
if oldResult.Err == nil {
testutil.WithGoCmp(samplesComparer).Equals(t, oldStats.Samples, newStats.Samples)
}
})
}
}
Expand Down Expand Up @@ -5727,6 +5772,24 @@ var (
}
return false
})

// samplesComparer is a cmp option that treats two *stats.QuerySamples as equal
// when they agree on TotalSamples, TotalSamplesPerStep and the value returned
// by TotalSamplesPerStepMap(). Other fields are not compared.
samplesComparer = cmp.Comparer(func(x, y *stats.QuerySamples) bool {
	// A nil value is only equal to another nil. Checking this with || (rather
	// than &&) also prevents a nil dereference below when exactly one side is nil.
	if x == nil || y == nil {
		return x == y
	}
	if x.TotalSamples != y.TotalSamples {
		return false
	}
	if !cmp.Equal(x.TotalSamplesPerStep, y.TotalSamplesPerStep) {
		return false
	}
	return cmp.Equal(x.TotalSamplesPerStepMap(), y.TotalSamplesPerStepMap())
})
)

func queryExplanation(q promql.Query) string {
Expand Down
113 changes: 88 additions & 25 deletions engine/enginefuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/promqltest"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/stats"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/stretchr/testify/require"

Expand All @@ -32,9 +33,31 @@ import (
const testRuns = 100

type testCase struct {
query string
loads []string
oldRes, newRes *promql.Result
query string
loads []string
oldRes, newRes *promql.Result
oldStats, newStats *stats.Statistics
start, end time.Time
interval time.Duration
validateSamples bool
}

// shouldValidateSamples reports whether the sample statistics produced for expr
// can be compared between engines.
// It returns false when expr contains a call to the scalar() function, which is
// a known case where the Prometheus engine and the Thanos engine return
// different samples; otherwise it returns true.
func shouldValidateSamples(expr parser.Expr) bool {
	canCompare := true

	parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
		call, isCall := node.(*parser.Call)
		if !isCall || call.Func.Name != "scalar" {
			// Not a scalar() call: keep walking.
			return nil
		}
		canCompare = false
		// The error value is never examined by this function; it is returned
		// only as a signal (presumably to end the traversal early).
		return errors.New("error")
	})

	return canCompare
}

func FuzzEnginePromQLSmithRangeQuery(f *testing.F) {
Expand All @@ -61,7 +84,9 @@ func FuzzEnginePromQLSmithRangeQuery(f *testing.F) {
MaxSamples: 1e10,
EnableNegativeOffset: true,
EnableAtModifier: true,
EnablePerStepStats: true,
}
qOpts := promql.NewPrometheusQueryOpts(true, 0)

storage := promqltest.LoadedStorage(t, load)
defer storage.Close()
Expand All @@ -80,22 +105,22 @@ func FuzzEnginePromQLSmithRangeQuery(f *testing.F) {
}
ps := promqlsmith.New(rnd, seriesSet, psOpts...)

newEngine := engine.New(engine.Opts{EngineOpts: opts, DisableFallback: true})
newEngine := engine.New(engine.Opts{EngineOpts: opts, DisableFallback: true, EnableAnalysis: true})
oldEngine := promql.NewEngine(opts)

var (
q1 promql.Query
query string
q1 promql.Query
query string
validateSamples bool
)
cases := make([]*testCase, testRuns)
for i := 0; i < testRuns; i++ {
// Since we disabled fallback, keep trying until we find a query
// that can be natively executed by the engine.
// Parsing experimental function, like mad_over_time, will lead to a parser.ParseErrors, so we also ignore those.
for {
expr := ps.WalkRangeQuery()
validateSamples = shouldValidateSamples(expr)

query = expr.Pretty(0)
q1, err = newEngine.NewRangeQuery(context.Background(), storage, nil, query, start, end, interval)
q1, err = newEngine.NewRangeQuery(context.Background(), storage, qOpts, query, start, end, interval)
if errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented) || errors.As(err, &parser.ParseErrors{}) {
continue
} else {
Expand All @@ -105,17 +130,27 @@ func FuzzEnginePromQLSmithRangeQuery(f *testing.F) {

testutil.Ok(t, err)
newResult := q1.Exec(context.Background())
newStats := q1.Stats()
stats.NewQueryStats(newStats)

q2, err := oldEngine.NewRangeQuery(context.Background(), storage, nil, query, start, end, interval)
q2, err := oldEngine.NewRangeQuery(context.Background(), storage, qOpts, query, start, end, interval)
testutil.Ok(t, err)

oldResult := q2.Exec(context.Background())
oldStats := q2.Stats()
stats.NewQueryStats(oldStats)

cases[i] = &testCase{
query: query,
newRes: newResult,
oldRes: oldResult,
loads: []string{load},
query: query,
newRes: newResult,
newStats: newStats,
oldRes: oldResult,
oldStats: oldStats,
loads: []string{load},
start: start,
end: end,
interval: interval,
validateSamples: validateSamples,
}
}
validateTestCases(t, cases)
Expand All @@ -141,7 +176,9 @@ func FuzzEnginePromQLSmithInstantQuery(f *testing.F) {
MaxSamples: 1e10,
EnableNegativeOffset: true,
EnableAtModifier: true,
EnablePerStepStats: true,
}
qOpts := promql.NewPrometheusQueryOpts(true, 0)

storage := promqltest.LoadedStorage(t, load)
defer storage.Close()
Expand All @@ -151,6 +188,7 @@ func FuzzEnginePromQLSmithInstantQuery(f *testing.F) {
EngineOpts: opts,
DisableFallback: true,
LogicalOptimizers: logicalplan.AllOptimizers,
EnableAnalysis: true,
})
oldEngine := promql.NewEngine(opts)

Expand All @@ -176,8 +214,11 @@ func FuzzEnginePromQLSmithInstantQuery(f *testing.F) {
// Parsing experimental function, like mad_over_time, will lead to a parser.ParseErrors, so we also ignore those.
for {
expr := ps.WalkInstantQuery()
if !shouldValidateSamples(expr) {
continue
}
query = expr.Pretty(0)
q1, err = newEngine.NewInstantQuery(context.Background(), storage, nil, query, queryTime)
q1, err = newEngine.NewInstantQuery(context.Background(), storage, qOpts, query, queryTime)
if errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented) || errors.As(err, &parser.ParseErrors{}) {
continue
} else {
Expand All @@ -187,17 +228,26 @@ func FuzzEnginePromQLSmithInstantQuery(f *testing.F) {

testutil.Ok(t, err)
newResult := q1.Exec(context.Background())
newStats := q1.Stats()
stats.NewQueryStats(newStats)

q2, err := oldEngine.NewInstantQuery(context.Background(), storage, nil, query, queryTime)
q2, err := oldEngine.NewInstantQuery(context.Background(), storage, qOpts, query, queryTime)
testutil.Ok(t, err)

oldResult := q2.Exec(context.Background())
oldStats := q2.Stats()
stats.NewQueryStats(oldStats)

cases[i] = &testCase{
query: query,
newRes: newResult,
oldRes: oldResult,
loads: []string{load},
query: query,
newRes: newResult,
newStats: newStats,
oldRes: oldResult,
oldStats: oldStats,
loads: []string{load},
start: queryTime,
end: queryTime,
validateSamples: true,
}
}
validateTestCases(t, cases)
Expand Down Expand Up @@ -444,14 +494,27 @@ func getSeries(ctx context.Context, q storage.Queryable) ([]labels.Labels, error

func validateTestCases(t *testing.T, cases []*testCase) {
failures := 0
logQuery := func(c *testCase) {
for _, load := range c.loads {
t.Logf(load)
}
t.Logf("query: %s, start: %d, end: %d, interval: %v", c.query, c.start.UnixMilli(), c.end.UnixMilli(), c.interval)
}
for i, c := range cases {
if !cmp.Equal(c.oldRes, c.newRes, comparer) {
for _, load := range c.loads {
t.Logf(load)
}
t.Logf(c.query)
logQuery(c)

t.Logf("case %d error mismatch.\nnew result: %s\nold result: %s\n", i, c.newRes.String(), c.oldRes.String())
//failures++
continue
}
if !c.validateSamples || c.oldRes.Err != nil {
// Skip sample comparison
continue
}
if !cmp.Equal(c.oldStats.Samples, c.newStats.Samples, samplesComparer) {
logQuery(c)
t.Logf("case: %d, samples mismatch. total samples: old: %v, new: %v. samples per step: old: %v, new: %v", i, c.oldStats.Samples.TotalSamples, c.newStats.Samples.TotalSamples, c.oldStats.Samples.TotalSamplesPerStep, c.newStats.Samples.TotalSamplesPerStep)
failures++
}
}
Expand Down
Loading

0 comments on commit 14f9955

Please sign in to comment.