Skip to content

Commit c474664

Browse files
committed
plan,execution: make coalesce a logical node
Signed-off-by: Michael Hoffmann <[email protected]>
1 parent 4c3bd49 commit c474664

File tree

5 files changed

+160
-50
lines changed

5 files changed

+160
-50
lines changed

execution/execution.go

+36-40
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package execution
1818

1919
import (
20-
"runtime"
2120
"sort"
2221
"time"
2322

@@ -94,6 +93,8 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
9493
return exchange.NewDuplicateLabelCheck(op, opts), nil
9594
case *parser.StepInvariantExpr:
9695
return newStepInvariantExpression(e, storage, opts, hints)
96+
case logicalplan.Coalesce:
97+
return newCoalesce(e, storage, opts, hints)
9798
case logicalplan.Deduplicate:
9899
return newDeduplication(e, storage, opts, hints)
99100
case logicalplan.RemoteExecution:
@@ -117,19 +118,10 @@ func newVectorSelector(e *logicalplan.VectorSelector, storage *engstore.Selector
117118
selector := storage.GetFilteredSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, e.Filters, hints)
118119
selectTimestamp := e.SelectTimestamp
119120

120-
numShards := runtime.GOMAXPROCS(0) / 2
121-
if numShards < 1 {
122-
numShards = 1
123-
}
124-
125-
operators := make([]model.VectorOperator, 0, numShards)
126-
for i := 0; i < numShards; i++ {
127-
operator := scan.NewVectorSelector(
128-
model.NewVectorPool(opts.StepsBatch), selector, opts, offset, hints, batchsize, selectTimestamp, i, numShards)
129-
operators = append(operators, exchange.NewConcurrent(operator, 2, opts))
130-
}
121+
shard := e.Shard
122+
numShards := e.NumShards
131123

132-
return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, batchsize*int64(numShards), operators...)
124+
return scan.NewVectorSelector(model.NewVectorPool(opts.StepsBatch), selector, opts, offset, hints, batchsize, selectTimestamp, shard, numShards)
133125
}
134126

135127
func newCall(e *parser.Call, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
@@ -148,6 +140,8 @@ func newCall(e *parser.Call, storage *engstore.SelectorPool, opts *query.Options
148140
return newSubqueryFunction(e, t, storage, opts, hints)
149141
case *logicalplan.MatrixSelector:
150142
return newRangeVectorFunction(e, t, storage, opts, hints)
143+
case logicalplan.Coalesce:
144+
return newRangeVectorFunction(e, t, storage, opts, hints)
151145
}
152146
}
153147
return newInstantVectorFunction(e, storage, opts, hints)
@@ -193,7 +187,7 @@ func newAbsentOverTimeOperator(call *parser.Call, storage *engstore.SelectorPool
193187
}
194188
}
195189

196-
func newRangeVectorFunction(e *parser.Call, t *logicalplan.MatrixSelector, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
190+
func newRangeVectorFunction(e *parser.Call, t parser.Expr, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
197191
// TODO(saswatamcode): Range vector result might need new operator
198192
// before it can be non-nested. https://github.com/thanos-io/promql-engine/issues/39
199193
batchSize, vs, filters, err := unpackVectorSelector(t)
@@ -212,10 +206,6 @@ func newRangeVectorFunction(e *parser.Call, t *logicalplan.MatrixSelector, stora
212206
hints.Range = milliSecondRange
213207
filter := storage.GetFilteredSelector(start, end, opts.Step.Milliseconds(), vs.LabelMatchers, filters, hints)
214208

215-
numShards := runtime.GOMAXPROCS(0) / 2
216-
if numShards < 1 {
217-
numShards = 1
218-
}
219209
var arg float64
220210
if e.Func.Name == "quantile_over_time" {
221211
constVal, err := unwrapConstVal(e.Args[0])
@@ -224,28 +214,21 @@ func newRangeVectorFunction(e *parser.Call, t *logicalplan.MatrixSelector, stora
224214
}
225215
arg = constVal
226216
}
227-
228-
operators := make([]model.VectorOperator, 0, numShards)
229-
for i := 0; i < numShards; i++ {
230-
operator, err := scan.NewMatrixSelector(
231-
model.NewVectorPool(opts.StepsBatch),
232-
filter,
233-
e.Func.Name,
234-
arg,
235-
opts,
236-
t.Range,
237-
vs.Offset,
238-
batchSize,
239-
i,
240-
numShards,
241-
)
242-
if err != nil {
243-
return nil, err
244-
}
245-
operators = append(operators, exchange.NewConcurrent(operator, 2, opts))
246-
}
247-
248-
return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, batchSize*int64(numShards), operators...), nil
217+
shard := t.Shard
218+
numShards := t.NumShards
219+
220+
return scan.NewMatrixSelector(
221+
model.NewVectorPool(opts.StepsBatch),
222+
filter,
223+
e.Func.Name,
224+
arg,
225+
opts,
226+
t.Range,
227+
vs.Offset,
228+
batchSize,
229+
shard,
230+
numShards,
231+
)
249232
}
250233

251234
func newSubqueryFunction(e *parser.Call, t *parser.SubqueryExpr, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
@@ -394,6 +377,18 @@ func newStepInvariantExpression(e *parser.StepInvariantExpr, storage *engstore.S
394377
return step_invariant.NewStepInvariantOperator(model.NewVectorPoolWithSize(opts.StepsBatch, 1), next, e.Expr, opts)
395378
}
396379

380+
func newCoalesce(e logicalplan.Coalesce, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
381+
operators := make([]model.VectorOperator, len(e.Exprs))
382+
for i, expr := range e.Exprs {
383+
operator, err := newOperator(expr, storage, opts, hints)
384+
if err != nil {
385+
return nil, err
386+
}
387+
operators[i] = exchange.NewConcurrent(operator, 2, opts)
388+
}
389+
return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, 0, operators...), nil
390+
}
391+
397392
func newDeduplication(e logicalplan.Deduplicate, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
398393
// The Deduplicate operator will deduplicate samples using a last-sample-wins strategy.
399394
// Sorting engines by MaxT ensures that samples produced due to
@@ -411,6 +406,7 @@ func newDeduplication(e logicalplan.Deduplicate, storage *engstore.SelectorPool,
411406
}
412407
operators[i] = operator
413408
}
409+
// We dont need to use logical coalesce here since it was already pushed back above remote evaluation here
414410
coalesce := exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, 0, operators...)
415411
dedup := exchange.NewDedupOperator(model.NewVectorPool(opts.StepsBatch), coalesce, opts)
416412
return exchange.NewConcurrent(dedup, 2, opts), nil

