Skip to content

Commit c0f3313

Browse files
committed
plan,execution: make coalesce a logical node
Signed-off-by: Michael Hoffmann <[email protected]>
1 parent c38e829 commit c0f3313

File tree

5 files changed

+156
-49
lines changed

5 files changed

+156
-49
lines changed

execution/execution.go

+33-41
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package execution
1818

1919
import (
20-
"runtime"
2120
"sort"
2221

2322
"github.com/efficientgo/core/errors"
@@ -72,6 +71,8 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
7271
return newUnaryExpression(e, storage, opts, hints)
7372
case *parser.StepInvariantExpr:
7473
return newStepInvariantExpression(e, storage, opts, hints)
74+
case logicalplan.Coalesce:
75+
return newCoalesce(e, storage, opts, hints)
7576
case logicalplan.Deduplicate:
7677
return newDeduplication(e, storage, opts, hints)
7778
case logicalplan.RemoteExecution:
@@ -94,21 +95,10 @@ func newVectorSelector(e *logicalplan.VectorSelector, storage *engstore.Selector
9495
batchsize := e.BatchSize
9596
selector := storage.GetFilteredSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, e.Filters, hints)
9697

97-
numShards := runtime.GOMAXPROCS(0) / 2
98-
if numShards < 1 {
99-
numShards = 1
100-
}
101-
102-
operators := make([]model.VectorOperator, 0, numShards)
103-
for i := 0; i < numShards; i++ {
104-
operator := exchange.NewConcurrent(
105-
scan.NewVectorSelector(
106-
model.NewVectorPool(opts.StepsBatch), selector, opts, offset, hints, batchsize, i, numShards),
107-
2)
108-
operators = append(operators, operator)
109-
}
98+
shard := e.Shard
99+
numShards := e.NumShards
110100

111-
return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, batchsize*int64(numShards), operators...), nil
101+
return scan.NewVectorSelector(model.NewVectorPool(opts.StepsBatch), selector, opts, offset, hints, batchsize, shard, numShards), nil
112102
}
113103

114104
func newCall(e *parser.Call, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
@@ -231,10 +221,6 @@ func newRangeVectorFunction(e *parser.Call, t *logicalplan.MatrixSelector, stora
231221
hints.Range = milliSecondRange
232222
filter := storage.GetFilteredSelector(start, end, opts.Step.Milliseconds(), vs.LabelMatchers, filters, hints)
233223

234-
numShards := runtime.GOMAXPROCS(0) / 2
235-
if numShards < 1 {
236-
numShards = 1
237-
}
238224
var arg float64
239225
if e.Func.Name == "quantile_over_time" {
240226
constVal, err := unwrapConstVal(e.Args[0])
@@ -243,28 +229,21 @@ func newRangeVectorFunction(e *parser.Call, t *logicalplan.MatrixSelector, stora
243229
}
244230
arg = constVal
245231
}
246-
247-
operators := make([]model.VectorOperator, 0, numShards)
248-
for i := 0; i < numShards; i++ {
249-
operator, err := scan.NewMatrixSelector(
250-
model.NewVectorPool(opts.StepsBatch),
251-
filter,
252-
e.Func.Name,
253-
arg,
254-
opts,
255-
t.Range,
256-
vs.Offset,
257-
batchSize,
258-
i,
259-
numShards,
260-
)
261-
if err != nil {
262-
return nil, err
263-
}
264-
operators = append(operators, exchange.NewConcurrent(operator, 2))
265-
}
266-
267-
return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, batchSize*int64(numShards), operators...), nil
232+
shard := t.Shard
233+
numShards := t.NumShards
234+
235+
return scan.NewMatrixSelector(
236+
model.NewVectorPool(opts.StepsBatch),
237+
filter,
238+
e.Func.Name,
239+
arg,
240+
opts,
241+
t.Range,
242+
vs.Offset,
243+
batchSize,
244+
shard,
245+
numShards,
246+
)
268247
}
269248

