Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/ingester/client/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ type StreamingSeriesSource struct {
}

type memoryConsumptionTracker interface {
IncreaseMemoryConsumption(b uint64) error
DecreaseMemoryConsumption(b uint64)
IncreaseMemoryConsumption(b uint64, source limiter.MemoryConsumptionSource) error
DecreaseMemoryConsumption(b uint64, source limiter.MemoryConsumptionSource)
}

func NewSeriesChunksStreamReader(ctx context.Context, client Ingester_QueryStreamClient, ingesterName string, expectedSeriesCount int, queryLimiter *limiter.QueryLimiter, memoryTracker memoryConsumptionTracker, cleanup func(), log log.Logger) *SeriesChunksStreamReader {
Expand Down Expand Up @@ -97,7 +97,7 @@ func (s *SeriesChunksStreamReader) Close() {
// It is safe to call FreeBuffer multiple times, or to alternate GetChunks and FreeBuffer calls.
func (s *SeriesChunksStreamReader) FreeBuffer() {
if s.lastMessage != nil {
s.memoryTracker.DecreaseMemoryConsumption(uint64(s.lastMessage.Size()))
s.memoryTracker.DecreaseMemoryConsumption(uint64(s.lastMessage.Size()), limiter.IngesterChunks)
s.lastMessage.FreeBuffer()
s.lastMessage = nil
}
Expand All @@ -110,7 +110,7 @@ func (s *SeriesChunksStreamReader) setLastMessage(msg *QueryStreamResponse) erro
if s.lastMessage != nil {
return fmt.Errorf("must call FreeBuffer() before storing the next message - this indicates a bug")
}
if err := s.memoryTracker.IncreaseMemoryConsumption(uint64(msg.Size())); err != nil {
if err := s.memoryTracker.IncreaseMemoryConsumption(uint64(msg.Size()), limiter.IngesterChunks); err != nil {
return err
}
s.lastMessage = msg
Expand Down
8 changes: 4 additions & 4 deletions pkg/querier/block_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ func (bqs *blockStreamingQuerierSeries) Iterator(reuse chunkenc.Iterator) chunke
}

type memoryConsumptionTracker interface {
IncreaseMemoryConsumption(b uint64) error
DecreaseMemoryConsumption(b uint64)
IncreaseMemoryConsumption(b uint64, source limiter.MemoryConsumptionSource) error
DecreaseMemoryConsumption(b uint64, source limiter.MemoryConsumptionSource)
}

// storeGatewayStreamReader is responsible for managing the streaming of chunks from a storegateway and buffering
Expand Down Expand Up @@ -196,7 +196,7 @@ func (s *storeGatewayStreamReader) Close() {
// It is safe to call FreeBuffer multiple times, or to alternate GetChunks and FreeBuffer calls.
func (s *storeGatewayStreamReader) FreeBuffer() {
if s.lastMessage != nil {
s.memoryTracker.DecreaseMemoryConsumption(uint64(s.lastMessage.Size()))
s.memoryTracker.DecreaseMemoryConsumption(uint64(s.lastMessage.Size()), limiter.StoreGatewayChunks)
s.lastMessage.FreeBuffer()
s.lastMessage = nil
}
Expand All @@ -209,7 +209,7 @@ func (s *storeGatewayStreamReader) setLastMessage(msg *storepb.SeriesResponse) e
if s.lastMessage != nil {
return fmt.Errorf("must call FreeBuffer() before storing the next message - this indicates a bug")
}
if err := s.memoryTracker.IncreaseMemoryConsumption(uint64(msg.Size())); err != nil {
if err := s.memoryTracker.IncreaseMemoryConsumption(uint64(msg.Size()), limiter.StoreGatewayChunks); err != nil {
return err
}
s.lastMessage = msg
Expand Down
1 change: 1 addition & 0 deletions pkg/streamingpromql/operators/aggregations/quantile.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ var qGroupPool = types.NewLimitingBucketedPool(
pool.NewBucketedPool(maxExpectedQuantileGroups, func(size int) []qGroup {
return make([]qGroup, 0, size)
}),
limiter.QuantileGroupSlices,
uint64(unsafe.Sizeof(qGroup{})),
false,
nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ var instantQuerySeriesSlicePool = types.NewLimitingBucketedPool(
pool.NewBucketedPool(types.MaxExpectedSeriesPerResult, func(size int) []instantQuerySeries {
return make([]instantQuerySeries, 0, size)
}),
limiter.TopKBottomKInstantQuerySeriesSlices,
uint64(unsafe.Sizeof(instantQuerySeries{})),
true,
nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ var rangeQuerySeriesSlicePool = types.NewLimitingBucketedPool(
pool.NewBucketedPool(types.MaxExpectedSeriesPerResult, func(size int) []rangeQuerySeries {
return make([]rangeQuerySeries, 0, size)
}),
limiter.TopKBottomKRangeQuerySeriesSlices,
uint64(unsafe.Sizeof(rangeQuerySeries{})),
true,
nil,
Expand All @@ -503,6 +504,7 @@ var intSliceSlicePool = types.NewLimitingBucketedPool(
pool.NewBucketedPool(types.MaxExpectedPointsPerSeries, func(size int) [][]int {
return make([][]int, 0, size)
}),
limiter.IntSliceSlice,
uint64(unsafe.Sizeof([][]int{})),
true,
nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ func TestOneToOneVectorVectorBinaryOperation_SeriesMerging(t *testing.T) {
}
for _, s := range testCase.input {
// Count the memory for the given floats + histograms
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*uint64(len(s.Floats))+types.HPointSize*uint64(len(s.Histograms))))
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*uint64(len(s.Floats)), limiter.FPointSlices))
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.HPointSize*uint64(len(s.Histograms)), limiter.HPointSlices))
}

result, err := o.mergeSingleSide(testCase.input, testCase.sourceSeriesIndices, testCase.sourceSeriesMetadata, "right")
Expand Down
6 changes: 4 additions & 2 deletions pkg/streamingpromql/operators/functions/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func TestFloatTransformationFunc(t *testing.T) {
},
}
// Increase the memory tracker for 2 FPoints, and 1 HPoint
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*2+types.HPointSize*1))
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*2, limiter.FPointSlices))
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.HPointSize*1, limiter.HPointSlices))