logicalplan/coalesce.go

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright (c) The Thanos Community Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package logicalplan
5+
6+
import (
7+
"github.com/prometheus/prometheus/promql/parser"
8+
"github.com/prometheus/prometheus/promql/parser/posrange"
9+
"github.com/prometheus/prometheus/util/annotations"
10+
11+
"github.com/thanos-io/promql-engine/query"
12+
)
13+
14+
type Coalesce struct {
15+
// We assume to always have at least one expression
16+
Exprs []parser.Expr
17+
}
18+
19+
func (c Coalesce) String() string {
20+
return c.Exprs[0].String()
21+
}
22+
23+
func (c Coalesce) Pretty(level int) string { return c.String() }
24+
25+
func (c Coalesce) PositionRange() posrange.PositionRange { return c.Exprs[0].PositionRange() }
26+
27+
func (c Coalesce) Type() parser.ValueType { return c.Exprs[0].Type() }
28+
29+
func (c Coalesce) PromQLExpr() {}
30+
31+
type CoalesceOptimizer struct{}
32+
33+
func (c CoalesceOptimizer) Optimize(expr parser.Expr, opts *query.Options) (parser.Expr, annotations.Annotations) {
34+
numShards := opts.NumShards()
35+
36+
TraverseBottomUp(nil, &expr, func(parent, e *parser.Expr) bool {
37+
switch t := (*e).(type) {
38+
case *VectorSelector:
39+
if parent != nil {
40+
// we coalesce matrix selectors in a different branch
41+
if _, ok := (*parent).(MatrixSelector); ok {
42+
return false
43+
}
44+
}
45+
exprs := make([]parser.Expr, numShards)
46+
for i := 0; i < numShards; i++ {
47+
exprs[i] = &VectorSelector{
48+
VectorSelector: t.VectorSelector,
49+
Filters: t.Filters,
50+
BatchSize: t.BatchSize,
51+
SelectTimestamp: t.SelectTimestamp,
52+
Shard: i,
53+
NumShards: numShards,
54+
}
55+
}
56+
*e = Coalesce{Exprs: exprs}
57+
return true
58+
case *MatrixSelector:
59+
// handled in *parser.Call branch
60+
return false
61+
case *parser.Call:
62+
// non-recursively handled in execution.go
63+
if t.Func.Name == "absent_over_time" {
64+
return true
65+
}
66+
var (
67+
ms *MatrixSelector
68+
marg int
69+
)
70+
for i := range t.Args {
71+
if arg, ok := t.Args[i].(*MatrixSelector); ok {
72+
ms = arg
73+
marg = i
74+
}
75+
}
76+
if ms == nil {
77+
return false
78+
}
79+
80+
exprs := make([]parser.Expr, numShards)
81+
for i := 0; i < numShards; i++ {
82+
aux := &MatrixSelector{
83+
MatrixSelector: ms.MatrixSelector,
84+
OriginalString: ms.OriginalString,
85+
Shard: i,
86+
NumShards: numShards,
87+
}
88+
f := &parser.Call{
89+
Func: t.Func,
90+
Args: t.Args,
91+
PosRange: t.PosRange,
92+
}
93+
f.Args[marg] = aux
94+
95+
exprs[i] = f
96+
}
97+
*e = Coalesce{Exprs: exprs}
98+
return true
99+
default:
100+
return true
101+
}
102+
})
103+
return expr, nil
104+
}

