Skip to content

Commit

Permalink
Implement missing parts
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Jan 3, 2025
1 parent 5d19f58 commit c3a5bbd
Show file tree
Hide file tree
Showing 34 changed files with 545 additions and 255 deletions.
71 changes: 43 additions & 28 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,15 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine {
disableDuplicateLabelChecks: opts.DisableDuplicateLabelChecks,
disableFallback: opts.DisableFallback,

logger: opts.Logger,
lookbackDelta: opts.LookbackDelta,
enablePerStepStats: opts.EnablePerStepStats,
logicalOptimizers: opts.getLogicalOptimizers(),
timeout: opts.Timeout,
metrics: metrics,
extLookbackDelta: opts.ExtLookbackDelta,
enableAnalysis: opts.EnableAnalysis,
logger: opts.Logger,
lookbackDelta: opts.LookbackDelta,
enablePerStepStats: opts.EnablePerStepStats,
logicalOptimizers: opts.getLogicalOptimizers(),
timeout: opts.Timeout,
metrics: metrics,
extLookbackDelta: opts.ExtLookbackDelta,
enableAnalysis: opts.EnableAnalysis,
enableDelayedNameRemoval: opts.EnableDelayedNameRemoval,
noStepSubqueryIntervalFn: func(d time.Duration) time.Duration {
return time.Duration(opts.NoStepSubqueryIntervalFn(d.Milliseconds()) * 1000000)
},
Expand Down Expand Up @@ -225,6 +226,8 @@ type Engine struct {
extLookbackDelta time.Duration
decodingConcurrency int
enableAnalysis bool
enablePartialResponses bool
enableDelayedNameRemoval bool
noStepSubqueryIntervalFn func(time.Duration) time.Duration
}

Expand Down Expand Up @@ -290,14 +293,15 @@ func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts
return nil, err
}
return &compatibilityQuery{
Query: &Query{exec: exec, opts: opts},
engine: e,
plan: lplan,
ts: ts,
warns: warns,
t: InstantQuery,
resultSort: resultSort,
scanners: scanners,
Query: &Query{exec: exec, opts: opts},
engine: e,
plan: lplan,
ts: ts,
warns: warns,
t: InstantQuery,
resultSort: resultSort,
scanners: scanners,
enableDelayedNameRemoval: e.enableDelayedNameRemoval,
}, nil
}

Expand Down Expand Up @@ -350,8 +354,9 @@ func (e *Engine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryabl
warns: warns,
t: InstantQuery,
// TODO(fpetkovski): Infer the sort order from the plan, ideally without copying the newResultSort function.
resultSort: noSortResultSort{},
scanners: scnrs,
resultSort: noSortResultSort{},
scanners: scnrs,
enableDelayedNameRemoval: e.enableDelayedNameRemoval,
}, nil
}

Expand Down Expand Up @@ -404,12 +409,13 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr
}

return &compatibilityQuery{
Query: &Query{exec: exec, opts: opts},
engine: e,
plan: lplan,
warns: warns,
t: RangeQuery,
scanners: scnrs,
Query: &Query{exec: exec, opts: opts},
engine: e,
plan: lplan,
warns: warns,
t: RangeQuery,
scanners: scnrs,
enableDelayedNameRemoval: e.enableDelayedNameRemoval,
}, nil
}

