Skip to content

Commit

Permalink
test: experiment with slices of atomic integers instead of sync map
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Aug 18, 2023
1 parent 6765d04 commit cf9d7f9
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 87 deletions.
99 changes: 49 additions & 50 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,55 @@ func TestQueryExplain(t *testing.T) {
}
}

// TestQueryLimits verifies that the engine enforces the configured
// MaxSamples budget: instant queries selecting more samples than the
// limit must fail, while a range query over far more total samples
// succeeds because samples are only accounted per evaluated batch.
func TestQueryLimits(t *testing.T) {
	load := `load 1s
example{foo="bar"} 1+0x3600
example{foo="baz"} 1+0x3600
`
	test, err := promql.NewTest(t, load)
	testutil.Ok(t, err)
	defer test.Close()
	testutil.Ok(t, test.Run())

	ctx := test.Context()

	ng := engine.New(engine.Opts{
		EngineOpts: promql.EngineOpts{
			MaxSamples: 1000,
			Timeout:    10 * time.Second,
		}})

	// expectTooManySamples runs an instant query and asserts that it is
	// rejected with the Prometheus sample-limit error.
	expectTooManySamples := func(t *testing.T, expr string, ts time.Time) {
		q, err := ng.NewInstantQuery(ctx, test.Queryable(), nil, expr, ts)
		testutil.Ok(t, err)

		res := q.Exec(ctx)
		testutil.NotOk(t, res.Err)
		testutil.Equals(t, "query processing would load too many samples into memory in query execution", res.Err.Error())
	}

	t.Run("one series too many samples", func(t *testing.T) {
		expectTooManySamples(t, `sum_over_time(example{foo="bar"}[1h])`, time.Unix(1800, 0))
	})
	t.Run("two series too many samples", func(t *testing.T) {
		expectTooManySamples(t, `sum_over_time(example[1h])`, time.Unix(900, 0))
	})
	t.Run("range query should only account for samples at each batch", func(t *testing.T) {
		start := time.Unix(0, 0)
		q, err := ng.NewRangeQuery(ctx, test.Queryable(), nil, `sum(example)`, start, start.Add(time.Hour), time.Second)
		testutil.Ok(t, err)

		testutil.Ok(t, q.Exec(ctx).Err)
	})
}