270249
func newSubqueryFunction(e *parser.Call, t *parser.SubqueryExpr, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
@@ -407,6 +386,18 @@ func newStepInvariantExpression(e *parser.StepInvariantExpr, storage *engstore.S
407386
return step_invariant.NewStepInvariantOperator(model.NewVectorPoolWithSize(opts.StepsBatch, 1), next, e.Expr, opts)
408387
}
409388

389+
func newCoalesce(e logicalplan.Coalesce, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
390+
operators := make([]model.VectorOperator, len(e.Exprs))
391+
for i, expr := range e.Exprs {
392+
operator, err := newOperator(expr, storage, opts, hints)
393+
if err != nil {
394+
return nil, err
395+
}
396+
operators[i] = exchange.NewConcurrent(operator, 2)
397+
}
398+
return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, 0, operators...), nil
399+
}
400+
410401
func newDeduplication(e logicalplan.Deduplicate, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
411402
// The Deduplicate operator will deduplicate samples using a last-sample-wins strategy.
412403
// Sorting engines by MaxT ensures that samples produced due to
@@ -424,6 +415,7 @@ func newDeduplication(e logicalplan.Deduplicate, storage *engstore.SelectorPool,
424415
}
425416
operators[i] = operator
426417
}
418+
// We dont need to use logical coalesce here since it was already pushed back above remote evaluation here
427419
coalesce := exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, 0, operators...)
428420
dedup := exchange.NewDedupOperator(model.NewVectorPool(opts.StepsBatch), coalesce)
429421
return exchange.NewConcurrent(dedup, 2), nil

logicalplan/coalesce.go

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright (c) The Thanos Community Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package logicalplan
5+
6+
import (
7+
"github.com/prometheus/prometheus/promql/parser"
8+
"github.com/prometheus/prometheus/promql/parser/posrange"
9+
"github.com/prometheus/prometheus/util/annotations"
10+
11+
"github.com/thanos-io/promql-engine/query"
12+
)
13+
14+
type Coalesce struct {
15+
// We assume to always have at least one expression
16+
Exprs []parser.Expr
17+
}
18+
19+
func (c Coalesce) String() string {
20+
return c.Exprs[0].String()
21+
}
22+
23+
func (c Coalesce) Pretty(level int) string { return c.String() }
24+
25+
func (c Coalesce) PositionRange() posrange.PositionRange { return c.Exprs[0].PositionRange() }
26+
27+
func (c Coalesce) Type() parser.ValueType { return c.Exprs[0].Type() }
28+
29+
func (c Coalesce) PromQLExpr() {}
30+
31+
type CoalesceOptimizer struct{}
32+
33+
func (c CoalesceOptimizer) Optimize(expr parser.Expr, opts *query.Options) (parser.Expr, annotations.Annotations) {
34+
numShards := opts.NumShards()
35+
36+
TraverseBottomUp(nil, &expr, func(parent, e *parser.Expr) bool {
37+
switch t := (*e).(type) {
38+
case *VectorSelector:
39+
if parent != nil {
40+
// we coalesce matrix selectors in a different branch
41+
if _, ok := (*parent).(MatrixSelector); ok {
42+
return false
43+
}
44+
// timestamp/absent is a weird function and those workarounds are for it
45+
if _, ok := (*parent).(*parser.StepInvariantExpr); ok {
46+
return false
47+
}
48+
if c, ok := (*parent).(*parser.Call); ok {
49+
if c.Func.Name == "absent" || c.Func.Name == "timestamp" {
50+
return true
51+
}
52+
}
53+
}
54+
exprs := make([]parser.Expr, numShards)
55+
for i := 0; i < numShards; i++ {
56+
exprs[i] = &VectorSelector{
57+
VectorSelector: t.VectorSelector,
58+
Filters: t.Filters,
59+
BatchSize: t.BatchSize,
60+
Shard: i,
61+
NumShards: numShards,
62+
}
63+
}
64+
*e = Coalesce{Exprs: exprs}
65+
return true
66+
case *parser.Call:
67+
var (
68+
ms *MatrixSelector
69+
marg int
70+
)
71+
for i := range t.Args {
72+
if arg, ok := t.Args[i].(*MatrixSelector); ok {
73+
ms = arg
74+
marg = i
75+
}
76+
}
77+
if ms == nil {
78+
return false
79+
}
80+
81+
exprs := make([]parser.Expr, numShards)
82+
for i := 0; i < numShards; i++ {
83+
aux := &MatrixSelector{
84+
MatrixSelector: ms.MatrixSelector,
85+
OriginalString: ms.OriginalString,
86+
Shard: i,
87+
NumShards: numShards,
88+
}
89+
f := &parser.Call{
90+
Func: t.Func,
91+
Args: t.Args,
92+
PosRange: t.PosRange,
93+
}
94+
f.Args[marg] = aux
95+
96+
exprs[i] = f
97+
}
98+
*e = Coalesce{Exprs: exprs}
99+
default:
100+
return true
101+
}
102+
return true
103+
})
104+
return expr, nil
105+
}