expected := types.InstantVectorSeriesData{
Floats: []promql.FPoint{
Expand Down Expand Up @@ -78,7 +79,8 @@ func TestFloatTransformationDropHistogramsFunc(t *testing.T) {
},
}
// Increase the memory tracker for 2 FPoints, and 1 HPoint
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*2+types.HPointSize*1))
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*2, limiter.FPointSlices))
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.HPointSize*1, limiter.HPointSlices))

expected := types.InstantVectorSeriesData{
Floats: []promql.FPoint{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var pointBucketPool = types.NewLimitingBucketedPool(
pool.NewBucketedPool(types.MaxExpectedPointsPerSeries, func(size int) []promql.Buckets {
return make([]promql.Buckets, 0, size)
}),
limiter.BucketSlices,
uint64(unsafe.Sizeof(promql.Buckets{})),
true,
mangleBuckets,
Expand All @@ -98,6 +99,7 @@ var bucketSliceBucketedPool = types.NewLimitingBucketedPool(
pool.NewBucketedPool(maxExpectedBucketsPerHistogram, func(size int) []promql.Bucket {
return make([]promql.Bucket, 0, size)
}),
limiter.BucketSlices,
uint64(unsafe.Sizeof(promql.Bucket{})),
true,
nil,
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operators/operator_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestInstantVectorOperatorBuffer_BufferingSubsetOfInputSeries(t *testing.T)

seriesUsed := []bool{true, false, true, true, true}
memoryConsumptionTracker := limiter.NewMemoryConsumptionTracker(0, nil, "")
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*6)) // We have 6 FPoints from the inner series.
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*6, limiter.FPointSlices)) // We have 6 FPoints from the inner series.
buffer := NewInstantVectorOperatorBuffer(inner, seriesUsed, 4, memoryConsumptionTracker)
ctx := context.Background()

Expand Down Expand Up @@ -115,7 +115,7 @@ func TestInstantVectorOperatorBuffer_BufferingAllInputSeries(t *testing.T) {
}

memoryConsumptionTracker := limiter.NewMemoryConsumptionTracker(0, nil, "")
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*6)) // We have 6 FPoints from the inner series.
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*6, limiter.FPointSlices)) // We have 6 FPoints from the inner series.
buffer := NewInstantVectorOperatorBuffer(inner, nil, 6, memoryConsumptionTracker)
ctx := context.Background()

