Skip to content

Commit c38e829

Browse files
engine,execution: use logical matrix and vector (#373)
* engine,execution: use logical matrix and vector

  As the last phase of creating a new plan we replace all Matrix and Vector selectors by our logical ones. This makes it possible to not handle them twice everywhere in the codebase. It will also facilitate more general sharding of them later on since we can annotate them directly with their sharding information.

  We had to do matrix and vector selector in the same PR since the distributed engine cannot handle logical nodes well (because they generally change the String() representation of the node). We also had to change the String() representation of the logical nodes to be equivalent to the original PromQL expression they came from.

  Signed-off-by: Michael Hoffmann <[email protected]>

* plan: use custom string render method for tests

  Signed-off-by: Michael Hoffmann <[email protected]>

---------

Signed-off-by: Michael Hoffmann <[email protected]>
1 parent 7e2f0a1 commit c38e829

19 files changed

+226
-135
lines changed

engine/user_defined_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ type injectVectorSelector struct{}
6666

6767
func (i injectVectorSelector) Optimize(plan parser.Expr, opts *query.Options) (parser.Expr, annotations.Annotations) {
6868
logicalplan.TraverseBottomUp(nil, &plan, func(_, current *parser.Expr) bool {
69-
switch (*current).(type) {
70-
case *parser.VectorSelector:
69+
switch t := (*current).(type) {
70+
case *logicalplan.VectorSelector:
7171
*current = &logicalVectorSelector{
72-
VectorSelector: (*current).(*parser.VectorSelector),
72+
VectorSelector: t,
7373
}
7474
}
7575
return false
@@ -78,7 +78,7 @@ func (i injectVectorSelector) Optimize(plan parser.Expr, opts *query.Options) (p
7878
}
7979

8080
type logicalVectorSelector struct {
81-
*parser.VectorSelector
81+
*logicalplan.VectorSelector
8282
}
8383

8484
func (c logicalVectorSelector) MakeExecutionOperator(vectors *model.VectorPool, _ *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {

execution/execution.go

Lines changed: 25 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package execution
1919
import (
2020
"runtime"
2121
"sort"
22-
"time"
2322

2423
"github.com/efficientgo/core/errors"
2524
"github.com/prometheus/prometheus/model/labels"
@@ -59,8 +58,8 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
5958
switch e := expr.(type) {
6059
case *parser.NumberLiteral:
6160
return scan.NewNumberLiteralSelector(model.NewVectorPool(opts.StepsBatch), opts, e.Val), nil
62-
case *parser.VectorSelector, *logicalplan.VectorSelector:
63-
return newVectorSelector(expr, storage, opts, hints)
61+
case *logicalplan.VectorSelector:
62+
return newVectorSelector(e, storage, opts, hints)
6463
case *parser.Call:
6564
return newCall(e, storage, opts, hints)
6665
case *parser.AggregateExpr:
@@ -82,34 +81,18 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
8281
case logicalplan.UserDefinedExpr:
8382
return e.MakeExecutionOperator(model.NewVectorPool(opts.StepsBatch), storage, opts, hints)
8483
default:
85-
return nil, errors.Wrapf(parse.ErrNotSupportedExpr, "got: %s", e)
84+
return nil, errors.Wrapf(parse.ErrNotSupportedExpr, "got: %s (%T)", e, e)
8685
}
8786
}
8887

89-
func newVectorSelector(expr parser.Expr, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
90-
var (
91-
batchsize int64
92-
offset time.Duration
93-
selector engstore.SeriesSelector
94-
)
95-
switch e := expr.(type) {
96-
case *parser.VectorSelector:
97-
start, end := getTimeRangesForVectorSelector(e, opts, 0)
98-
hints.Start = start
99-
hints.End = end
100-
offset = e.Offset
101-
batchsize = 0
102-
selector = storage.GetSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, hints)
103-
case *logicalplan.VectorSelector:
104-
start, end := getTimeRangesForVectorSelector(e.VectorSelector, opts, 0)
105-
hints.Start = start
106-
hints.End = end
107-
offset = e.Offset
108-
batchsize = e.BatchSize
109-
selector = storage.GetFilteredSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, e.Filters, hints)
110-
default:
111-
return nil, errors.Wrapf(parse.ErrNotSupportedExpr, "got: %s", e)
112-
}
88+
func newVectorSelector(e *logicalplan.VectorSelector, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
89+
start, end := getTimeRangesForVectorSelector(e, opts, 0)
90+
hints.Start = start
91+
hints.End = end
92+
93+
offset := e.Offset
94+
batchsize := e.BatchSize
95+
selector := storage.GetFilteredSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, e.Filters, hints)
11396

11497
numShards := runtime.GOMAXPROCS(0) / 2
11598
if numShards < 1 {
@@ -138,19 +121,13 @@ func newCall(e *parser.Call, storage *engstore.SelectorPool, opts *query.Options
138121
}
139122
if e.Func.Name == "timestamp" {
140123
switch arg := e.Args[0].(type) {
141-
case *logicalplan.VectorSelector, *parser.VectorSelector:
124+
case *logicalplan.VectorSelector:
142125
// We push down the timestamp function into the scanner through the hints.
143-
return newVectorSelector(e.Args[0], storage, opts, hints)
126+
return newVectorSelector(arg, storage, opts, hints)
144127
case *parser.StepInvariantExpr:
145128
// Step invariant expressions on vector selectors need to be unwrapped so that we
146129
// can return the original timestamp rather than the step invariant one.
147130
switch vs := arg.Expr.(type) {
148-
case *parser.VectorSelector:
149-
// Prometheus weirdness.
150-
if vs.Timestamp != nil {
151-
vs.OriginalOffset = 0
152-
}
153-
return newVectorSelector(vs, storage, opts, hints)
154131
case *logicalplan.VectorSelector:
155132
// Prometheus weirdness.
156133
if vs.Timestamp != nil {
@@ -175,7 +152,7 @@ func newCall(e *parser.Call, storage *engstore.SelectorPool, opts *query.Options
175152
return nil, parse.ErrNotImplemented
176153
}
177154
return newSubqueryFunction(e, t, storage, opts, hints)
178-
case *parser.MatrixSelector:
155+
case *logicalplan.MatrixSelector:
179156
return newRangeVectorFunction(e, t, storage, opts, hints)
180157
}
181158
}
@@ -203,7 +180,7 @@ func newAbsentOverTimeOperator(call *parser.Call, storage *engstore.SelectorPool
203180
Args: []parser.Expr{matrixCall},
204181
}
205182
return function.NewFunctionOperator(f, []model.VectorOperator{argOp}, opts.StepsBatch, opts)
206-
case *parser.MatrixSelector:
183+
case *logicalplan.MatrixSelector:
207184
matrixCall := &parser.Call{
208185
Func: &parser.Function{Name: "last_over_time"},
209186
Args: call.Args,
@@ -221,9 +198,12 @@ func newAbsentOverTimeOperator(call *parser.Call, storage *engstore.SelectorPool
221198
vs.LabelMatchers = append(vs.LabelMatchers, filters...)
222199
f := &parser.Call{
223200
Func: &parser.Function{Name: "absent"},
224-
Args: []parser.Expr{&parser.MatrixSelector{
225-
VectorSelector: vs,
226-
Range: arg.Range,
201+
Args: []parser.Expr{&logicalplan.MatrixSelector{
202+
MatrixSelector: &parser.MatrixSelector{
203+
VectorSelector: vs,
204+
Range: arg.Range,
205+
},
206+
OriginalString: arg.String(),
227207
}},
228208
}
229209
return function.NewFunctionOperator(f, []model.VectorOperator{argOp}, opts.StepsBatch, opts)
@@ -232,7 +212,7 @@ func newAbsentOverTimeOperator(call *parser.Call, storage *engstore.SelectorPool
232212
}
233213
}
234214

235-
func newRangeVectorFunction(e *parser.Call, t *parser.MatrixSelector, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
215+
func newRangeVectorFunction(e *parser.Call, t *logicalplan.MatrixSelector, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
236216
// TODO(saswatamcode): Range vector result might need new operator
237217
// before it can be non-nested. https://github.com/thanos-io/promql-engine/issues/39
238218
batchSize, vs, filters, err := unpackVectorSelector(t)
@@ -466,7 +446,7 @@ func newRemoteExecution(e logicalplan.RemoteExecution, opts *query.Options, hint
466446
}
467447

468448
// Copy from https://github.com/prometheus/prometheus/blob/v2.39.1/promql/engine.go#L791.
469-
func getTimeRangesForVectorSelector(n *parser.VectorSelector, opts *query.Options, evalRange int64) (int64, int64) {
449+
func getTimeRangesForVectorSelector(n *logicalplan.VectorSelector, opts *query.Options, evalRange int64) (int64, int64) {
470450
start := opts.Start.UnixMilli()
471451
end := opts.End.UnixMilli()
472452
if n.Timestamp != nil {
@@ -493,12 +473,10 @@ func unwrapConstVal(e parser.Expr) (float64, error) {
493473
return 0, errors.Wrap(parse.ErrNotSupportedExpr, "matrix selector argument must be a constant")
494474
}
495475

496-
func unpackVectorSelector(t *parser.MatrixSelector) (int64, *parser.VectorSelector, []*labels.Matcher, error) {
476+
func unpackVectorSelector(t *logicalplan.MatrixSelector) (int64, *logicalplan.VectorSelector, []*labels.Matcher, error) {
497477
switch t := t.VectorSelector.(type) {
498-
case *parser.VectorSelector:
499-
return 0, t, nil, nil
500478
case *logicalplan.VectorSelector:
501-
return t.BatchSize, t.VectorSelector, t.Filters, nil
479+
return t.BatchSize, t, t.Filters, nil
502480
default:
503481
return 0, nil, nil, parse.ErrNotSupportedExpr
504482
}

execution/function/absent.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/prometheus/prometheus/promql/parser"
1414

1515
"github.com/thanos-io/promql-engine/execution/model"
16+
"github.com/thanos-io/promql-engine/logicalplan"
1617
)
1718

1819
type absentOperator struct {
@@ -49,10 +50,10 @@ func (o *absentOperator) loadSeries() {
4950
// https://github.com/prometheus/prometheus/blob/main/promql/functions.go#L1385
5051
var lm []*labels.Matcher
5152
switch n := o.funcExpr.Args[0].(type) {
52-
case *parser.VectorSelector:
53+
case *logicalplan.VectorSelector:
5354
lm = n.LabelMatchers
54-
case *parser.MatrixSelector:
55-
lm = n.VectorSelector.(*parser.VectorSelector).LabelMatchers
55+
case *logicalplan.MatrixSelector:
56+
lm = n.VectorSelector.(*logicalplan.VectorSelector).LabelMatchers
5657
default:
5758
o.series = []labels.Labels{labels.EmptyLabels()}
5859
return

execution/function/histogram.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313

1414
"github.com/cespare/xxhash/v2"
1515
"github.com/prometheus/prometheus/model/labels"
16-
1716
"github.com/prometheus/prometheus/promql/parser"
1817

1918
"github.com/thanos-io/promql-engine/execution/model"
@@ -174,8 +173,8 @@ func (o *histogramOperator) processInputSeries(vectors []model.StepVector) ([]mo
174173
out = append(out, step)
175174
o.vectorOp.GetPool().PutStepVector(vector)
176175
}
177-
178176
o.vectorOp.GetPool().PutVectors(vectors)
177+
179178
return out, nil
180179
}
181180

execution/scan/vector_selector.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,6 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) {
169169
o.currentStep += o.step * int64(o.numSteps)
170170
o.currentSeries = 0
171171
}
172-
173172
return vectors, nil
174173
}
175174

execution/step_invariant/step_invariant.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/prometheus/prometheus/promql/parser"
1515

1616
"github.com/thanos-io/promql-engine/execution/model"
17+
"github.com/thanos-io/promql-engine/logicalplan"
1718
"github.com/thanos-io/promql-engine/query"
1819
)
1920

@@ -72,7 +73,7 @@ func NewStepInvariantOperator(
7273
// We do not duplicate results for range selectors since result is a matrix
7374
// with their unique timestamps which does not depend on the step.
7475
switch expr.(type) {
75-
case *parser.MatrixSelector, *parser.SubqueryExpr:
76+
case *logicalplan.MatrixSelector, *parser.SubqueryExpr:
7677
u.cacheResult = false
7778
}
7879
u.OperatorTelemetry = &model.NoopTelemetry{}

logicalplan/distribute.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -413,9 +413,9 @@ func calculateStartOffset(expr *parser.Expr, lookbackDelta time.Duration) time.D
413413
switch n := (*node).(type) {
414414
case *parser.SubqueryExpr:
415415
selectRange += n.Range
416-
case *parser.MatrixSelector:
416+
case *MatrixSelector:
417417
selectRange += n.Range
418-
case *parser.VectorSelector:
418+
case *VectorSelector:
419419
offset = n.Offset
420420
}
421421
})
@@ -469,7 +469,7 @@ func matchesExternalLabelSet(expr parser.Expr, externalLabelSet []labels.Labels)
469469
}
470470
var selectorSet [][]*labels.Matcher
471471
traverse(&expr, func(current *parser.Expr) {
472-
vs, ok := (*current).(*parser.VectorSelector)
472+
vs, ok := (*current).(*VectorSelector)
473473
if ok {
474474
selectorSet = append(selectorSet, vs.LabelMatchers)
475475
}

logicalplan/distribute_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ sum(
551551
plan := New(expr, &query.Options{Start: tcase.queryStart, End: tcase.queryEnd, Step: time.Minute})
552552
optimizedPlan, _ := plan.Optimize(optimizers)
553553
expectedPlan := cleanUp(replacements, tcase.expected)
554-
testutil.Equals(t, expectedPlan, optimizedPlan.Expr().String())
554+
testutil.Equals(t, expectedPlan, renderExprTree(optimizedPlan.Expr()))
555555
})
556556
}
557557
}

logicalplan/filter.go

Lines changed: 0 additions & 40 deletions
This file was deleted.

logicalplan/merge_selects.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (m MergeSelectsOptimizer) Optimize(plan parser.Expr, _ *query.Options) (par
3333

3434
func extractSelectors(selectors matcherHeap, expr parser.Expr) {
3535
traverse(&expr, func(node *parser.Expr) {
36-
e, ok := (*node).(*parser.VectorSelector)
36+
e, ok := (*node).(*VectorSelector)
3737
if !ok {
3838
return
3939
}
@@ -49,8 +49,6 @@ func replaceMatchers(selectors matcherHeap, expr *parser.Expr) {
4949
traverse(expr, func(node *parser.Expr) {
5050
var matchers []*labels.Matcher
5151
switch e := (*node).(type) {
52-
case *parser.VectorSelector:
53-
matchers = e.LabelMatchers
5452
case *VectorSelector:
5553
matchers = e.LabelMatchers
5654
default:
@@ -85,12 +83,6 @@ func replaceMatchers(selectors matcherHeap, expr *parser.Expr) {
8583
}
8684

8785
switch e := (*node).(type) {
88-
case *parser.VectorSelector:
89-
e.LabelMatchers = replacement
90-
*node = &VectorSelector{
91-
VectorSelector: e,
92-
Filters: filters,
93-
}
9486
case *VectorSelector:
9587
e.LabelMatchers = replacement
9688
e.Filters = filters

logicalplan/merge_selects_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func TestMergeSelects(t *testing.T) {
4747

4848
plan := New(expr, &query.Options{})
4949
optimizedPlan, _ := plan.Optimize(optimizers)
50-
testutil.Equals(t, tcase.expected, optimizedPlan.Expr().String())
50+
testutil.Equals(t, tcase.expected, renderExprTree(optimizedPlan.Expr()))
5151
})
5252
}
5353
}

logicalplan/passthrough.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (m PassthroughOptimizer) Optimize(plan parser.Expr, opts *query.Options) (p
6363

6464
matchingLabelsEngines := make([]api.RemoteEngine, 0, len(engines))
6565
TraverseBottomUp(nil, &plan, func(parent, current *parser.Expr) (stop bool) {
66-
if vs, ok := (*current).(*parser.VectorSelector); ok {
66+
if vs, ok := (*current).(*VectorSelector); ok {
6767
for _, e := range engines {
6868
if !labelSetsMatch(vs.LabelMatchers, e.LabelSets()...) {
6969
continue

0 commit comments

Comments
 (0)