Skip to content

Commit 8de1865

Browse files
committed
plan,execution: make coalesce a logical node
Signed-off-by: Michael Hoffmann <[email protected]>
1 parent c38e829 commit 8de1865

File tree

5 files changed

+153
-49
lines changed

5 files changed

+153
-49
lines changed

execution/execution.go

Lines changed: 33 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package execution
1818

1919
import (
20-
"runtime"
2120
"sort"
2221

2322
"github.com/efficientgo/core/errors"
@@ -72,6 +71,8 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
7271
return newUnaryExpression(e, storage, opts, hints)
7372
case *parser.StepInvariantExpr:
7473
return newStepInvariantExpression(e, storage, opts, hints)
74+
case logicalplan.Coalesce:
75+
return newCoalesce(e, storage, opts, hints)
7576
case logicalplan.Deduplicate:
7677
return newDeduplication(e, storage, opts, hints)
7778
case logicalplan.RemoteExecution:
@@ -94,21 +95,10 @@ func newVectorSelector(e *logicalplan.VectorSelector, storage *engstore.Selector
9495
batchsize := e.BatchSize
9596
selector := storage.GetFilteredSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, e.Filters, hints)
9697

97-
numShards := runtime.GOMAXPROCS(0) / 2
98-
if numShards < 1 {
99-
numShards = 1
100-
}
101-
102-
operators := make([]model.VectorOperator, 0, numShards)
103-
for i := 0; i < numShards; i++ {
104-
operator := exchange.NewConcurrent(
105-
scan.NewVectorSelector(
106-
model.NewVectorPool(opts.StepsBatch), selector, opts, offset, hints, batchsize, i, numShards),
107-
2)
108-
operators = append(operators, operator)
109-
}
98+
shard := e.Shard
99+
numShards := e.NumShards
110100

111-
return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, batchsize*int64(numShards), operators...), nil
101+
return scan.NewVectorSelector(model.NewVectorPool(opts.StepsBatch), selector, opts, offset, hints, batchsize, shard, numShards), nil
112102
}
113103

114104
func newCall(e *parser.Call, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
@@ -231,10 +221,6 @@ func newRangeVectorFunction(e *parser.Call, t *logicalplan.MatrixSelector, stora
231221
hints.Range = milliSecondRange
232222
filter := storage.GetFilteredSelector(start, end, opts.Step.Milliseconds(), vs.LabelMatchers, filters, hints)
233223

234-
numShards := runtime.GOMAXPROCS(0) / 2
235-
if numShards < 1 {
236-
numShards = 1
237-
}
238224
var arg float64
239225
if e.Func.Name == "quantile_over_time" {
240226
constVal, err := unwrapConstVal(e.Args[0])
@@ -243,28 +229,21 @@ func newRangeVectorFunction(e *parser.Call, t *logicalplan.MatrixSelector, stora
243229
}
244230
arg = constVal
245231
}
246-
247-
operators := make([]model.VectorOperator, 0, numShards)
248-
for i := 0; i < numShards; i++ {
249-
operator, err := scan.NewMatrixSelector(
250-
model.NewVectorPool(opts.StepsBatch),
251-
filter,
252-
e.Func.Name,
253-
arg,
254-
opts,
255-
t.Range,
256-
vs.Offset,
257-
batchSize,
258-
i,
259-
numShards,
260-
)
261-
if err != nil {
262-
return nil, err
263-
}
264-
operators = append(operators, exchange.NewConcurrent(operator, 2))
265-
}
266-
267-
return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, batchSize*int64(numShards), operators...), nil
232+
shard := t.Shard
233+
numShards := t.NumShards
234+
235+
return scan.NewMatrixSelector(
236+
model.NewVectorPool(opts.StepsBatch),
237+
filter,
238+
e.Func.Name,
239+
arg,
240+
opts,
241+
t.Range,
242+
vs.Offset,
243+
batchSize,
244+
shard,
245+
numShards,
246+
)
268247
}
269248