logicalplan/plan.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ var (
2727
var DefaultOptimizers = []Optimizer{
2828
SortMatchers{},
2929
MergeSelectsOptimizer{},
30+
CoalesceOptimizer{},
3031
}
3132

3233
type Plan interface {
@@ -182,24 +183,24 @@ func replaceSelectors(plan parser.Expr) parser.Expr {
182183
traverse(&plan, func(current *parser.Expr) {
183184
switch t := (*current).(type) {
184185
case *parser.MatrixSelector:
185-
*current = &MatrixSelector{MatrixSelector: t, OriginalString: t.String()}
186+
*current = &MatrixSelector{MatrixSelector: t, OriginalString: t.String(), Shard: 0, NumShards: 1}
186187
case *parser.VectorSelector:
187-
*current = &VectorSelector{VectorSelector: t}
188+
*current = &VectorSelector{VectorSelector: t, Shard: 0, NumShards: 1}
188189
case *parser.Call:
189190
if t.Func.Name != "timestamp" {
190191
return
191192
}
192193
switch v := unwrapParens(t.Args[0]).(type) {
193194
case *parser.VectorSelector:
194-
*current = &VectorSelector{VectorSelector: v, SelectTimestamp: true}
195+
*current = &VectorSelector{VectorSelector: v, SelectTimestamp: true, Shard: 0, NumShards: 1}
195196
case *parser.StepInvariantExpr:
196197
vs, ok := unwrapParens(v.Expr).(*parser.VectorSelector)
197198
if ok {
198199
// Prometheus weirdness
199200
if vs.Timestamp != nil {
200201
vs.OriginalOffset = 0
201202
}
202-
*current = &VectorSelector{VectorSelector: vs, SelectTimestamp: true}
203+
*current = &VectorSelector{VectorSelector: vs, SelectTimestamp: true, Shard: 0, NumShards: 1}
203204
}
204205
}
205206
}
@@ -462,6 +463,9 @@ type VectorSelector struct {
462463
Filters []*labels.Matcher
463464
BatchSize int64
464465
SelectTimestamp bool
466+
467+
Shard int
468+
NumShards int
465469
}
466470

467471
func (f VectorSelector) String() string {
@@ -488,6 +492,9 @@ type MatrixSelector struct {
488492

489493
// Needed because this operator is used in the distributed mode
490494
OriginalString string
495+
496+
Shard int
497+
NumShards int
491498
}
492499

493500
func (f MatrixSelector) String() string {

logicalplan/plan_test.go

-6
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ var closedParenthesis = regexp.MustCompile(`\s+\)`)
3232
// by testMatrixSelector that has a overridden string method?
3333
func renderExprTree(expr parser.Expr) string {
3434
switch t := expr.(type) {
35-
case *parser.NumberLiteral:
36-
return fmt.Sprint(t.Val)
3735
case *VectorSelector:
3836
var b strings.Builder
3937
base := t.VectorSelector.String()
@@ -49,8 +47,6 @@ func renderExprTree(expr parser.Expr) string {
4947
return b.String()
5048
}
5149
return base
52-
case *MatrixSelector:
53-
return t.String()
5450
case *parser.BinaryExpr:
5551
var b strings.Builder
5652
b.WriteString(renderExprTree(t.LHS))
@@ -105,8 +101,6 @@ func renderExprTree(expr parser.Expr) string {
105101
b.WriteString(renderExprTree(t.Expr))
106102
b.WriteRune(')')
107103
return b.String()
108-
case *parser.StepInvariantExpr:
109-
return renderExprTree(t.Expr)
110104
default:
111105
return t.String()
112106
}

query/options.go

+9
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package query
55

66
import (
77
"context"
8+
"runtime"
89
"time"
910

1011
"github.com/prometheus/prometheus/promql/parser"
@@ -35,6 +36,14 @@ func (o *Options) NumSteps() int {
3536
return int(totalSteps)
3637
}
3738

39+
func (o *Options) NumShards() int {
40+
numShards := runtime.GOMAXPROCS(0) / 2
41+
if numShards < 1 {
42+
numShards = 1
43+
}
44+
return numShards
45+
}
46+
3847
func (o *Options) IsInstantQuery() bool {
3948
return o.NumSteps() == 1
4049
}

0 commit comments

Comments
 (0)