func assertExecutionTimeNonZero(t *testing.T, got *engine.AnalyzeOutputNode) bool {
if got != nil {
if got.OperatorTelemetry.ExecutionTimeTaken() <= 0 {
Expand Down Expand Up @@ -3675,56 +3724,6 @@ testmetric2{src="a",dst="b"} 1`,
}
}

// TestQueryLimits exercises the engine's MaxSamples limit. With a budget
// of only 10 samples, instant queries that select more samples than that
// must fail with the Prometheus "too many samples" error, while a range
// query over far more total samples is expected to succeed because
// samples are only accounted per evaluated step.
func TestQueryLimits(t *testing.T) {
	// Two series with one sample per second for an hour.
	load := `load 1s
example{foo="bar"} 1+0x3600
example{foo="baz"} 1+0x3600
`
	test, err := promql.NewTest(t, load)
	testutil.Ok(t, err)
	defer test.Close()
	testutil.Ok(t, test.Run())

	ctx := test.Context()

	newEngine := engine.New(engine.Opts{
		EngineOpts: promql.EngineOpts{
			MaxSamples: 10,
			Timeout:    10 * time.Second,
		}})

	t.Run("one series too many samples", func(t *testing.T) {
		// A 20s window over 1s-resolution data selects more than the
		// budget of 10 samples from a single series.
		query := `sum_over_time(example{foo="bar"}[20s])`
		q1, err := newEngine.NewInstantQuery(ctx, test.Queryable(), nil, query, time.Unix(20, 0))
		testutil.Ok(t, err)

		newResult := q1.Exec(ctx)
		testutil.NotOk(t, newResult.Err)
		testutil.Equals(t, "query processing would load too many samples into memory in query execution", newResult.Err.Error())
	})
	t.Run("two series too many samples", func(t *testing.T) {
		// Both series together, over a 10s window, exceed the budget of 10
		// even though each series alone would fit.
		query := `sum_over_time(example[10s])`
		q1, err := newEngine.NewInstantQuery(ctx, test.Queryable(), nil, query, time.Unix(5, 0))
		testutil.Ok(t, err)

		newResult := q1.Exec(ctx)
		testutil.NotOk(t, newResult.Err)
		testutil.Equals(t, "query processing would load too many samples into memory in query execution", newResult.Err.Error())
	})
	t.Run("range query should only account for samples at each step", func(t *testing.T) {
		// 3600 steps over two series would far exceed 10 samples in total,
		// so this passes only if accounting is reset per step.
		query := `sum(example)`
		start := time.Unix(0, 0)
		end := start.Add(time.Hour)
		step := time.Second

		q1, err := newEngine.NewRangeQuery(ctx, test.Queryable(), nil, query, start, end, step)
		testutil.Ok(t, err)

		newResult := q1.Exec(ctx)
		testutil.Ok(t, newResult.Err)
	})
}

func TestQueryCancellation(t *testing.T) {
twelveHours := int64(12 * time.Hour.Seconds())

Expand Down
2 changes: 1 addition & 1 deletion execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, min
// TODO(fpetkovski): Adjust the step for sub-queries once they are supported.
Step: step.Milliseconds(),
}
limits := limits.NewLimits(maxSamples)
limits := limits.NewLimits(maxSamples, opts)

return newOperator(expr, selectorPool, opts, hints, limits)
}
Expand Down
74 changes: 61 additions & 13 deletions execution/limits/limits.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,83 @@
package limits

import (
"sync"

"github.com/efficientgo/core/errors"
"go.uber.org/atomic"

"github.com/thanos-io/promql-engine/query"
)

type Limits struct {
maxSamples int
// We only check every 100 added samples if the limit is breached.
// Doing so for every sample would be prohibitively expensive.
const resolution = 100

curSamplesPerTimestamp sync.Map
type Limits struct {
maxSamples int
samplesPerBatch []*atomic.Int64
}

func NewLimits(maxSamples int) *Limits {
return &Limits{
maxSamples: maxSamples,
// NewLimits returns a pointer to a Limits struct. It can be used to
// track samples that enter the engine in one batch and limit it
// to a maximum number.
func NewLimits(maxSamples int, opts *query.Options) *Limits {
limits := &Limits{
maxSamples: maxSamples,
samplesPerBatch: make([]*atomic.Int64, opts.NumSteps()),
}
for i := range limits.samplesPerBatch {
limits.samplesPerBatch[i] = atomic.NewInt64(0)
}
return limits
}

func (l *Limits) AccountSamplesForTimestamp(t int64, n int) error {
// AccountSamplesForTimestamp keeps track of the samples used for the batch for timestamp t.
// It will return an error if a batch wants to add use more samples then the configured
// maxSamples value.
func (l *Limits) addSamplesAndCheckLimits(batch, n int) error {
if l.maxSamples == 0 {
return nil
}
v, _ := l.curSamplesPerTimestamp.LoadOrStore(t, atomic.NewInt64(0))
av := v.(*atomic.Int64)

if cur := av.Load(); cur+int64(n) > int64(l.maxSamples) {
if l.samplesPerBatch[batch].Load()+int64(n) > int64(l.maxSamples) {
return errors.New("query processing would load too many samples into memory in query execution")
}
l.samplesPerBatch[batch].Add(int64(n))

av.Add(int64(n))
return nil
}

// Accounter returns a new Accounter bound to l, which buffers sample
// counts locally and flushes them to l every "resolution" samples.
func (l *Limits) Accounter() *Accounter {
	return &Accounter{
		limits:     l,
		resolution: resolution,
	}
}

// Accounter is used to check limits in one batch. It will only
// check if the sample is safe to add every "resolution" samples.
// It is not safe for concurrent usage!
type Accounter struct {
	// limits is the shared, concurrency-safe accounting backend.
	limits *Limits

	// curBatch is the number of batches started so far; batch index
	// curBatch-1 is used when flushing to limits.
	curBatch int
	// samplesAdded counts samples added since the last flush to limits.
	samplesAdded int
	// resolution is how many samples are buffered locally before they
	// are flushed to limits and the budget is checked.
	resolution int
}

// StartNewBatch advances the accounter to the next batch and resets the
// locally buffered sample count. Any samples buffered for the previous
// batch that were not yet flushed are dropped from accounting.
func (acc *Accounter) StartNewBatch() {
	acc.curBatch++
	acc.samplesAdded = 0
}

// AddSample accounts one sample for the current batch. Samples are
// buffered locally and only flushed to the shared Limits every
// "resolution" samples, so the configured limit can be overshot by at
// most resolution-1 samples per accounter. It returns an error once the
// batch's budget is exceeded.
func (acc *Accounter) AddSample() error {
	acc.samplesAdded++
	// ">= resolution" is equivalent to the modulo check for any positive
	// resolution, and avoids a division-by-zero panic on a zero-value
	// Accounter (it then simply flushes every sample).
	if acc.samplesAdded >= acc.resolution {
		if err := acc.limits.addSamplesAndCheckLimits(acc.curBatch-1, acc.samplesAdded); err != nil {
			// No need to reset samplesAdded here: once we return an error,
			// processing stops and no more samples will be added.
			return err
		}
		acc.samplesAdded = 0
	}
	return nil
}
31 changes: 16 additions & 15 deletions execution/scan/matrix_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type matrixSelector struct {
extLookbackDelta int64
model.OperatorTelemetry

limits *limits.Limits
acc *limits.Accounter
}

// NewMatrixSelector creates operator which selects vector of series over time.
Expand Down Expand Up @@ -99,7 +99,7 @@ func NewMatrixSelector(

extLookbackDelta: opts.ExtLookbackDelta.Milliseconds(),

limits: limits,
acc: limits.Accounter(),
}
m.OperatorTelemetry = &model.NoopTelemetry{}
if opts.EnableAnalysis {
Expand Down Expand Up @@ -144,6 +144,7 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) {
if o.currentStep > o.maxt {
return nil, nil
}
o.acc.StartNewBatch()

if err := o.loadSeries(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -172,9 +173,9 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) {
var err error

if !o.isExtFunction {
rangeSamples, err = selectPoints(series.samples, o.limits, seriesTs, mint, maxt, o.scanners[i].previousSamples)
rangeSamples, err = selectPoints(series.samples, o.acc, seriesTs, mint, maxt, o.scanners[i].previousSamples)
} else {
rangeSamples, err = selectExtPoints(series.samples, o.limits, seriesTs, mint, maxt, o.scanners[i].previousSamples, o.extLookbackDelta, &o.scanners[i].metricAppearedTs)
rangeSamples, err = selectExtPoints(series.samples, o.acc, seriesTs, mint, maxt, o.scanners[i].previousSamples, o.extLookbackDelta, &o.scanners[i].metricAppearedTs)
}

if err != nil {
Expand Down Expand Up @@ -277,7 +278,7 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error {
// values). Any such points falling before mint are discarded; points that fall
// into the [mint, maxt] range are retained; only points with later timestamps
// are populated from the iterator.
func selectPoints(it *storage.BufferedSeriesIterator, limits *limits.Limits, ts, mint, maxt int64, out []sample) ([]sample, error) {
func selectPoints(it *storage.BufferedSeriesIterator, acc *limits.Accounter, ts, mint, maxt int64, out []sample) ([]sample, error) {
if len(out) > 0 && out[len(out)-1].T >= mint {
// There is an overlap between previous and current ranges, retain common
// points. In most such cases:
Expand Down Expand Up @@ -314,7 +315,7 @@ loop:
continue loop
}
if t >= mint {
if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil {
if err := acc.AddSample(); err != nil {
return out, err
}
out = append(out, sample{T: t, H: fh})
Expand All @@ -326,7 +327,7 @@ loop:
}
// Values in the buffer are guaranteed to be smaller than maxt.
if t >= mint {
if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil {
if err := acc.AddSample(); err != nil {
return out, err
}
out = append(out, sample{T: t, F: v})
Expand All @@ -339,15 +340,15 @@ loop:
case chunkenc.ValHistogram, chunkenc.ValFloatHistogram:
t, fh := it.AtFloatHistogram()
if t == maxt && !value.IsStaleNaN(fh.Sum) {
if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil {
if err := acc.AddSample(); err != nil {
return out, err
}
out = append(out, sample{T: t, H: fh})
}
case chunkenc.ValFloat:
t, v := it.At()
if t == maxt && !value.IsStaleNaN(v) {
if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil {
if err := acc.AddSample(); err != nil {
return out, err
}
out = append(out, sample{T: t, F: v})
Expand All @@ -365,7 +366,7 @@ loop:
// values). Any such points falling before mint are discarded; points that fall
// into the [mint, maxt] range are retained; only points with later timestamps
// are populated from the iterator.
func selectExtPoints(it *storage.BufferedSeriesIterator, limits *limits.Limits, ts, mint, maxt int64, out []sample, extLookbackDelta int64, metricAppearedTs **int64) ([]sample, error) {
func selectExtPoints(it *storage.BufferedSeriesIterator, acc *limits.Accounter, ts, mint, maxt int64, out []sample, extLookbackDelta int64, metricAppearedTs **int64) ([]sample, error) {
extMint := mint - extLookbackDelta

if len(out) > 0 && out[len(out)-1].T >= mint {
Expand Down Expand Up @@ -418,7 +419,7 @@ loop:
*metricAppearedTs = &t
}
if t >= mint {
if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil {
if err := acc.AddSample(); err != nil {
return out, err
}
out = append(out, sample{T: t, H: fh})
Expand All @@ -436,13 +437,13 @@ loop:
// exists at or before range start, add it and then keep replacing
// it with later points while not yet (strictly) inside the range.
if t >= mint || !appendedPointBeforeMint {
if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil {
if err := acc.AddSample(); err != nil {
return out, err
}
out = append(out, sample{T: t, F: v})
appendedPointBeforeMint = true
} else {
if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil {
if err := acc.AddSample(); err != nil {
return out, err
}
out[len(out)-1] = sample{T: t, F: v}
Expand All @@ -459,7 +460,7 @@ loop:
if *metricAppearedTs == nil {
*metricAppearedTs = &t
}
if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil {
if err := acc.AddSample(); err != nil {
return out, err
}
out = append(out, sample{T: t, H: fh})
Expand All @@ -470,7 +471,7 @@ loop:
if *metricAppearedTs == nil {
*metricAppearedTs = &t
}
if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil {
if err := acc.AddSample(); err != nil {
return out, err
}
out = append(out, sample{T: t, F: v})
Expand Down
Loading

0 comments on commit cf9d7f9

Please sign in to comment.