270249
func newSubqueryFunction(e *parser.Call, t *parser.SubqueryExpr, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
@@ -407,6 +386,18 @@ func newStepInvariantExpression(e *parser.StepInvariantExpr, storage *engstore.S
407386
return step_invariant.NewStepInvariantOperator(model.NewVectorPoolWithSize(opts.StepsBatch, 1), next, e.Expr, opts)
408387
}
409388

389+
func newCoalesce(e logicalplan.Coalesce, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
390+
operators := make([]model.VectorOperator, len(e.Exprs))
391+
for i, expr := range e.Exprs {
392+
operator, err := newOperator(expr, storage, opts, hints)
393+
if err != nil {
394+
return nil, err
395+
}
396+
operators[i] = exchange.NewConcurrent(operator, 2)
397+
}
398+
return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, 0, operators...), nil
399+
}
400+
410401
func newDeduplication(e logicalplan.Deduplicate, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
411402
// The Deduplicate operator will deduplicate samples using a last-sample-wins strategy.
412403
// Sorting engines by MaxT ensures that samples produced due to
@@ -424,6 +415,7 @@ func newDeduplication(e logicalplan.Deduplicate, storage *engstore.SelectorPool,
424415
}
425416
operators[i] = operator
426417
}
418+
// We don't need to use a logical coalesce here since it was already pushed above the remote evaluation.
427419
coalesce := exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, 0, operators...)
428420
dedup := exchange.NewDedupOperator(model.NewVectorPool(opts.StepsBatch), coalesce)
429421
return exchange.NewConcurrent(dedup, 2), nil

logicalplan/coalesce.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package logicalplan
2+
3+
import (
4+
"github.com/prometheus/prometheus/promql/parser"
5+
"github.com/prometheus/prometheus/promql/parser/posrange"
6+
"github.com/prometheus/prometheus/util/annotations"
7+
8+
"github.com/thanos-io/promql-engine/query"
9+
)
10+
11+
type Coalesce struct {
12+
// We assume to always have at least one expression
13+
Exprs []parser.Expr
14+
}
15+
16+
func (c Coalesce) String() string {
17+
return c.Exprs[0].String()
18+
}
19+
20+
func (c Coalesce) Pretty(level int) string { return c.String() }
21+
22+
func (c Coalesce) PositionRange() posrange.PositionRange { return c.Exprs[0].PositionRange() }
23+
24+
func (c Coalesce) Type() parser.ValueType { return c.Exprs[0].Type() }
25+
26+
func (c Coalesce) PromQLExpr() {}
27+
28+
type CoalesceOptimizer struct{}
29+
30+
func (c CoalesceOptimizer) Optimize(expr parser.Expr, opts *query.Options) (parser.Expr, annotations.Annotations) {
31+
numShards := opts.NumShards()
32+
33+
TraverseBottomUp(nil, &expr, func(parent, e *parser.Expr) bool {
34+
switch t := (*e).(type) {
35+
case *VectorSelector:
36+
if parent != nil {
37+
// we coalesce matrix selectors in a different branch
38+
if _, ok := (*parent).(MatrixSelector); ok {
39+
return false
40+
}
41+
// timestamp/absent is a weird function and those workarounds are for it
42+
if _, ok := (*parent).(*parser.StepInvariantExpr); ok {
43+
return false
44+
}
45+
if c, ok := (*parent).(*parser.Call); ok {
46+
if c.Func.Name == "absent" || c.Func.Name == "timestamp" {
47+
return true
48+
}
49+
}
50+
}
51+
exprs := make([]parser.Expr, numShards)
52+
for i := 0; i < numShards; i++ {
53+
exprs[i] = &VectorSelector{
54+
VectorSelector: t.VectorSelector,
55+
Filters: t.Filters,
56+
BatchSize: t.BatchSize,
57+
Shard: i,
58+
NumShards: numShards,
59+
}
60+
}
61+
*e = Coalesce{Exprs: exprs}
62+
return true
63+
case *parser.Call:
64+
var (
65+
ms *MatrixSelector
66+
marg int
67+
)
68+
for i := range t.Args {
69+
if arg, ok := t.Args[i].(*MatrixSelector); ok {
70+
ms = arg
71+
marg = i
72+
}
73+
}
74+
if ms == nil {
75+
return false
76+
}
77+
78+
exprs := make([]parser.Expr, numShards)
79+
for i := 0; i < numShards; i++ {
80+
aux := &MatrixSelector{
81+
MatrixSelector: ms.MatrixSelector,
82+
OriginalString: ms.OriginalString,
83+
Shard: i,
84+
NumShards: numShards,
85+
}
86+
f := &parser.Call{
87+
Func: t.Func,
88+
Args: t.Args,
89+
PosRange: t.PosRange,
90+
}
91+
f.Args[marg] = aux
92+
93+
exprs[i] = f
94+
}
95+
*e = Coalesce{Exprs: exprs}
96+
default:
97+
return true
98+
}
99+
return true
100+
})
101+
return expr, nil
102+
}

