Skip to content

Commit

Permalink
test: experiment with slices of atomic integers instead of sync map
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Aug 17, 2023
1 parent 6765d04 commit bfeae3a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 12 deletions.
2 changes: 1 addition & 1 deletion execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, min
// TODO(fpetkovski): Adjust the step for sub-queries once they are supported.
Step: step.Milliseconds(),
}
limits := limits.NewLimits(maxSamples)
limits := limits.NewLimits(maxSamples, opts)

return newOperator(expr, selectorPool, opts, hints, limits)
}
Expand Down
62 changes: 53 additions & 9 deletions execution/limits/limits.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,79 @@
package limits

import (
"sync"

"github.com/efficientgo/core/errors"
"go.uber.org/atomic"

"github.com/thanos-io/promql-engine/query"
)

type Limits struct {
maxSamples int

curSamplesPerTimestamp sync.Map
start int64
step int64
stepsBatch int64

samplesPerTimestamp []*atomic.Int64
periodPerTimestamp []*atomic.Int64
}

func NewLimits(maxSamples int) *Limits {
return &Limits{
// NewLimits returns a pointer to a Limits struct. It can be used to
// track samples that enter the engine in some timestamp and limit it
// to a maximum number. Since the engine processes "stepsBatch" timestamps
// in parallel the resulting memory overhead will be "O(stepsBatch*maxSamples)".
func NewLimits(maxSamples int, opts *query.Options) *Limits {
step := opts.Step.Milliseconds()
if opts.NumSteps() == 1 {
step = 1
}
start := opts.Start.UnixMilli()
stepsBatch := opts.StepsBatch

res := &Limits{
maxSamples: maxSamples,

start: start,
step: step,
stepsBatch: stepsBatch,

periodPerTimestamp: make([]*atomic.Int64, stepsBatch),
samplesPerTimestamp: make([]*atomic.Int64, stepsBatch),
}

for i := int64(0); i < stepsBatch; i++ {
res.periodPerTimestamp[i] = atomic.NewInt64(0)
res.samplesPerTimestamp[i] = atomic.NewInt64(0)
}

return res
}

// AccountSamplesForTimestamp keeps track of the samples used for timestamp t.
// It will return an error if a step wants to add use more samples then the configured
// maxSamples value.
func (l *Limits) AccountSamplesForTimestamp(t int64, n int) error {
if l.maxSamples == 0 {
return nil
}
v, _ := l.curSamplesPerTimestamp.LoadOrStore(t, atomic.NewInt64(0))
av := v.(*atomic.Int64)
// TODO: properly this method would need a lock but so far it seems to work well enough.

idx := ((t - l.start) / l.step)
idxmod := idx % l.stepsBatch

// This assumes that if we process "stepsBatch+1" we have processed "1" already.
// This is accurate so long we dont move the coalesce operator higher up the execution
// tree. It allows us to account only for the last "stepsBatch" timestamps and keep a
// constant memory overhead.
if period := idx / l.stepsBatch; period > l.periodPerTimestamp[idxmod].Load() {
l.periodPerTimestamp[idxmod].Store(period)
l.samplesPerTimestamp[idxmod].Store(0)
}

if cur := av.Load(); cur+int64(n) > int64(l.maxSamples) {
if cur := l.samplesPerTimestamp[idxmod].Load(); cur+int64(n) > int64(l.maxSamples) {
return errors.New("query processing would load too many samples into memory in query execution")
}
l.samplesPerTimestamp[idxmod].Add(int64(n))

av.Add(int64(n))
return nil
}
4 changes: 2 additions & 2 deletions execution/scan/matrix_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) {
var err error

if !o.isExtFunction {
rangeSamples, err = selectPoints(series.samples, o.limits, seriesTs, mint, maxt, o.scanners[i].previousSamples)
rangeSamples, err = selectPoints(series.samples, o.limits, ts, mint, maxt, o.scanners[i].previousSamples)
} else {
rangeSamples, err = selectExtPoints(series.samples, o.limits, seriesTs, mint, maxt, o.scanners[i].previousSamples, o.extLookbackDelta, &o.scanners[i].metricAppearedTs)
rangeSamples, err = selectExtPoints(series.samples, o.limits, ts, mint, maxt, o.scanners[i].previousSamples, o.extLookbackDelta, &o.scanners[i].metricAppearedTs)
}

if err != nil {
Expand Down

0 comments on commit bfeae3a

Please sign in to comment.