Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions api/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,38 @@
package api

import (
"context"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
)

type Opts struct {
Ctx context.Context
Query string
Start time.Time
End time.Time
Step time.Duration
LookbackDelta time.Duration
}

type RemoteEndpoints interface {
Engines() []RemoteEngine
Engines(opts *Opts) []RemoteEngine
}

type RemoteEngine interface {
MaxT() int64
MinT() int64
LabelSets() []labels.Labels
NewRangeQuery(opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error)
NewRangeQuery(ctx context.Context, opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error)
}

type staticEndpoints struct {
engines []RemoteEngine
}

func (m staticEndpoints) Engines() []RemoteEngine {
func (m staticEndpoints) Engines(opts *Opts) []RemoteEngine {
return m.engines
}

Expand Down
22 changes: 12 additions & 10 deletions engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func BenchmarkRangeQuery(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
qry, err := engine.NewRangeQuery(tc.test.Queryable(), nil, tc.query, start, end, step)
qry, err := engine.NewRangeQuery(tc.test.Context(), tc.test.Queryable(), nil, tc.query, start, end, step)
testutil.Ok(b, err)

oldResult := qry.Exec(tc.test.Context())
Expand Down Expand Up @@ -361,7 +361,7 @@ func BenchmarkNativeHistograms(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
qry, err := engine.NewRangeQuery(test.Queryable(), nil, tc.query, start, end, step)
qry, err := engine.NewRangeQuery(test.Context(), test.Queryable(), nil, tc.query, start, end, step)
testutil.Ok(b, err)

oldResult := qry.Exec(test.Context())
Expand All @@ -375,7 +375,7 @@ func BenchmarkNativeHistograms(b *testing.B) {
for i := 0; i < b.N; i++ {
ng := engine.New(engine.Opts{EngineOpts: opts})

qry, err := ng.NewRangeQuery(test.Queryable(), nil, tc.query, start, end, step)
qry, err := ng.NewRangeQuery(test.Context(), test.Queryable(), nil, tc.query, start, end, step)
testutil.Ok(b, err)

newResult := qry.Exec(context.Background())
Expand Down Expand Up @@ -474,7 +474,7 @@ func BenchmarkInstantQuery(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
qry, err := engine.NewInstantQuery(test.Queryable(), nil, tc.query, queryTime)
qry, err := engine.NewInstantQuery(test.Context(), test.Queryable(), nil, tc.query, queryTime)
testutil.Ok(b, err)

res := qry.Exec(test.Context())
Expand All @@ -487,7 +487,7 @@ func BenchmarkInstantQuery(b *testing.B) {
b.ReportAllocs()

for i := 0; i < b.N; i++ {
qry, err := ng.NewInstantQuery(test.Queryable(), nil, tc.query, queryTime)
qry, err := ng.NewInstantQuery(test.Context(), test.Queryable(), nil, tc.query, queryTime)
testutil.Ok(b, err)

res := qry.Exec(context.Background())
Expand Down Expand Up @@ -515,10 +515,11 @@ func BenchmarkMergeSelectorsOptimizer(b *testing.B) {
EngineOpts: promql.EngineOpts{Timeout: 100 * time.Second},
}
ng := engine.New(opts)
qry, err := ng.NewRangeQuery(db, nil, query, start, end, step)
ctx := context.Background()
qry, err := ng.NewRangeQuery(ctx, db, nil, query, start, end, step)
testutil.Ok(b, err)

res := qry.Exec(context.Background())
res := qry.Exec(ctx)
testutil.Ok(b, res.Err)
}
})
Expand All @@ -527,10 +528,11 @@ func BenchmarkMergeSelectorsOptimizer(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
ng := engine.New(engine.Opts{EngineOpts: promql.EngineOpts{Timeout: 100 * time.Second}})
qry, err := ng.NewRangeQuery(db, nil, query, start, end, step)
ctx := context.Background()
qry, err := ng.NewRangeQuery(ctx, db, nil, query, start, end, step)
testutil.Ok(b, err)

res := qry.Exec(context.Background())
res := qry.Exec(ctx)
testutil.Ok(b, res.Err)
}
})
Expand All @@ -543,7 +545,7 @@ func executeRangeQuery(b *testing.B, q string, test *promql.Test, start time.Tim

func executeRangeQueryWithOpts(b *testing.B, q string, test *promql.Test, start time.Time, end time.Time, step time.Duration, opts engine.Opts) *promql.Result {
ng := engine.New(opts)
qry, err := ng.NewRangeQuery(test.Queryable(), nil, q, start, end, step)
qry, err := ng.NewRangeQuery(test.Context(), test.Queryable(), nil, q, start, end, step)
testutil.Ok(b, err)

return qry.Exec(context.Background())
Expand Down
14 changes: 8 additions & 6 deletions engine/distributed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,15 @@ func TestDistributedAggregations(t *testing.T) {
distEngine := engine.NewDistributedEngine(distOpts,
api.NewStaticEndpoints(remoteEngines),
)
distQry, err := distEngine.NewInstantQuery(completeSeriesSet, queryOpts, query.query, instantTS)
ctx := context.Background()
distQry, err := distEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS)
testutil.Ok(t, err)

distResult := distQry.Exec(context.Background())
promEngine := promql.NewEngine(localOpts.EngineOpts)
promQry, err := promEngine.NewInstantQuery(completeSeriesSet, queryOpts, query.query, instantTS)
promQry, err := promEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS)
testutil.Ok(t, err)
promResult := promQry.Exec(context.Background())
promResult := promQry.Exec(ctx)

roundValues(promResult)
roundValues(distResult)
Expand All @@ -278,14 +279,15 @@ func TestDistributedAggregations(t *testing.T) {
distEngine := engine.NewDistributedEngine(distOpts,
api.NewStaticEndpoints(remoteEngines),
)
distQry, err := distEngine.NewRangeQuery(completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep)
ctx := context.Background()
distQry, err := distEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep)
testutil.Ok(t, err)

distResult := distQry.Exec(context.Background())
promEngine := promql.NewEngine(localOpts.EngineOpts)
promQry, err := promEngine.NewRangeQuery(completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep)
promQry, err := promEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep)
testutil.Ok(t, err)
promResult := promQry.Exec(context.Background())
promResult := promQry.Exec(ctx)

roundValues(promResult)
roundValues(distResult)
Expand Down
24 changes: 14 additions & 10 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func (l remoteEngine) LabelSets() []labels.Labels {
return l.labelSets
}

func (l remoteEngine) NewRangeQuery(opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
return l.engine.NewRangeQuery(l.q, opts, qs, start, end, interval)
func (l remoteEngine) NewRangeQuery(ctx context.Context, opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
return l.engine.NewRangeQuery(ctx, l.q, opts, qs, start, end, interval)
}

type distributedEngine struct {
Expand All @@ -140,22 +140,22 @@ func NewDistributedEngine(opts Opts, endpoints api.RemoteEndpoints) v1.QueryEngi

func (l distributedEngine) SetQueryLogger(log promql.QueryLogger) {}

func (l distributedEngine) NewInstantQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
func (l distributedEngine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
// Some clients might only support second precision when executing queries.
ts = ts.Truncate(time.Second)

return l.remoteEngine.NewInstantQuery(q, opts, qs, ts)
return l.remoteEngine.NewInstantQuery(ctx, q, opts, qs, ts)
}

func (l distributedEngine) NewRangeQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
func (l distributedEngine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
// Some clients might only support second precision when executing queries.
start = start.Truncate(time.Second)
end = end.Truncate(time.Second)
interval = interval.Truncate(time.Second)

return l.remoteEngine.NewRangeQuery(q, opts, qs, start, end, interval)
return l.remoteEngine.NewRangeQuery(ctx, q, opts, qs, start, end, interval)
}

func New(opts Opts) *compatibilityEngine {
Expand Down Expand Up @@ -236,7 +236,7 @@ func (e *compatibilityEngine) SetQueryLogger(l promql.QueryLogger) {
e.prom.SetQueryLogger(l)
}

func (e *compatibilityEngine) NewInstantQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
func (e *compatibilityEngine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
expr, err := parser.ParseExpr(qs)
if err != nil {
return nil, err
Expand All @@ -256,6 +256,8 @@ func (e *compatibilityEngine) NewInstantQuery(q storage.Queryable, opts *promql.
resultSort := newResultSort(expr)

lplan := logicalplan.New(expr, &logicalplan.Opts{
Ctx: ctx,
Query: qs,
Start: ts,
End: ts,
Step: 1,
Expand All @@ -266,7 +268,7 @@ func (e *compatibilityEngine) NewInstantQuery(q storage.Queryable, opts *promql.
exec, err := execution.New(lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta, e.extLookbackDelta)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewInstantQuery(q, opts, qs, ts)
return e.prom.NewInstantQuery(ctx, q, opts, qs, ts)
}
e.metrics.queries.WithLabelValues("false").Inc()
if err != nil {
Expand All @@ -287,7 +289,7 @@ func (e *compatibilityEngine) NewInstantQuery(q storage.Queryable, opts *promql.
}, nil
}

func (e *compatibilityEngine) NewRangeQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
func (e *compatibilityEngine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts *promql.QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
expr, err := parser.ParseExpr(qs)
if err != nil {
return nil, err
Expand All @@ -307,6 +309,8 @@ func (e *compatibilityEngine) NewRangeQuery(q storage.Queryable, opts *promql.Qu
}

lplan := logicalplan.New(expr, &logicalplan.Opts{
Ctx: ctx,
Query: qs,
Start: start,
End: end,
Step: step,
Expand All @@ -317,7 +321,7 @@ func (e *compatibilityEngine) NewRangeQuery(q storage.Queryable, opts *promql.Qu
exec, err := execution.New(lplan.Expr(), q, start, end, step, opts.LookbackDelta, e.extLookbackDelta)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewRangeQuery(q, opts, qs, start, end, step)
return e.prom.NewRangeQuery(ctx, q, opts, qs, start, end, step)
}
e.metrics.queries.WithLabelValues("false").Inc()
if err != nil {
Expand Down
Loading