From fc6c3fe3a4e50c88b69c23afd0ca188ea408d464 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Sun, 15 Dec 2024 15:57:52 +0000 Subject: [PATCH] engine: allow to override opts at query time --- engine/engine.go | 75 +++++++++++++++++++++--------------------------- 1 file changed, 32 insertions(+), 43 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index 6a564854..7a02734c 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -96,6 +96,14 @@ func (o Opts) getLogicalOptimizers() []logicalplan.Optimizer { return optimizers } +// QueryOpts implements promql.QueryOpts but allows to override more engine default options +type QueryOpts struct { + promql.QueryOpts + + // DecodingConcurrency can be used to override the DecodingConcurrency engine setting. + DecodingConcurrency int +} + // New creates a new query engine with the given options. The query engine will // use the storage passed in NewInstantQuery and NewRangeQuery for retrieving // data when executing queries. @@ -239,31 +247,12 @@ func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts if err != nil { return nil, err } - - if opts == nil { - opts = promql.NewPrometheusQueryOpts(false, e.lookbackDelta) - } - if opts.LookbackDelta() <= 0 { - opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta) - } - // determine sorting order before optimizers run, we do this by looking for "sort" // and "sort_desc" and optimize them away afterwards since they are only needed at // the presentation layer and not when computing the results. resultSort := newResultSort(expr) - qOpts := &query.Options{ - Start: ts, - End: ts, - Step: 0, - StepsBatch: stepsBatch, - LookbackDelta: opts.LookbackDelta(), - EnablePerStepStats: e.enablePerStepStats && opts.EnablePerStepStats(), - ExtLookbackDelta: e.extLookbackDelta, - EnableAnalysis: e.enableAnalysis, - NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn, - DecodingConcurrency: e.decodingConcurrency, - } + qOpts := e.makeQueryOpts(ts, ts, 0, opts) if qOpts.StepsBatch > 64 { return nil, ErrStepsBatchTooLarge } @@ -308,13 +297,6 @@ func (e *Engine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryabl } defer e.activeQueryTracker.Delete(idx) - if opts == nil { - opts = promql.NewPrometheusQueryOpts(false, e.lookbackDelta) - } - if opts.LookbackDelta() <= 0 { - opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta) - } - qOpts := e.makeQueryOpts(ts, ts, 0, opts) if qOpts.StepsBatch > 64 { return nil, ErrStepsBatchTooLarge @@ -371,12 +353,6 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar { return nil, errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type())) } - if opts == nil { - opts = promql.NewPrometheusQueryOpts(false, e.lookbackDelta) - } - if opts.LookbackDelta() <= 0 { - opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta) - } qOpts := e.makeQueryOpts(start, end, step, opts) if qOpts.StepsBatch > 64 { return nil, ErrStepsBatchTooLarge @@ -420,12 +396,6 @@ func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable, } defer e.activeQueryTracker.Delete(idx) - if opts == nil { - opts = promql.NewPrometheusQueryOpts(false, e.lookbackDelta) - } - if opts.LookbackDelta() <= 0 { - opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta) - } qOpts := e.makeQueryOpts(start, end, step, opts) if qOpts.StepsBatch > 64 { return nil, ErrStepsBatchTooLarge @@ -462,19 +432,38 @@ func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable, } func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duration, opts promql.QueryOpts) *query.Options { - qOpts := &query.Options{ + res := &query.Options{ Start: start, End: end, Step: step, StepsBatch: stepsBatch, - LookbackDelta: opts.LookbackDelta(), - EnablePerStepStats: e.enablePerStepStats && opts.EnablePerStepStats(), + LookbackDelta: e.lookbackDelta, + EnablePerStepStats: e.enablePerStepStats, ExtLookbackDelta: e.extLookbackDelta, EnableAnalysis: e.enableAnalysis, NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn, DecodingConcurrency: e.decodingConcurrency, } - return qOpts + if opts == nil { + return res + } + + if opts.LookbackDelta() > 0 { + res.LookbackDelta = opts.LookbackDelta() + } + if opts.EnablePerStepStats() { + res.EnablePerStepStats = opts.EnablePerStepStats() + } + + extOpts, ok := opts.(*QueryOpts) + if !ok { + return res + } + + if extOpts.DecodingConcurrency != 0 { + res.DecodingConcurrency = extOpts.DecodingConcurrency + } + return res } func (e *Engine) storageScanners(queryable storage.Queryable, qOpts *query.Options, lplan logicalplan.Plan) (engstorage.Scanners, error) {