Skip to content

Commit

Permalink
test: experiment with slices of atomic integers instead of sync map
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Aug 17, 2023
1 parent 6765d04 commit d9fc3c2
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 32 deletions.
21 changes: 0 additions & 21 deletions engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,6 @@ func BenchmarkRangeQuery(b *testing.B) {
sixHourDataset := setupStorage(b, 1000, 3, 6*samplesPerHour)
defer sixHourDataset.Close()

largeSixHourDataset := setupStorage(b, 10000, 10, 6*samplesPerHour)
defer largeSixHourDataset.Close()

sevenDaysAndTwoHoursDataset := setupStorage(b, 1000, 3, (7*24+2)*samplesPerHour)
defer sevenDaysAndTwoHoursDataset.Close()

start := time.Unix(0, 0)
end := start.Add(2 * time.Hour)
step := time.Second * 30
Expand Down Expand Up @@ -150,21 +144,6 @@ func BenchmarkRangeQuery(b *testing.B) {
query: "rate(http_requests_total[1m])",
test: sixHourDataset,
},
{
name: "rate with large range selection",
query: "rate(http_requests_total[7d])",
test: sevenDaysAndTwoHoursDataset,
},
{
name: "rate with large number of series, 1m range",
query: "rate(http_requests_total[1m])",
test: largeSixHourDataset,
},
{
name: "rate with large number of series, 5m range",
query: "rate(http_requests_total[5m])",
test: largeSixHourDataset,
},
{
name: "sum rate",
query: "sum(rate(http_requests_total[1m]))",
Expand Down
2 changes: 1 addition & 1 deletion execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, min
// TODO(fpetkovski): Adjust the step for sub-queries once they are supported.
Step: step.Milliseconds(),
}
limits := limits.NewLimits(maxSamples)
limits := limits.NewLimits(maxSamples, opts)

return newOperator(expr, selectorPool, opts, hints, limits)
}
Expand Down
49 changes: 41 additions & 8 deletions execution/limits/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,65 @@ import (
"sync"

"github.com/efficientgo/core/errors"
"go.uber.org/atomic"
"github.com/thanos-io/promql-engine/query"
)

// Limits tracks the number of samples loaded per query timestamp and
// enforces the engine's max-samples budget. Counters are kept in two
// fixed-size slices indexed by (timestamp step index mod stepsBatch),
// guarded by mu. NOTE(review): reconstructed from a diff render that
// interleaved old and new lines — the removed sync.Map field
// (curSamplesPerTimestamp) is dropped here as the commit intends.
type Limits struct {
	mu sync.Mutex // guards samplesPerTimestamp and periodPerTimestamp

	// maxSamples is the per-timestamp sample budget; 0 disables accounting.
	maxSamples int

	// start/step describe the query grid in milliseconds; stepsBatch is
	// how many steps are processed per batch (the ring size below).
	start      int64
	step       int64
	stepsBatch int64

	// samplesPerTimestamp[i] is the running sample count for the step whose
	// index mod stepsBatch == i; periodPerTimestamp[i] records which batch
	// ("period") that counter currently belongs to, so a slot can be reset
	// when a new batch reuses it.
	samplesPerTimestamp []int64
	periodPerTimestamp  []int64
}

func NewLimits(maxSamples int) *Limits {
return &Limits{
func NewLimits(maxSamples int, opts *query.Options) *Limits {

step := opts.Step.Milliseconds()
if opts.NumSteps() == 1 {
step = 1
}
start := opts.Start.UnixMilli()
stepsBatch := opts.StepsBatch

res := &Limits{
maxSamples: maxSamples,

start: start,
step: step,
stepsBatch: stepsBatch,

periodPerTimestamp: make([]int64, stepsBatch),
samplesPerTimestamp: make([]int64, stepsBatch),
}

return res
}

func (l *Limits) AccountSamplesForTimestamp(t int64, n int) error {
if l.maxSamples == 0 {
return nil
}
v, _ := l.curSamplesPerTimestamp.LoadOrStore(t, atomic.NewInt64(0))
av := v.(*atomic.Int64)

if cur := av.Load(); cur+int64(n) > int64(l.maxSamples) {
idx := ((t - l.start) / l.step)
idxmod := idx % l.stepsBatch

l.mu.Lock()
defer l.mu.Unlock()

if period := idx / l.stepsBatch; period > l.periodPerTimestamp[idxmod] {
l.periodPerTimestamp[idxmod] = period
l.samplesPerTimestamp[idxmod] = 0
}

if cur := l.samplesPerTimestamp[idxmod]; cur+int64(n) > int64(l.maxSamples) {
return errors.New("query processing would load too many samples into memory in query execution")
}
l.samplesPerTimestamp[idxmod] += int64(n)

av.Add(int64(n))
return nil
}
4 changes: 2 additions & 2 deletions execution/scan/matrix_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) {
var err error

if !o.isExtFunction {
rangeSamples, err = selectPoints(series.samples, o.limits, seriesTs, mint, maxt, o.scanners[i].previousSamples)
rangeSamples, err = selectPoints(series.samples, o.limits, ts, mint, maxt, o.scanners[i].previousSamples)
} else {
rangeSamples, err = selectExtPoints(series.samples, o.limits, seriesTs, mint, maxt, o.scanners[i].previousSamples, o.extLookbackDelta, &o.scanners[i].metricAppearedTs)
rangeSamples, err = selectExtPoints(series.samples, o.limits, ts, mint, maxt, o.scanners[i].previousSamples, o.extLookbackDelta, &o.scanners[i].metricAppearedTs)
}

if err != nil {
Expand Down

0 comments on commit d9fc3c2

Please sign in to comment.