logicalplan/plan.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ var (
2828
var DefaultOptimizers = []Optimizer{
2929
SortMatchers{},
3030
MergeSelectsOptimizer{},
31+
CoalesceOptimizer{},
3132
}
3233

3334
type Plan interface {
@@ -183,9 +184,9 @@ func replaceSelectors(plan parser.Expr) parser.Expr {
183184
traverse(&plan, func(current *parser.Expr) {
184185
switch t := (*current).(type) {
185186
case *parser.MatrixSelector:
186-
*current = &MatrixSelector{MatrixSelector: t, OriginalString: t.String()}
187+
*current = &MatrixSelector{MatrixSelector: t, OriginalString: t.String(), Shard: 0, NumShards: 1}
187188
case *parser.VectorSelector:
188-
*current = &VectorSelector{VectorSelector: t}
189+
*current = &VectorSelector{VectorSelector: t, Shard: 0, NumShards: 1}
189190
}
190191
})
191192
return plan
@@ -414,6 +415,9 @@ type VectorSelector struct {
414415
*parser.VectorSelector
415416
Filters []*labels.Matcher
416417
BatchSize int64
418+
419+
Shard int
420+
NumShards int
417421
}
418422

419423
func (f VectorSelector) String() string {
@@ -435,6 +439,9 @@ type MatrixSelector struct {
435439

436440
// Needed because this operator is used in the distributed mode
437441
OriginalString string
442+
443+
Shard int
444+
NumShards int
438445
}
439446

440447
func (f MatrixSelector) String() string {

logicalplan/plan_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ var closedParenthesis = regexp.MustCompile(`\s+\)`)
3232
// by testMatrixSelector that has a overridden string method?
3333
func renderExprTree(expr parser.Expr) string {
3434
switch t := expr.(type) {
35-
case *parser.NumberLiteral:
36-
return fmt.Sprint(t.Val)
3735
case *VectorSelector:
3836
var b strings.Builder
3937
base := t.VectorSelector.String()
@@ -49,8 +47,6 @@ func renderExprTree(expr parser.Expr) string {
4947
return b.String()
5048
}
5149
return base
52-
case *MatrixSelector:
53-
return t.String()
5450
case *parser.BinaryExpr:
5551
var b strings.Builder
5652
b.WriteString(renderExprTree(t.LHS))
@@ -105,8 +101,6 @@ func renderExprTree(expr parser.Expr) string {
105101
b.WriteString(renderExprTree(t.Expr))
106102
b.WriteRune(')')
107103
return b.String()
108-
case *parser.StepInvariantExpr:
109-
return renderExprTree(t.Expr)
110104
default:
111105
return t.String()
112106
}

query/options.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package query
55

66
import (
77
"context"
8+
"runtime"
89
"time"
910

1011
"github.com/prometheus/prometheus/promql/parser"
@@ -36,6 +37,14 @@ func (o *Options) NumSteps() int {
3637
return int(totalSteps)
3738
}
3839

40+
func (o *Options) NumShards() int {
41+
numShards := runtime.GOMAXPROCS(0) / 2
42+
if numShards < 1 {
43+
numShards = 1
44+
}
45+
return numShards
46+
}
47+
3948
// IsInstantQuery reports whether NumSteps returns exactly one step.
func (o *Options) IsInstantQuery() bool {
	steps := o.NumSteps()
	return steps == 1
}

0 commit comments

Comments
 (0)