Expand Down
17 changes: 14 additions & 3 deletions pkg/streamingpromql/types/limiting_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []promql.FPoint {
return make([]promql.FPoint, 0, size)
}),
limiter.FPointSlices,
FPointSize,
false,
nil,
Expand All @@ -54,6 +55,7 @@ var (
pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []promql.HPoint {
return make([]promql.HPoint, 0, size)
}),
limiter.HPointSlices,
HPointSize,
false,
func(point promql.HPoint) promql.HPoint {
Expand All @@ -66,6 +68,7 @@ var (
pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) promql.Vector {
return make(promql.Vector, 0, size)
}),
limiter.Vectors,
VectorSampleSize,
false,
nil,
Expand All @@ -75,6 +78,7 @@ var (
pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []float64 {
return make([]float64, 0, size)
}),
limiter.Float64Slices,
Float64Size,
true,
nil,
Expand All @@ -84,6 +88,7 @@ var (
pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []int {
return make([]int, 0, size)
}),
limiter.IntSlices,
IntSize,
true,
nil,
Expand All @@ -93,6 +98,7 @@ var (
pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []int64 {
return make([]int64, 0, size)
}),
limiter.Int64Slices,
Int64Size,
true,
nil,
Expand All @@ -102,6 +108,7 @@ var (
pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []bool {
return make([]bool, 0, size)
}),
limiter.BoolSlices,
BoolSize,
true,
nil,
Expand All @@ -111,6 +118,7 @@ var (
pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []*histogram.FloatHistogram {
return make([]*histogram.FloatHistogram, 0, size)
}),
limiter.HistogramPointerSlices,
HistogramPointerSize,
true,
mangleHistogram,
Expand All @@ -120,6 +128,7 @@ var (
pool.NewBucketedPool(MaxExpectedSeriesPerResult, func(size int) []SeriesMetadata {
return make([]SeriesMetadata, 0, size)
}),
limiter.SeriesMetadataSlices,
SeriesMetadataSize,
true,
nil,
Expand Down Expand Up @@ -156,14 +165,16 @@ func mangleHistogram(h *histogram.FloatHistogram) *histogram.FloatHistogram {
// assumes all native histograms are the same size, and assumes all elements of a promql.Vector are float samples.
type LimitingBucketedPool[S ~[]E, E any] struct {
inner *pool.BucketedPool[S, E]
source limiter.MemoryConsumptionSource
elementSize uint64
clearOnGet bool
mangle func(E) E
}

func NewLimitingBucketedPool[S ~[]E, E any](inner *pool.BucketedPool[S, E], elementSize uint64, clearOnGet bool, mangle func(E) E) *LimitingBucketedPool[S, E] {
func NewLimitingBucketedPool[S ~[]E, E any](inner *pool.BucketedPool[S, E], source limiter.MemoryConsumptionSource, elementSize uint64, clearOnGet bool, mangle func(E) E) *LimitingBucketedPool[S, E] {
return &LimitingBucketedPool[S, E]{
inner: inner,
source: source,
elementSize: elementSize,
clearOnGet: clearOnGet,
mangle: mangle,
Expand All @@ -187,7 +198,7 @@ func (p *LimitingBucketedPool[S, E]) Get(size int, tracker *limiter.MemoryConsum
// - there's no guarantee the slice will have size 'size' when it's returned to us in putWithElementSize, so using 'size' would make the accounting below impossible
estimatedBytes := uint64(cap(s)) * p.elementSize

if err := tracker.IncreaseMemoryConsumption(estimatedBytes); err != nil {
if err := tracker.IncreaseMemoryConsumption(estimatedBytes, p.source); err != nil {
p.inner.Put(s)
return nil, err
}
Expand All @@ -211,7 +222,7 @@ func (p *LimitingBucketedPool[S, E]) Put(s S, tracker *limiter.MemoryConsumption
}
}

tracker.DecreaseMemoryConsumption(uint64(cap(s)) * p.elementSize)
tracker.DecreaseMemoryConsumption(uint64(cap(s))*p.elementSize, p.source)
p.inner.Put(s)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/streamingpromql/types/limiting_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestLimitingBucketedPool_Unlimited(t *testing.T) {

p := NewLimitingBucketedPool(
pool.NewBucketedPool(1024, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }),
limiter.FPointSlices,
FPointSize,
false,
nil,
Expand Down Expand Up @@ -79,6 +80,7 @@ func TestLimitingPool_Limited(t *testing.T) {

p := NewLimitingBucketedPool(
pool.NewBucketedPool(1024, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }),
limiter.FPointSlices,
FPointSize,
false,
nil,
Expand Down Expand Up @@ -204,6 +206,7 @@ func TestLimitingPool_Mangling(t *testing.T) {

p := NewLimitingBucketedPool(
pool.NewBucketedPool(1024, func(size int) []int { return make([]int, 0, size) }),
limiter.IntSlices,
1,
false,
func(_ int) int { return 123 },
Expand Down
55 changes: 51 additions & 4 deletions pkg/util/limiter/memory_consumption.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package limiter

import (
"context"
"fmt"
"sync"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -34,6 +35,48 @@ func AddMemoryTrackerToContext(ctx context.Context, tracker *MemoryConsumptionTr
return context.WithValue(ctx, interface{}(memoryConsumptionTracker), tracker)
}

type MemoryConsumptionSource int

const (
IngesterChunks MemoryConsumptionSource = iota
StoreGatewayChunks
FPointSlices
HPointSlices
Vectors
Float64Slices
IntSlices
IntSliceSlice
Int64Slices
BoolSlices
HistogramPointerSlices
SeriesMetadataSlices
BucketSlices
QuantileGroupSlices
TopKBottomKInstantQuerySeriesSlices
TopKBottomKRangeQuerySeriesSlices

memoryConsumptionSourceCount = TopKBottomKRangeQuerySeriesSlices + 1
)

var memoryConsumptionSourceNames = map[MemoryConsumptionSource]string{
IngesterChunks: "ingester chunks",
StoreGatewayChunks: "store-gateway chunks",
FPointSlices: "[]promql.FPoint",
HPointSlices: "[]promql.HPoint",
Vectors: "promql.Vector",
Float64Slices: "[]float64",
IntSlices: "[]int",
IntSliceSlice: "[][]int",
Int64Slices: "[]int64",
BoolSlices: "[]bool",
HistogramPointerSlices: "[]*histogram.FloatHistogram",
SeriesMetadataSlices: "[]SeriesMetadata",
BucketSlices: "[]promql.Buckets",
QuantileGroupSlices: "[]aggregations.qGroup",
TopKBottomKInstantQuerySeriesSlices: "[]topkbottom.instantQuerySeries",
TopKBottomKRangeQuerySeriesSlices: "[]topkbottom.rangeQuerySeries",
}

// MemoryConsumptionTracker tracks the current memory utilisation of a single query, and applies any max in-memory bytes limit.
//
// It also tracks the peak number of in-memory bytes for use in query statistics.
Expand All @@ -42,6 +85,8 @@ type MemoryConsumptionTracker struct {
currentEstimatedMemoryConsumptionBytes uint64
peakEstimatedMemoryConsumptionBytes uint64

currentEstimatedMemoryConsumptionBySource [memoryConsumptionSourceCount]uint64

rejectionCount prometheus.Counter
haveRecordedRejection bool
queryDescription string
Expand All @@ -64,7 +109,7 @@ func NewMemoryConsumptionTracker(maxEstimatedMemoryConsumptionBytes uint64, reje
// IncreaseMemoryConsumption attempts to increase the current memory consumption by b bytes.
//
// It returns an error if the query would exceed the maximum memory consumption limit.
func (l *MemoryConsumptionTracker) IncreaseMemoryConsumption(b uint64) error {
func (l *MemoryConsumptionTracker) IncreaseMemoryConsumption(b uint64, source MemoryConsumptionSource) error {
l.mtx.Lock()
defer l.mtx.Unlock()

Expand All @@ -77,22 +122,24 @@ func (l *MemoryConsumptionTracker) IncreaseMemoryConsumption(b uint64) error {
return NewMaxEstimatedMemoryConsumptionPerQueryLimitError(l.maxEstimatedMemoryConsumptionBytes)
}

l.currentEstimatedMemoryConsumptionBySource[source] += b
l.currentEstimatedMemoryConsumptionBytes += b
l.peakEstimatedMemoryConsumptionBytes = max(l.peakEstimatedMemoryConsumptionBytes, l.currentEstimatedMemoryConsumptionBytes)

return nil
}

// DecreaseMemoryConsumption decreases the current memory consumption by b bytes.
func (l *MemoryConsumptionTracker) DecreaseMemoryConsumption(b uint64) {
func (l *MemoryConsumptionTracker) DecreaseMemoryConsumption(b uint64, source MemoryConsumptionSource) {
l.mtx.Lock()
defer l.mtx.Unlock()

if b > l.currentEstimatedMemoryConsumptionBytes {
panic("Estimated memory consumption of this query is negative. This indicates something has been returned to a pool more than once, which is a bug. The affected query is: " + l.queryDescription)
if b > l.currentEstimatedMemoryConsumptionBySource[source] {
panic(fmt.Sprintf("Estimated memory consumption of all instances of %v in this query is negative. This indicates something has been returned to a pool more than once, which is a bug. The affected query is: %v", memoryConsumptionSourceNames[source], l.queryDescription))
}

l.currentEstimatedMemoryConsumptionBytes -= b
l.currentEstimatedMemoryConsumptionBySource[source] -= b
}

// PeakEstimatedMemoryConsumptionBytes returns the peak memory consumption in bytes.
Expand Down
Loading