Skip to content

Commit 3d650aa

Browse files
committed
test: experiment with sliced of atomic integers instead of sync map
Signed-off-by: Michael Hoffmann <[email protected]>
1 parent 6765d04 commit 3d650aa

File tree

3 files changed

+55
-12
lines changed

3 files changed

+55
-12
lines changed

execution/execution.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, min
6868
// TODO(fpetkovski): Adjust the step for sub-queries once they are supported.
6969
Step: step.Milliseconds(),
7070
}
71-
limits := limits.NewLimits(maxSamples)
71+
limits := limits.NewLimits(maxSamples, opts)
7272

7373
return newOperator(expr, selectorPool, opts, hints, limits)
7474
}

execution/limits/limits.go

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,78 @@
11
package limits
22

33
import (
4-
"sync"
5-
64
"github.com/efficientgo/core/errors"
5+
"github.com/thanos-io/promql-engine/query"
76
"go.uber.org/atomic"
87
)
98

109
type Limits struct {
1110
maxSamples int
1211

13-
curSamplesPerTimestamp sync.Map
12+
start int64
13+
step int64
14+
stepsBatch int64
15+
16+
samplesPerTimestamp []*atomic.Int64
17+
periodPerTimestamp []*atomic.Int64
1418
}
1519

16-
func NewLimits(maxSamples int) *Limits {
17-
return &Limits{
20+
// NewLimits returns a pointer to a Limits struct. It can be used to
21+
// track samples that enter the engine in some timestamp and limit it
22+
// to a maximum number. Since the engine processes "stepsBatch" timestamps
23+
// in parallel the resulting memory overhead will be "O(stepsBatch*maxSamples)".
24+
func NewLimits(maxSamples int, opts *query.Options) *Limits {
25+
step := opts.Step.Milliseconds()
26+
if opts.NumSteps() == 1 {
27+
step = 1
28+
}
29+
start := opts.Start.UnixMilli()
30+
stepsBatch := opts.StepsBatch
31+
32+
res := &Limits{
1833
maxSamples: maxSamples,
34+
35+
start: start,
36+
step: step,
37+
stepsBatch: stepsBatch,
38+
39+
periodPerTimestamp: make([]*atomic.Int64, stepsBatch),
40+
samplesPerTimestamp: make([]*atomic.Int64, stepsBatch),
1941
}
42+
43+
for i := int64(0); i < stepsBatch; i++ {
44+
res.periodPerTimestamp[i] = atomic.NewInt64(0)
45+
res.samplesPerTimestamp[i] = atomic.NewInt64(0)
46+
}
47+
48+
return res
2049
}
2150

51+
// AccountSamplesForTimestamp keeps track of the samples used for timestamp t.
52+
// It will return an error if a step wants to add use more samples then the configured
53+
// maxSamples value.
2254
func (l *Limits) AccountSamplesForTimestamp(t int64, n int) error {
2355
if l.maxSamples == 0 {
2456
return nil
2557
}
26-
v, _ := l.curSamplesPerTimestamp.LoadOrStore(t, atomic.NewInt64(0))
27-
av := v.(*atomic.Int64)
58+
// TODO: properly this method would need a lock but so far it seems to work well enough.
59+
60+
idx := ((t - l.start) / l.step)
61+
idxmod := idx % l.stepsBatch
62+
63+
// This assumes that if we process "stepsBatch+1" we have processed "1" already.
64+
// This is accurate so long we dont move the coalesce operator higher up the execution
65+
// tree. It allows us to account only for the last "stepsBatch" timestamps and keep a
66+
// constant memory overhead.
67+
if period := idx / l.stepsBatch; period > l.periodPerTimestamp[idxmod].Load() {
68+
l.periodPerTimestamp[idxmod].Store(period)
69+
l.samplesPerTimestamp[idxmod].Store(0)
70+
}
2871

29-
if cur := av.Load(); cur+int64(n) > int64(l.maxSamples) {
72+
if cur := l.samplesPerTimestamp[idxmod].Load(); cur+int64(n) > int64(l.maxSamples) {
3073
return errors.New("query processing would load too many samples into memory in query execution")
3174
}
75+
l.samplesPerTimestamp[idxmod].Add(int64(n))
3276

33-
av.Add(int64(n))
3477
return nil
3578
}

execution/scan/matrix_selector.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,9 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) {
172172
var err error
173173

174174
if !o.isExtFunction {
175-
rangeSamples, err = selectPoints(series.samples, o.limits, seriesTs, mint, maxt, o.scanners[i].previousSamples)
175+
rangeSamples, err = selectPoints(series.samples, o.limits, ts, mint, maxt, o.scanners[i].previousSamples)
176176
} else {
177-
rangeSamples, err = selectExtPoints(series.samples, o.limits, seriesTs, mint, maxt, o.scanners[i].previousSamples, o.extLookbackDelta, &o.scanners[i].metricAppearedTs)
177+
rangeSamples, err = selectExtPoints(series.samples, o.limits, ts, mint, maxt, o.scanners[i].previousSamples, o.extLookbackDelta, &o.scanners[i].metricAppearedTs)
178178
}
179179

180180
if err != nil {

0 commit comments

Comments
 (0)