logicalplan/plan.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ var (
2828
var DefaultOptimizers = []Optimizer{
2929
SortMatchers{},
3030
MergeSelectsOptimizer{},
31+
CoalesceOptimizer{},
3132
}
3233

3334
type Plan interface {
@@ -183,9 +184,9 @@ func replaceSelectors(plan parser.Expr) parser.Expr {
183184
traverse(&plan, func(current *parser.Expr) {
184185
switch t := (*current).(type) {
185186
case *parser.MatrixSelector:
186-
*current = &MatrixSelector{MatrixSelector: t, OriginalString: t.String()}
187+
*current = &MatrixSelector{MatrixSelector: t, OriginalString: t.String(), Shard: 0, NumShards: 1}
187188
case *parser.VectorSelector:
188-
*current = &VectorSelector{VectorSelector: t}
189+
*current = &VectorSelector{VectorSelector: t, Shard: 0, NumShards: 1}
189190
}
190191
})
191192
return plan
@@ -414,6 +415,9 @@ type VectorSelector struct {
414415
*parser.VectorSelector
415416
Filters []*labels.Matcher
416417
BatchSize int64
418+
419+
Shard int
420+
NumShards int
417421
}
418422

419423
func (f VectorSelector) String() string {
@@ -435,6 +439,9 @@ type MatrixSelector struct {
435439

436440
// Needed because this operator is used in the distributed mode
437441
OriginalString string
442+
443+
Shard int
444+
NumShards int
438445
}
439446

440447
func (f MatrixSelector) String() string {

logicalplan/plan_test.go

-6
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ var closedParenthesis = regexp.MustCompile(`\s+\)`)
3232
// by testMatrixSelector that has a overridden string method?
3333
func renderExprTree(expr parser.Expr) string {
3434
switch t := expr.(type) {
35-
case *parser.NumberLiteral:
36-
return fmt.Sprint(t.Val)
3735
case *VectorSelector:
3836
var b strings.Builder
3937
base := t.VectorSelector.String()
@@ -49,8 +47,6 @@ func renderExprTree(expr parser.Expr) string {
4947
return b.String()
5048
}
5149
return base
52-
case *MatrixSelector:
53-
return t.String()
5450
case *parser.BinaryExpr:
5551
var b strings.Builder
5652
b.WriteString(renderExprTree(t.LHS))
@@ -105,8 +101,6 @@ func renderExprTree(expr parser.Expr) string {
105101
b.WriteString(renderExprTree(t.Expr))
106102
b.WriteRune(')')
107103
return b.String()
108-
case *parser.StepInvariantExpr:
109-
return renderExprTree(t.Expr)
110104
default:
111105
return t.String()
112106
}

query/options.go

+9
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package query
55

66
import (
77
"context"
8+
"runtime"
89
"time"
910

1011
"github.com/prometheus/prometheus/promql/parser"
@@ -36,6 +37,14 @@ func (o *Options) NumSteps() int {
3637
return int(totalSteps)
3738
}
3839

40+
func (o *Options) NumShards() int {
41+
numShards := runtime.GOMAXPROCS(0) / 2
42+
if numShards < 1 {
43+
numShards = 1
44+
}
45+
return numShards
46+
}
47+
3948
func (o *Options) IsInstantQuery() bool {
4049
return o.NumSteps() == 1
4150
}

0 commit comments

Comments
 (0)