Expand Down Expand Up @@ -471,6 +477,8 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
EnablePerStepStats: e.enablePerStepStats && opts.EnablePerStepStats(),
ExtLookbackDelta: e.extLookbackDelta,
EnableAnalysis: e.enableAnalysis,
EnablePartialResponses: e.enablePartialResponses,
EnableDelayedNameRemoval: e.enableDelayedNameRemoval,
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
DecodingConcurrency: e.decodingConcurrency,
}
Expand Down Expand Up @@ -517,9 +525,10 @@ type compatibilityQuery struct {
ts time.Time // Empty for range queries.
warns annotations.Annotations

t QueryType
resultSort resultSorter
cancel context.CancelFunc
t QueryType
resultSort resultSorter
cancel context.CancelFunc
enableDelayedNameRemoval bool

scanners engstorage.Scanners
}
Expand Down Expand Up @@ -561,7 +570,10 @@ func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) {

series := make([]promql.Series, len(resultSeries))
for i, s := range resultSeries {
series[i].Metric = s
if s.DropName && q.enableDelayedNameRemoval {
s.Metric = s.Metric.DropMetricName()
}
series[i] = s
}
loop:
for {
Expand Down Expand Up @@ -653,6 +665,9 @@ loop:
}
}
sort.Slice(vector, q.resultSort.comparer(&vector))
if vector.ContainsSameLabelset() {
return newErrResult(ret, extlabels.ErrDuplicateLabelSet)
}
result = vector
case parser.ValueTypeScalar:
v := math.NaN()
Expand Down
4 changes: 3 additions & 1 deletion engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,16 @@ func TestPromqlAcceptance(t *testing.T) {

engine := engine.New(engine.Opts{
EngineOpts: promql.EngineOpts{
Logger: promslog.NewNopLogger(),
EnableAtModifier: true,
EnableNegativeOffset: true,
MaxSamples: 5e10,
Timeout: 1 * time.Hour,
NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 { return 30 * time.Second.Milliseconds() },
EnableDelayedNameRemoval: true,
}})

promqltest.RunBuiltinTests(t, engine)
promqltest.RunBuiltinTests(t, engine, promqltest.WithSkipEvalInfo(true), promqltest.WithSkipEvalWarn(true))
}

func TestVectorSelectorWithGaps(t *testing.T) {
Expand Down
25 changes: 15 additions & 10 deletions engine/existing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ func TestRangeQuery(t *testing.T) {
Query: `sum_over_time(bar[30s])`,
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 10, T: 60000}, {F: 1000, T: 120000}},
Metric: labels.Labels{},
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 10, T: 60000}, {F: 1000, T: 120000}},
Metric: labels.Labels{},
DropName: true,
},
},
Start: time.Unix(0, 0),
Expand All @@ -49,8 +50,9 @@ func TestRangeQuery(t *testing.T) {
Query: `sum_over_time(bar[45s])`,
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}},
Metric: labels.Labels{},
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}},
Metric: labels.Labels{},
DropName: true,
},
},
Start: time.Unix(0, 0),
Expand All @@ -64,8 +66,9 @@ func TestRangeQuery(t *testing.T) {
Query: `sum_over_time(bar[45s])`,
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}},
Metric: labels.Labels{},
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}},
Metric: labels.Labels{},
DropName: true,
},
},
Start: time.Unix(0, 0),
Expand All @@ -79,8 +82,9 @@ func TestRangeQuery(t *testing.T) {
Query: `sum_over_time(bar[45s])`,
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}, {F: 110000, T: 180000}, {F: 11000000, T: 240000}},
Metric: labels.Labels{},
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}, {F: 110000, T: 180000}, {F: 11000000, T: 240000}},
Metric: labels.Labels{},
DropName: true,
},
},
Start: time.Unix(0, 0),
Expand All @@ -94,8 +98,9 @@ func TestRangeQuery(t *testing.T) {
Query: `sum_over_time(bar[45s])`,
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 5, T: 0}, {F: 59, T: 60000}, {F: 9, T: 120000}, {F: 956, T: 180000}},
Metric: labels.Labels{},
Floats: []promql.FPoint{{F: 5, T: 0}, {F: 59, T: 60000}, {F: 9, T: 120000}, {F: 956, T: 180000}},
Metric: labels.Labels{},
DropName: true,
},
},
Start: time.Unix(0, 0),
Expand Down
9 changes: 4 additions & 5 deletions engine/user_defined_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ load 30s

expected := promql.Matrix{
promql.Series{
Metric: labels.EmptyLabels(),
Floats: []promql.FPoint{{T: 0, F: 14}, {T: 30000, F: 14}, {T: 60000, F: 14}, {T: 90000, F: 14}},
},
}
Expand Down Expand Up @@ -122,10 +121,10 @@ func (c *vectorSelectorOperator) Next(ctx context.Context) ([]model.StepVector,
return vectors, nil
}

