Skip to content

Commit 4c3bd49

Browse files
execution,plan: move timestamp handling to plan (#397)
* move timestamp handling to plan Signed-off-by: Michael Hoffmann <[email protected]>
1 parent d3b776e commit 4c3bd49

File tree

3 files changed

+41
-33
lines changed

3 files changed

+41
-33
lines changed

engine/distributed_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ func TestDistributedAggregations(t *testing.T) {
244244
query: `max_over_time(min_over_time(sum(bar)[15s:15s])[15s:15s])`,
245245
},
246246
{name: "subquery over distributed binary expression", query: `max_over_time((bar/bar)[30s:15s])`},
247+
{name: "timestamp", query: `timestamp(bar)`},
248+
{name: "timestamp - step invariant", query: `timestamp(bar @ 6000)`},
247249
}
248250

249251
lookbackDeltas := []time.Duration{0, 30 * time.Second, 5 * time.Minute}

execution/execution.go

+4-29
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,12 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
6060
case *parser.NumberLiteral:
6161
return scan.NewNumberLiteralSelector(model.NewVectorPool(opts.StepsBatch), opts, e.Val), nil
6262
case *logicalplan.VectorSelector:
63-
op, err := newVectorSelector(e, storage, opts, hints)
64-
if err != nil {
65-
return nil, err
66-
}
63+
op := newVectorSelector(e, storage, opts, hints)
6764
if e.SelectTimestamp {
6865
// we will drop the __name__ label here, so we need to check for duplicate labels
6966
return exchange.NewDuplicateLabelCheck(op, opts), nil
7067
}
71-
return op, err
68+
return op, nil
7269
case *parser.Call:
7370
op, err := newCall(e, storage, opts, hints)
7471
if err != nil {
@@ -110,7 +107,7 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
110107
}
111108
}
112109

113-
func newVectorSelector(e *logicalplan.VectorSelector, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
110+
func newVectorSelector(e *logicalplan.VectorSelector, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) model.VectorOperator {
114111
start, end := getTimeRangesForVectorSelector(e, opts, 0)
115112
hints.Start = start
116113
hints.End = end
@@ -132,7 +129,7 @@ func newVectorSelector(e *logicalplan.VectorSelector, storage *engstore.Selector
132129
operators = append(operators, exchange.NewConcurrent(operator, 2, opts))
133130
}
134131

135-
return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, batchsize*int64(numShards), operators...), nil
132+
return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, batchsize*int64(numShards), operators...)
136133
}
137134

138135
func newCall(e *parser.Call, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
@@ -143,28 +140,6 @@ func newCall(e *parser.Call, storage *engstore.SelectorPool, opts *query.Options
143140
if e.Func.Name == "absent_over_time" {
144141
return newAbsentOverTimeOperator(e, storage, opts, hints)
145142
}
146-
if e.Func.Name == "timestamp" {
147-
switch arg := e.Args[0].(type) {
148-
case *logicalplan.VectorSelector:
149-
arg.SelectTimestamp = true
150-
return newVectorSelector(arg, storage, opts, hints)
151-
case *parser.StepInvariantExpr:
152-
// Step invariant expressions on vector selectors need to be unwrapped so that we
153-
// can return the original timestamp rather than the step invariant one.
154-
switch vs := arg.Expr.(type) {
155-
case *logicalplan.VectorSelector:
156-
// Prometheus weirdness.
157-
if vs.Timestamp != nil {
158-
vs.OriginalOffset = 0
159-
}
160-
vs.SelectTimestamp = true
161-
return newVectorSelector(vs, storage, opts, hints)
162-
}
163-
return newInstantVectorFunction(e, storage, opts, hints)
164-
}
165-
return newInstantVectorFunction(e, storage, opts, hints)
166-
}
167-
168143
// TODO(saswatamcode): Range vector result might need new operator
169144
// before it can be non-nested. https://github.com/thanos-io/promql-engine/issues/39
170145
for i := range e.Args {

logicalplan/plan.go

+35-4
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ func New(expr parser.Expr, opts *query.Options) Plan {
4848
setOffsetForAtModifier(opts.Start.UnixMilli(), expr)
4949
setOffsetForInnerSubqueries(expr, opts)
5050

51-
// * the engine handles sorting at the presentation layer
52-
// * parens are just annoying and getting rid of them doesnt change the query
51+
// the engine handles sorting at the presentation layer
5352
expr = trimSorts(expr)
5453

5554
// replace scanners by our logical nodes
@@ -68,8 +67,10 @@ func (p *plan) Optimize(optimizers []Optimizer) (Plan, annotations.Annotations)
6867
p.expr, a = o.Optimize(p.expr, p.opts)
6968
annos.Merge(a)
7069
}
70+
// parens are just annoying and getting rid of them doesnt change the query
71+
expr := trimParens(p.expr)
7172

72-
return &plan{expr: trimParens(p.expr), opts: p.opts}, *annos
73+
return &plan{expr: expr, opts: p.opts}, *annos
7374
}
7475

7576
func (p *plan) Expr() parser.Expr {
@@ -174,7 +175,6 @@ func TraverseBottomUp(parent *parser.Expr, current *parser.Expr, transform func(
174175
}
175176
return transform(parent, current)
176177
}
177-
178178
return true
179179
}
180180

@@ -185,6 +185,23 @@ func replaceSelectors(plan parser.Expr) parser.Expr {
185185
*current = &MatrixSelector{MatrixSelector: t, OriginalString: t.String()}
186186
case *parser.VectorSelector:
187187
*current = &VectorSelector{VectorSelector: t}
188+
case *parser.Call:
189+
if t.Func.Name != "timestamp" {
190+
return
191+
}
192+
switch v := unwrapParens(t.Args[0]).(type) {
193+
case *parser.VectorSelector:
194+
*current = &VectorSelector{VectorSelector: v, SelectTimestamp: true}
195+
case *parser.StepInvariantExpr:
196+
vs, ok := unwrapParens(v.Expr).(*parser.VectorSelector)
197+
if ok {
198+
// Prometheus weirdness
199+
if vs.Timestamp != nil {
200+
vs.OriginalOffset = 0
201+
}
202+
*current = &VectorSelector{VectorSelector: vs, SelectTimestamp: true}
203+
}
204+
}
188205
}
189206
})
190207
return plan
@@ -355,6 +372,15 @@ func newStepInvariantExpr(expr parser.Expr) parser.Expr {
355372
return &parser.StepInvariantExpr{Expr: expr}
356373
}
357374

375+
func unwrapParens(expr parser.Expr) parser.Expr {
376+
switch t := expr.(type) {
377+
case *parser.ParenExpr:
378+
return unwrapParens(t.Expr)
379+
default:
380+
return t
381+
}
382+
}
383+
358384
// Copy from https://github.com/prometheus/prometheus/blob/v2.39.1/promql/engine.go#L2658.
359385
func setOffsetForAtModifier(evalTime int64, expr parser.Expr) {
360386
getOffset := func(ts *int64, originalOffset time.Duration, path []parser.Node) time.Duration {
@@ -439,6 +465,11 @@ type VectorSelector struct {
439465
}
440466

441467
func (f VectorSelector) String() string {
468+
if f.SelectTimestamp {
469+
// If we pushed down timestamp into the vector selector we need to render the proper
470+
// PromQL again.
471+
return fmt.Sprintf("timestamp(%s)", f.VectorSelector.String())
472+
}
442473
return f.VectorSelector.String()
443474
}
444475

0 commit comments

Comments
 (0)