Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package engine_test
import (
"context"
"fmt"
"runtime"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -86,6 +87,10 @@ func BenchmarkChunkDecoding(b *testing.B) {
}

func BenchmarkSingleQuery(b *testing.B) {
b.StopTimer()
memProfileRate := runtime.MemProfileRate
runtime.MemProfileRate = 0

test := setupStorage(b, 5000, 3, 720)
defer test.Close()

Expand All @@ -94,10 +99,16 @@ func BenchmarkSingleQuery(b *testing.B) {
step := time.Second * 30

query := "sum(rate(http_requests_total[2m]))"
b.ResetTimer()
opts := engine.Opts{
EngineOpts: promql.EngineOpts{Timeout: 100 * time.Second},
DisableFallback: true,
SelectorBatchSize: 256,
}
b.ReportAllocs()
b.StartTimer()
runtime.MemProfileRate = memProfileRate
for i := 0; i < b.N; i++ {
result := executeRangeQuery(b, query, test, start, end, step)
result := executeRangeQuery(b, query, test, start, end, step, opts)
testutil.Ok(b, result.Err)
}
}
Expand Down Expand Up @@ -274,24 +285,29 @@ func BenchmarkRangeQuery(b *testing.B) {
},
}

opts := engine.Opts{
EngineOpts: promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 50000000,
Timeout: 100 * time.Second,
EnableAtModifier: true,
EnableNegativeOffset: true,
},
SelectorBatchSize: 256,
}

for _, tc := range cases {
b.Run(tc.name, func(b *testing.B) {
b.ReportAllocs()
b.Run("old_engine", func(b *testing.B) {
opts := promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 50000000,
Timeout: 100 * time.Second,
EnableAtModifier: true,
EnableNegativeOffset: true,
}
engine := promql.NewEngine(opts)

promEngine := promql.NewEngine(opts.EngineOpts)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
qry, err := engine.NewRangeQuery(context.Background(), tc.storage, nil, tc.query, start, end, step)
qry, err := promEngine.NewRangeQuery(context.Background(), tc.storage, nil, tc.query, start, end, step)
testutil.Ok(b, err)

oldResult := qry.Exec(context.Background())
Expand All @@ -303,7 +319,7 @@ func BenchmarkRangeQuery(b *testing.B) {
b.ReportAllocs()

for i := 0; i < b.N; i++ {
newResult := executeRangeQuery(b, tc.query, tc.storage, start, end, step)
newResult := executeRangeQuery(b, tc.query, tc.storage, start, end, step, opts)
testutil.Ok(b, newResult.Err)
}
})
Expand Down Expand Up @@ -562,8 +578,8 @@ func BenchmarkMergeSelectorsOptimizer(b *testing.B) {

}

func executeRangeQuery(b *testing.B, q string, storage *teststorage.TestStorage, start time.Time, end time.Time, step time.Duration) *promql.Result {
return executeRangeQueryWithOpts(b, q, storage, start, end, step, engine.Opts{DisableFallback: true, EngineOpts: promql.EngineOpts{Timeout: 100 * time.Second}})
func executeRangeQuery(b *testing.B, q string, storage *teststorage.TestStorage, start time.Time, end time.Time, step time.Duration, opts engine.Opts) *promql.Result {
return executeRangeQueryWithOpts(b, q, storage, start, end, step, opts)
}

func executeRangeQueryWithOpts(b *testing.B, q string, storage *teststorage.TestStorage, start time.Time, end time.Time, step time.Duration, opts engine.Opts) *promql.Result {
Expand Down
9 changes: 9 additions & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ type Opts struct {

// EnableAnalysis enables query analysis.
EnableAnalysis bool

// SelectorBatchSize specifies the maximum number of samples to be returned by selectors in a single batch.
SelectorBatchSize int64
}

func (o Opts) getLogicalOptimizers() []logicalplan.Optimizer {
Expand Down Expand Up @@ -180,6 +183,12 @@ func New(opts Opts) *compatibilityEngine {
opts.ExtLookbackDelta = 1 * time.Hour
level.Debug(opts.Logger).Log("msg", "externallookback delta is zero, setting to default value", "value", 1*24*time.Hour)
}
if opts.SelectorBatchSize != 0 {
opts.LogicalOptimizers = append(
[]logicalplan.Optimizer{logicalplan.SelectorBatchSize{Size: opts.SelectorBatchSize}},
opts.LogicalOptimizers...,
)
}

functions := make(map[string]*parser.Function, len(parser.Functions))
for k, v := range parser.Functions {
Expand Down
17 changes: 13 additions & 4 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,10 +1103,17 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
{
name: "group",
load: `load 30s
http_requests_total{pod="nginx-1"} 1+1x15
http_requests_total{pod="nginx-2"} 1+2x18`,
http_requests_total{pod="nginx-1"} 2+1x15
http_requests_total{pod="nginx-2"} 2+2x18`,
query: `group(http_requests_total)`,
},
{
name: "group by ",
load: `load 30s
http_requests_total{pod="nginx-1"} 2+1x15
http_requests_total{pod="nginx-2"} 2+2x18`,
query: `group by (pod) (http_requests_total)`,
},
{
name: "resets",
load: `load 30s
Expand Down Expand Up @@ -1907,6 +1914,8 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
EngineOpts: opts,
DisableFallback: disableFallback,
LogicalOptimizers: optimizers,
// Set to 1 to make sure batching is tested.
SelectorBatchSize: 1,
})
ctx := context.Background()
q1, err := newEngine.NewRangeQuery(ctx, storage, nil, tc.query, tc.start, tc.end, tc.step)
Expand All @@ -1920,7 +1929,7 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
defer q2.Close()
oldResult := q2.Exec(ctx)

testutil.WithGoCmp(comparer).Equals(t, newResult, oldResult)
testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult)
})
}
})
Expand Down Expand Up @@ -4667,7 +4676,7 @@ func testNativeHistograms(t *testing.T, cases []histogramTestCase, opts promql.E
testutil.Assert(t, len(promVector) == 0)
}

testutil.WithGoCmp(comparer).Equals(t, newResult, promResult)
testutil.WithGoCmp(comparer).Equals(t, promResult, newResult)
})

t.Run("range", func(t *testing.T) {
Expand Down
Loading