Skip to content

Commit

Permalink
Merge branch 'main' into mhoffm-add-sample-limits
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann authored Aug 18, 2023
2 parents 318b752 + 5816f4f commit 4c647ab
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 0 deletions.
31 changes: 31 additions & 0 deletions engine/distributed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"testing"
"time"

"github.com/efficientgo/core/errors"
"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/promql-engine/api"
"github.com/thanos-io/promql-engine/engine"
Expand Down Expand Up @@ -329,3 +331,32 @@ func TestDistributedAggregations(t *testing.T) {
}
}
}

func TestDistributedEngineWarnings(t *testing.T) {
querier := &storage.MockQueryable{
MockQuerier: &storage.MockQuerier{
SelectMockFunction: func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
return newWarningsSeriesSet(storage.Warnings{errors.New("test warning")})
},
},
}

opts := engine.Opts{
EngineOpts: promql.EngineOpts{
MaxSamples: math.MaxInt64,
Timeout: 1 * time.Minute,
},
}
remote := engine.NewRemoteEngine(opts, querier, math.MinInt64, math.MaxInt64, nil)
ng := engine.NewDistributedEngine(opts, api.NewStaticEndpoints([]api.RemoteEngine{remote}))
var (
start = time.UnixMilli(0)
end = time.UnixMilli(600)
step = 30 * time.Second
)
q, err := ng.NewRangeQuery(context.Background(), nil, nil, "test", start, end, step)
testutil.Ok(t, err)

res := q.Exec(context.Background())
testutil.Equals(t, 1, len(res.Warnings))
}
1 change: 1 addition & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ type distributedEngine struct {

func NewDistributedEngine(opts Opts, endpoints api.RemoteEndpoints) v1.QueryEngine {
opts.LogicalOptimizers = []logicalplan.Optimizer{
logicalplan.PassthroughOptimizer{Endpoints: endpoints},
logicalplan.DistributedExecutionOptimizer{Endpoints: endpoints},
}

Expand Down
2 changes: 2 additions & 0 deletions execution/remote/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/thanos-io/promql-engine/execution/scan"
engstore "github.com/thanos-io/promql-engine/execution/storage"
"github.com/thanos-io/promql-engine/execution/tracking"
"github.com/thanos-io/promql-engine/execution/warnings"
"github.com/thanos-io/promql-engine/query"
)

Expand Down Expand Up @@ -103,6 +104,7 @@ func (s *storageAdapter) GetSeries(ctx context.Context, _, _ int) ([]engstore.Si

func (s *storageAdapter) executeQuery(ctx context.Context) {
result := s.query.Exec(ctx)
warnings.AddToContext(result.Warnings, ctx)
if result.Err != nil {
s.err = result.Err
return
Expand Down
28 changes: 28 additions & 0 deletions logicalplan/passthrough.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.

package logicalplan

import (
"github.com/thanos-io/promql-engine/api"
"github.com/thanos-io/promql-engine/parser"
)

// PassthroughOptimizer optimizes queries which can be simply passed
// through to a RemoteEngine.
type PassthroughOptimizer struct {
Endpoints api.RemoteEndpoints
}

func (m PassthroughOptimizer) Optimize(plan parser.Expr, opts *Opts) parser.Expr {
engines := m.Endpoints.Engines()
if len(engines) == 1 {
return RemoteExecution{
Engine: engines[0],
Query: plan.String(),
QueryRangeStart: opts.Start,
}
}

return plan
}
47 changes: 47 additions & 0 deletions logicalplan/passthrough_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.

package logicalplan

import (
"math"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/model/labels"

"github.com/thanos-io/promql-engine/api"
"github.com/thanos-io/promql-engine/parser"
)

func TestPassthrough(t *testing.T) {
expr, err := parser.ParseExpr(`time()`)
testutil.Ok(t, err)

t.Run("optimized with one engine", func(t *testing.T) {
engines := []api.RemoteEngine{
newEngineMock(math.MinInt64, math.MinInt64, []labels.Labels{labels.FromStrings("region", "east"), labels.FromStrings("region", "south")}),
}
optimizers := []Optimizer{PassthroughOptimizer{Endpoints: api.NewStaticEndpoints(engines)}}

plan := New(expr, &Opts{Start: time.Unix(0, 0), End: time.Unix(0, 0)})
optimizedPlan := plan.Optimize(optimizers)

testutil.Equals(t, "remote(time())", optimizedPlan.Expr().String())
})

t.Run("not optimized with one engine", func(t *testing.T) {
engines := []api.RemoteEngine{
newEngineMock(math.MinInt64, math.MinInt64, []labels.Labels{labels.FromStrings("region", "east"), labels.FromStrings("region", "south")}),
newEngineMock(math.MinInt64, math.MinInt64, []labels.Labels{labels.FromStrings("region", "west")}),
}
optimizers := []Optimizer{PassthroughOptimizer{Endpoints: api.NewStaticEndpoints(engines)}}

plan := New(expr, &Opts{Start: time.Unix(0, 0), End: time.Unix(0, 0)})
optimizedPlan := plan.Optimize(optimizers)

testutil.Equals(t, "time()", optimizedPlan.Expr().String())
})

}

0 comments on commit 4c647ab

Please sign in to comment.