func (c *vectorSelectorOperator) Series(ctx context.Context) ([]labels.Labels, error) {
return []labels.Labels{
labels.FromStrings(labels.MetricName, "http_requests_total", "container", "a"),
labels.FromStrings(labels.MetricName, "http_requests_total", "container", "b"),
func (c *vectorSelectorOperator) Series(ctx context.Context) ([]promql.Series, error) {
return []promql.Series{
{Metric: labels.FromStrings(labels.MetricName, "http_requests_total", "container", "a")},
{Metric: labels.FromStrings(labels.MetricName, "http_requests_total", "container", "b")},
}, nil
}

Expand Down
50 changes: 44 additions & 6 deletions execution/aggregate/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package aggregate

import (
"errors"
"math"

"github.com/prometheus/prometheus/model/histogram"
Expand Down Expand Up @@ -70,11 +71,27 @@ func (s *sumAcc) Add(v float64, h *histogram.FloatHistogram) error {
var err error
if h.Schema >= s.histSum.Schema {
if s.histSum, err = s.histSum.Add(h); err != nil {
if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) {
// skip warning
return nil
}
if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) {
// skip warning
return nil
}
return err
}
} else {
t := h.Copy()
if _, err = t.Add(s.histSum); err != nil {
if s.histSum, err = t.Add(s.histSum); err != nil {
if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) {
// skip warning
return nil
}
if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) {
// skip warning
return nil
}
return err
}
s.histSum = t
Expand Down Expand Up @@ -389,6 +406,14 @@ func (a *avgAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) error {
}
for _, h := range hs {
if err := a.Add(0, h); err != nil {
if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) {
// skip warning
return nil
}
if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) {
// skip warning
return nil
}
return err
}
}
Expand Down Expand Up @@ -431,12 +456,21 @@ type statAcc struct {
}

func (s *statAcc) Add(v float64, h *histogram.FloatHistogram) error {
if h != nil {
// ignore native histogram for STDVAR and STDDEV.
return nil
}

s.hasValue = true
s.count++

delta := v - s.mean
s.mean += delta / s.count
s.value += delta * (v - s.mean)
if math.IsNaN(v) || math.IsInf(v, 0) {
s.value = math.NaN()
} else {
delta := v - s.mean
s.mean += delta / s.count
s.value += delta * (v - s.mean)
}
return nil
}

Expand All @@ -458,7 +492,7 @@ type stdDevAcc struct {
statAcc
}

func newStdDevAcc() accumulator {
func newStdDevAcc() *stdDevAcc {
return &stdDevAcc{}
}

Expand All @@ -473,11 +507,15 @@ type stdVarAcc struct {
statAcc
}

func newStdVarAcc() accumulator {
func newStdVarAcc() *stdVarAcc {
return &stdVarAcc{}
}

func (s *stdVarAcc) Value() (float64, *histogram.FloatHistogram) {
if math.IsNaN(s.value) {
return math.NaN(), nil
}

if s.count == 1 {
return 0, nil
}
Expand Down
9 changes: 5 additions & 4 deletions execution/aggregate/count_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"

"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/query"
Expand All @@ -32,7 +33,7 @@ type countValuesOperator struct {

ts []int64
counts []map[int]int
series []labels.Labels
series []promql.Series

once sync.Once
}
Expand Down Expand Up @@ -70,7 +71,7 @@ func (c *countValuesOperator) String() string {
return fmt.Sprintf("[countValues] without (%v) - param (%v)", c.grouping, c.param)
}

func (c *countValuesOperator) Series(ctx context.Context) ([]labels.Labels, error) {
func (c *countValuesOperator) Series(ctx context.Context) ([]promql.Series, error) {
start := time.Now()
defer func() { c.AddExecutionTimeTaken(time.Since(start)) }()

Expand Down Expand Up @@ -141,7 +142,7 @@ func (c *countValuesOperator) initSeriesOnce(ctx context.Context) error {

ts := make([]int64, 0)
counts := make([]map[int]int, 0)
series := make([]labels.Labels, 0)
series := make([]promql.Series, 0)

b := labels.NewBuilder(labels.EmptyLabels())
for {
Expand Down Expand Up @@ -180,7 +181,7 @@ func (c *countValuesOperator) initSeriesOnce(ctx context.Context) error {
hash := lbls.Hash()
outputId, ok := hashToOutputId[hash]
if !ok {
series = append(series, lbls)
series = append(series, promql.Series{Metric: lbls})
outputId = len(series) - 1
hashToOutputId[hash] = outputId
}
Expand Down
Loading

0 comments on commit c3a5bbd

Please sign in to comment.