
Commit 998354b

Add batched streaming aggregations (#324)
* Add batched streaming aggregations

  With the current model, each Next call is expected to return samples for unique steps. This approach works well because of its simplicity, but for high-cardinality queries (100K+ series) it tends to use a lot of memory, because the buffers for each step tend to be big.

  This commit resolves that by allowing the aggregate to handle batches for the same step coming from subsequent Next calls. Selectors are expanded with a batchSize parameter which can be injected when a streaming aggregate is present in the plan. Using this parameter puts an upper limit on the size of the output vectors they produce.

  Signed-off-by: Filip Petkovski <[email protected]>

* Remove unused method

  Signed-off-by: Filip Petkovski <[email protected]>

* Fix traverse

  Signed-off-by: Filip Petkovski <[email protected]>

* Fix acceptance tests

  Signed-off-by: Filip Petkovski <[email protected]>

* Add group test and fix failure

  Signed-off-by: Filip Petkovski <[email protected]>

---------

Signed-off-by: Filip Petkovski <[email protected]>
1 parent fcf8381 commit 998354b

19 files changed: +675 -294 lines
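To make the change easier to follow, here is a minimal, self-contained sketch of the batching model described in the commit message. The batch and stepSum types are hypothetical illustrations, not the operator types used in this engine; they only show how an aggregate can fold several bounded batches belonging to the same step, arriving from consecutive Next calls, before the step is flushed.

// Illustrative sketch only: batch and stepSum are made-up names, not types
// from this repository.
package main

import "fmt"

// batch is one partial output of a selector Next call: a bounded slice of
// samples that all belong to the same query step.
type batch struct {
	step   int64
	values []float64
}

// stepSum keeps a running sum() per step, so multiple batches for the same
// step can be folded in without buffering the whole step at once.
type stepSum struct {
	current int64
	sum     float64
}

// add folds one batch into the running aggregate and reports the previous
// step's result once a batch for a new step arrives.
func (a *stepSum) add(b batch) (flushedStep int64, flushedSum float64, flushed bool) {
	if b.step != a.current {
		flushedStep, flushedSum, flushed = a.current, a.sum, true
		a.current, a.sum = b.step, 0
	}
	for _, v := range b.values {
		a.sum += v
	}
	return flushedStep, flushedSum, flushed
}

func main() {
	agg := &stepSum{}
	// Two bounded batches for step 0 from consecutive Next calls, then a
	// batch for step 30000 which flushes step 0. The final step would be
	// flushed by an end-of-stream signal, which this sketch omits.
	for _, b := range []batch{
		{step: 0, values: []float64{1, 2}},
		{step: 0, values: []float64{3}},
		{step: 30000, values: []float64{4}},
	} {
		if step, sum, ok := agg.add(b); ok {
			fmt.Printf("step %d: sum=%g\n", step, sum)
		}
	}
}

In the actual change, the batch size is coordinated through the plan: the SelectorBatchSize optimizer shown in the engine.go diff below caps how many samples a selector may return per call.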

engine/bench_test.go

Lines changed: 31 additions & 15 deletions

@@ -6,6 +6,7 @@ package engine_test
 import (
 	"context"
 	"fmt"
+	"runtime"
 	"strconv"
 	"strings"
 	"testing"
@@ -86,6 +87,10 @@ func BenchmarkChunkDecoding(b *testing.B) {
 }
 
 func BenchmarkSingleQuery(b *testing.B) {
+	b.StopTimer()
+	memProfileRate := runtime.MemProfileRate
+	runtime.MemProfileRate = 0
+
 	test := setupStorage(b, 5000, 3, 720)
 	defer test.Close()
 
@@ -94,10 +99,16 @@ func BenchmarkSingleQuery(b *testing.B) {
 	step := time.Second * 30
 
 	query := "sum(rate(http_requests_total[2m]))"
-	b.ResetTimer()
+	opts := engine.Opts{
+		EngineOpts:        promql.EngineOpts{Timeout: 100 * time.Second},
+		DisableFallback:   true,
+		SelectorBatchSize: 256,
+	}
 	b.ReportAllocs()
+	b.StartTimer()
+	runtime.MemProfileRate = memProfileRate
 	for i := 0; i < b.N; i++ {
-		result := executeRangeQuery(b, query, test, start, end, step)
+		result := executeRangeQuery(b, query, test, start, end, step, opts)
 		testutil.Ok(b, result.Err)
 	}
 }
@@ -274,24 +285,29 @@ func BenchmarkRangeQuery(b *testing.B) {
 		},
 	}
 
+	opts := engine.Opts{
+		EngineOpts: promql.EngineOpts{
+			Logger:               nil,
+			Reg:                  nil,
+			MaxSamples:           50000000,
+			Timeout:              100 * time.Second,
+			EnableAtModifier:     true,
+			EnableNegativeOffset: true,
+		},
+		SelectorBatchSize: 256,
+	}
+
 	for _, tc := range cases {
 		b.Run(tc.name, func(b *testing.B) {
 			b.ReportAllocs()
 			b.Run("old_engine", func(b *testing.B) {
-				opts := promql.EngineOpts{
-					Logger:               nil,
-					Reg:                  nil,
-					MaxSamples:           50000000,
-					Timeout:              100 * time.Second,
-					EnableAtModifier:     true,
-					EnableNegativeOffset: true,
-				}
-				engine := promql.NewEngine(opts)
+
+				promEngine := promql.NewEngine(opts.EngineOpts)
 
 				b.ResetTimer()
 				b.ReportAllocs()
 				for i := 0; i < b.N; i++ {
-					qry, err := engine.NewRangeQuery(context.Background(), tc.storage, nil, tc.query, start, end, step)
+					qry, err := promEngine.NewRangeQuery(context.Background(), tc.storage, nil, tc.query, start, end, step)
 					testutil.Ok(b, err)
 
 					oldResult := qry.Exec(context.Background())
@@ -303,7 +319,7 @@ func BenchmarkRangeQuery(b *testing.B) {
 				b.ReportAllocs()
 
 				for i := 0; i < b.N; i++ {
-					newResult := executeRangeQuery(b, tc.query, tc.storage, start, end, step)
+					newResult := executeRangeQuery(b, tc.query, tc.storage, start, end, step, opts)
 					testutil.Ok(b, newResult.Err)
 				}
 			})
@@ -562,8 +578,8 @@ func BenchmarkMergeSelectorsOptimizer(b *testing.B) {
 
 }
 
-func executeRangeQuery(b *testing.B, q string, storage *teststorage.TestStorage, start time.Time, end time.Time, step time.Duration) *promql.Result {
-	return executeRangeQueryWithOpts(b, q, storage, start, end, step, engine.Opts{DisableFallback: true, EngineOpts: promql.EngineOpts{Timeout: 100 * time.Second}})
+func executeRangeQuery(b *testing.B, q string, storage *teststorage.TestStorage, start time.Time, end time.Time, step time.Duration, opts engine.Opts) *promql.Result {
+	return executeRangeQueryWithOpts(b, q, storage, start, end, step, opts)
 }
 
 func executeRangeQueryWithOpts(b *testing.B, q string, storage *teststorage.TestStorage, start time.Time, end time.Time, step time.Duration, opts engine.Opts) *promql.Result {
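A side note on the benchmark changes above: setup allocations are kept out of both the timings and the heap profile by stopping the timer and temporarily zeroing runtime.MemProfileRate before setupStorage runs, then restoring both right before the measured loop. A generic sketch of that pattern follows; prepare is a placeholder, not a function from this repository.

package benchsketch

import (
	"runtime"
	"testing"
)

// prepare stands in for expensive setup such as setupStorage.
func prepare() (cleanup func()) { return func() {} }

func BenchmarkWithQuietSetup(b *testing.B) {
	// Pause the timer and memory profiling so setup does not pollute
	// benchmark timings or the -memprofile output.
	b.StopTimer()
	memProfileRate := runtime.MemProfileRate
	runtime.MemProfileRate = 0

	cleanup := prepare()
	defer cleanup()

	b.ReportAllocs()
	b.StartTimer()
	runtime.MemProfileRate = memProfileRate

	for i := 0; i < b.N; i++ {
		_ = i // measured work goes here
	}
}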

engine/engine.go

Lines changed: 9 additions & 0 deletions

@@ -83,6 +83,9 @@ type Opts struct {
 
 	// EnableAnalysis enables query analysis.
 	EnableAnalysis bool
+
+	// SelectorBatchSize specifies the maximum number of samples to be returned by selectors in a single batch.
+	SelectorBatchSize int64
 }
 
 func (o Opts) getLogicalOptimizers() []logicalplan.Optimizer {
@@ -180,6 +183,12 @@ func New(opts Opts) *compatibilityEngine {
 		opts.ExtLookbackDelta = 1 * time.Hour
 		level.Debug(opts.Logger).Log("msg", "externallookback delta is zero, setting to default value", "value", 1*24*time.Hour)
 	}
+	if opts.SelectorBatchSize != 0 {
+		opts.LogicalOptimizers = append(
+			[]logicalplan.Optimizer{logicalplan.SelectorBatchSize{Size: opts.SelectorBatchSize}},
+			opts.LogicalOptimizers...,
+		)
+	}
 
 	functions := make(map[string]*parser.Function, len(parser.Functions))
 	for k, v := range parser.Functions {
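For context, this is roughly how a caller would opt into batching after this change: any non-zero SelectorBatchSize makes New prepend the logicalplan.SelectorBatchSize optimizer, and selectors then cap their per-call output. The sketch below is hedged: the module import path and the runBatchedQuery helper are assumptions for illustration, while engine.Opts, engine.New, NewRangeQuery, and SelectorBatchSize come from the diffs in this commit.

package example

import (
	"context"
	"time"

	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/storage"

	// Module path is an assumption; use the path of the promql-engine
	// version you actually build against.
	"github.com/thanos-io/promql-engine/engine"
)

// runBatchedQuery is a hypothetical helper showing the new option in use.
func runBatchedQuery(ctx context.Context, queryable storage.Queryable, q string, start, end time.Time, step time.Duration) error {
	ng := engine.New(engine.Opts{
		EngineOpts:      promql.EngineOpts{Timeout: 100 * time.Second, MaxSamples: 50000000},
		DisableFallback: true,
		// Cap the number of samples a selector returns in a single batch;
		// leaving it at 0 keeps the optimizer (and batching) disabled.
		SelectorBatchSize: 256,
	})

	qry, err := ng.NewRangeQuery(ctx, queryable, nil, q, start, end, step)
	if err != nil {
		return err
	}
	defer qry.Close()

	res := qry.Exec(ctx)
	// Inspect res.Value here (a promql.Matrix for range queries) before Close runs.
	return res.Err
}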

engine/engine_test.go

Lines changed: 13 additions & 4 deletions

@@ -1103,10 +1103,17 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
 		{
 			name: "group",
 			load: `load 30s
-				http_requests_total{pod="nginx-1"} 1+1x15
-				http_requests_total{pod="nginx-2"} 1+2x18`,
+				http_requests_total{pod="nginx-1"} 2+1x15
+				http_requests_total{pod="nginx-2"} 2+2x18`,
 			query: `group(http_requests_total)`,
 		},
+		{
+			name: "group by ",
+			load: `load 30s
+				http_requests_total{pod="nginx-1"} 2+1x15
+				http_requests_total{pod="nginx-2"} 2+2x18`,
+			query: `group by (pod) (http_requests_total)`,
+		},
 		{
 			name: "resets",
 			load: `load 30s
@@ -1907,6 +1914,8 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
 				EngineOpts:        opts,
 				DisableFallback:   disableFallback,
 				LogicalOptimizers: optimizers,
+				// Set to 1 to make sure batching is tested.
+				SelectorBatchSize: 1,
 			})
 			ctx := context.Background()
 			q1, err := newEngine.NewRangeQuery(ctx, storage, nil, tc.query, tc.start, tc.end, tc.step)
@@ -1920,7 +1929,7 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
 			defer q2.Close()
 			oldResult := q2.Exec(ctx)
 
-			testutil.WithGoCmp(comparer).Equals(t, newResult, oldResult)
+			testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult)
 		})
 	}
 })
@@ -4667,7 +4676,7 @@ func testNativeHistograms(t *testing.T, cases []histogramTestCase, opts promql.E
 			testutil.Assert(t, len(promVector) == 0)
 		}
 
-		testutil.WithGoCmp(comparer).Equals(t, newResult, promResult)
+		testutil.WithGoCmp(comparer).Equals(t, promResult, newResult)
 	})
 
 	t.Run("range", func(t *testing.T) {
