Skip to content

Commit fe19e9a

Browse files
MQE: label each source of memory consumption to make it easier to identify where something is returned to the pool multiple times (#11654)
* Introduce `MemoryConsumptionSource` type
* Label each source of memory consumption to make it easier to identify where something is returned to the pool multiple times
* Code review changes

Signed-off-by: Nick Pillitteri <[email protected]>

---------

Signed-off-by: Nick Pillitteri <[email protected]>
Co-authored-by: Nick Pillitteri <[email protected]>
1 parent d94b80e commit fe19e9a

File tree

13 files changed

+150
-48
lines changed

13 files changed

+150
-48
lines changed

pkg/ingester/client/streaming.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ type StreamingSeriesSource struct {
3838
}
3939

4040
type memoryConsumptionTracker interface {
41-
IncreaseMemoryConsumption(b uint64) error
42-
DecreaseMemoryConsumption(b uint64)
41+
IncreaseMemoryConsumption(b uint64, source limiter.MemoryConsumptionSource) error
42+
DecreaseMemoryConsumption(b uint64, source limiter.MemoryConsumptionSource)
4343
}
4444

4545
func NewSeriesChunksStreamReader(ctx context.Context, client Ingester_QueryStreamClient, ingesterName string, expectedSeriesCount int, queryLimiter *limiter.QueryLimiter, memoryTracker memoryConsumptionTracker, cleanup func(), log log.Logger) *SeriesChunksStreamReader {
@@ -97,7 +97,7 @@ func (s *SeriesChunksStreamReader) Close() {
9797
// It is safe to call FreeBuffer multiple times, or to alternate GetChunks and FreeBuffer calls.
9898
func (s *SeriesChunksStreamReader) FreeBuffer() {
9999
if s.lastMessage != nil {
100-
s.memoryTracker.DecreaseMemoryConsumption(uint64(s.lastMessage.Size()))
100+
s.memoryTracker.DecreaseMemoryConsumption(uint64(s.lastMessage.Size()), limiter.IngesterChunks)
101101
s.lastMessage.FreeBuffer()
102102
s.lastMessage = nil
103103
}
@@ -110,7 +110,7 @@ func (s *SeriesChunksStreamReader) setLastMessage(msg *QueryStreamResponse) erro
110110
if s.lastMessage != nil {
111111
return fmt.Errorf("must call FreeBuffer() before storing the next message - this indicates a bug")
112112
}
113-
if err := s.memoryTracker.IncreaseMemoryConsumption(uint64(msg.Size())); err != nil {
113+
if err := s.memoryTracker.IncreaseMemoryConsumption(uint64(msg.Size()), limiter.IngesterChunks); err != nil {
114114
return err
115115
}
116116
s.lastMessage = msg

pkg/querier/block_streaming.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,8 @@ func (bqs *blockStreamingQuerierSeries) Iterator(reuse chunkenc.Iterator) chunke
144144
}
145145

146146
type memoryConsumptionTracker interface {
147-
IncreaseMemoryConsumption(b uint64) error
148-
DecreaseMemoryConsumption(b uint64)
147+
IncreaseMemoryConsumption(b uint64, source limiter.MemoryConsumptionSource) error
148+
DecreaseMemoryConsumption(b uint64, source limiter.MemoryConsumptionSource)
149149
}
150150

151151
// storeGatewayStreamReader is responsible for managing the streaming of chunks from a storegateway and buffering
@@ -196,7 +196,7 @@ func (s *storeGatewayStreamReader) Close() {
196196
// It is safe to call FreeBuffer multiple times, or to alternate GetChunks and FreeBuffer calls.
197197
func (s *storeGatewayStreamReader) FreeBuffer() {
198198
if s.lastMessage != nil {
199-
s.memoryTracker.DecreaseMemoryConsumption(uint64(s.lastMessage.Size()))
199+
s.memoryTracker.DecreaseMemoryConsumption(uint64(s.lastMessage.Size()), limiter.StoreGatewayChunks)
200200
s.lastMessage.FreeBuffer()
201201
s.lastMessage = nil
202202
}
@@ -209,7 +209,7 @@ func (s *storeGatewayStreamReader) setLastMessage(msg *storepb.SeriesResponse) e
209209
if s.lastMessage != nil {
210210
return fmt.Errorf("must call FreeBuffer() before storing the next message - this indicates a bug")
211211
}
212-
if err := s.memoryTracker.IncreaseMemoryConsumption(uint64(msg.Size())); err != nil {
212+
if err := s.memoryTracker.IncreaseMemoryConsumption(uint64(msg.Size()), limiter.StoreGatewayChunks); err != nil {
213213
return err
214214
}
215215
s.lastMessage = msg

pkg/streamingpromql/operators/aggregations/quantile.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ var qGroupPool = types.NewLimitingBucketedPool(
125125
pool.NewBucketedPool(maxExpectedQuantileGroups, func(size int) []qGroup {
126126
return make([]qGroup, 0, size)
127127
}),
128+
limiter.QuantileGroupSlices,
128129
uint64(unsafe.Sizeof(qGroup{})),
129130
false,
130131
nil,

pkg/streamingpromql/operators/aggregations/topkbottomk/instant_query.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ var instantQuerySeriesSlicePool = types.NewLimitingBucketedPool(
317317
pool.NewBucketedPool(types.MaxExpectedSeriesPerResult, func(size int) []instantQuerySeries {
318318
return make([]instantQuerySeries, 0, size)
319319
}),
320+
limiter.TopKBottomKInstantQuerySeriesSlices,
320321
uint64(unsafe.Sizeof(instantQuerySeries{})),
321322
true,
322323
nil,

pkg/streamingpromql/operators/aggregations/topkbottomk/range_query.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,7 @@ var rangeQuerySeriesSlicePool = types.NewLimitingBucketedPool(
494494
pool.NewBucketedPool(types.MaxExpectedSeriesPerResult, func(size int) []rangeQuerySeries {
495495
return make([]rangeQuerySeries, 0, size)
496496
}),
497+
limiter.TopKBottomKRangeQuerySeriesSlices,
497498
uint64(unsafe.Sizeof(rangeQuerySeries{})),
498499
true,
499500
nil,
@@ -503,6 +504,7 @@ var intSliceSlicePool = types.NewLimitingBucketedPool(
503504
pool.NewBucketedPool(types.MaxExpectedPointsPerSeries, func(size int) [][]int {
504505
return make([][]int, 0, size)
505506
}),
507+
limiter.IntSliceSlice,
506508
uint64(unsafe.Sizeof([][]int{})),
507509
true,
508510
nil,

pkg/streamingpromql/operators/binops/one_to_one_vector_vector_binary_operation_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,8 @@ func TestOneToOneVectorVectorBinaryOperation_SeriesMerging(t *testing.T) {
207207
}
208208
for _, s := range testCase.input {
209209
// Count the memory for the given floats + histograms
210-
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*uint64(len(s.Floats))+types.HPointSize*uint64(len(s.Histograms))))
210+
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*uint64(len(s.Floats)), limiter.FPointSlices))
211+
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.HPointSize*uint64(len(s.Histograms)), limiter.HPointSlices))
211212
}
212213

213214
result, err := o.mergeSingleSide(testCase.input, testCase.sourceSeriesIndices, testCase.sourceSeriesMetadata, "right")

pkg/streamingpromql/operators/functions/common_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ func TestFloatTransformationFunc(t *testing.T) {
4545
},
4646
}
4747
// Increase the memory tracker for 2 FPoints, and 1 HPoint
48-
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*2+types.HPointSize*1))
48+
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*2, limiter.FPointSlices))
49+
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.HPointSize*1, limiter.HPointSlices))
4950

5051
expected := types.InstantVectorSeriesData{
5152
Floats: []promql.FPoint{
@@ -78,7 +79,8 @@ func TestFloatTransformationDropHistogramsFunc(t *testing.T) {
7879
},
7980
}
8081
// Increase the memory tracker for 2 FPoints, and 1 HPoint
81-
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*2+types.HPointSize*1))
82+
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*2, limiter.FPointSlices))
83+
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.HPointSize*1, limiter.HPointSlices))
8284

8385
expected := types.InstantVectorSeriesData{
8486
Floats: []promql.FPoint{

pkg/streamingpromql/operators/functions/histogram_function.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ var pointBucketPool = types.NewLimitingBucketedPool(
7979
pool.NewBucketedPool(types.MaxExpectedPointsPerSeries, func(size int) []promql.Buckets {
8080
return make([]promql.Buckets, 0, size)
8181
}),
82+
limiter.BucketSlices,
8283
uint64(unsafe.Sizeof(promql.Buckets{})),
8384
true,
8485
mangleBuckets,
@@ -98,6 +99,7 @@ var bucketSliceBucketedPool = types.NewLimitingBucketedPool(
9899
pool.NewBucketedPool(maxExpectedBucketsPerHistogram, func(size int) []promql.Bucket {
99100
return make([]promql.Bucket, 0, size)
100101
}),
102+
limiter.BucketSlices,
101103
uint64(unsafe.Sizeof(promql.Bucket{})),
102104
true,
103105
nil,

pkg/streamingpromql/operators/operator_buffer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func TestInstantVectorOperatorBuffer_BufferingSubsetOfInputSeries(t *testing.T)
4545

4646
seriesUsed := []bool{true, false, true, true, true}
4747
memoryConsumptionTracker := limiter.NewMemoryConsumptionTracker(0, nil, "")
48-
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*6)) // We have 6 FPoints from the inner series.
48+
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*6, limiter.FPointSlices)) // We have 6 FPoints from the inner series.
4949
buffer := NewInstantVectorOperatorBuffer(inner, seriesUsed, 4, memoryConsumptionTracker)
5050
ctx := context.Background()
5151

@@ -115,7 +115,7 @@ func TestInstantVectorOperatorBuffer_BufferingAllInputSeries(t *testing.T) {
115115
}
116116

117117
memoryConsumptionTracker := limiter.NewMemoryConsumptionTracker(0, nil, "")
118-
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*6)) // We have 6 FPoints from the inner series.
118+
require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*6, limiter.FPointSlices)) // We have 6 FPoints from the inner series.
119119
buffer := NewInstantVectorOperatorBuffer(inner, nil, 6, memoryConsumptionTracker)
120120
ctx := context.Background()
121121

pkg/streamingpromql/types/limiting_pool.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ var (
4545
pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []promql.FPoint {
4646
return make([]promql.FPoint, 0, size)
4747
}),
48+
limiter.FPointSlices,
4849
FPointSize,
4950
false,
5051
nil,
@@ -54,6 +55,7 @@ var (
5455
pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []promql.HPoint {
5556
return make([]promql.HPoint, 0, size)
5657
}),
58+
limiter.HPointSlices,
5759
HPointSize,
5860
false,
5961
func(point promql.HPoint) promql.HPoint {
@@ -66,6 +68,7 @@ var (
6668
pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) promql.Vector {
6769
return make(promql.Vector, 0, size)
6870
}),
71+
limiter.Vectors,
6972
VectorSampleSize,
7073
false,
7174
nil,
@@ -75,6 +78,7 @@ var (
7578
pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []float64 {
7679
return make([]float64, 0, size)
7780
}),
81+
limiter.Float64Slices,
7882
Float64Size,
7983
true,
8084
nil,
@@ -84,6 +88,7 @@ var (
8488
pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []int {
8589
return make([]int, 0, size)
8690
}),
91+
limiter.IntSlices,
8792
IntSize,
8893
true,
8994
nil,
@@ -93,6 +98,7 @@ var (
9398
pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []int64 {
9499
return make([]int64, 0, size)
95100
}),
101+
limiter.Int64Slices,
96102
Int64Size,
97103
true,
98104
nil,
@@ -102,6 +108,7 @@ var (
102108
pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []bool {
103109
return make([]bool, 0, size)
104110
}),
111+
limiter.BoolSlices,
105112
BoolSize,
106113
true,
107114
nil,
@@ -111,6 +118,7 @@ var (
111118
pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []*histogram.FloatHistogram {
112119
return make([]*histogram.FloatHistogram, 0, size)
113120
}),
121+
limiter.HistogramPointerSlices,
114122
HistogramPointerSize,
115123
true,
116124
mangleHistogram,
@@ -120,6 +128,7 @@ var (
120128
pool.NewBucketedPool(MaxExpectedSeriesPerResult, func(size int) []SeriesMetadata {
121129
return make([]SeriesMetadata, 0, size)
122130
}),
131+
limiter.SeriesMetadataSlices,
123132
SeriesMetadataSize,
124133
true,
125134
nil,
@@ -156,14 +165,16 @@ func mangleHistogram(h *histogram.FloatHistogram) *histogram.FloatHistogram {
156165
// assumes all native histograms are the same size, and assumes all elements of a promql.Vector are float samples.
157166
type LimitingBucketedPool[S ~[]E, E any] struct {
158167
inner *pool.BucketedPool[S, E]
168+
source limiter.MemoryConsumptionSource
159169
elementSize uint64
160170
clearOnGet bool
161171
mangle func(E) E
162172
}
163173

164-
func NewLimitingBucketedPool[S ~[]E, E any](inner *pool.BucketedPool[S, E], elementSize uint64, clearOnGet bool, mangle func(E) E) *LimitingBucketedPool[S, E] {
174+
func NewLimitingBucketedPool[S ~[]E, E any](inner *pool.BucketedPool[S, E], source limiter.MemoryConsumptionSource, elementSize uint64, clearOnGet bool, mangle func(E) E) *LimitingBucketedPool[S, E] {
165175
return &LimitingBucketedPool[S, E]{
166176
inner: inner,
177+
source: source,
167178
elementSize: elementSize,
168179
clearOnGet: clearOnGet,
169180
mangle: mangle,
@@ -187,7 +198,7 @@ func (p *LimitingBucketedPool[S, E]) Get(size int, tracker *limiter.MemoryConsum
187198
// - there's no guarantee the slice will have size 'size' when it's returned to us in putWithElementSize, so using 'size' would make the accounting below impossible
188199
estimatedBytes := uint64(cap(s)) * p.elementSize
189200

190-
if err := tracker.IncreaseMemoryConsumption(estimatedBytes); err != nil {
201+
if err := tracker.IncreaseMemoryConsumption(estimatedBytes, p.source); err != nil {
191202
p.inner.Put(s)
192203
return nil, err
193204
}
@@ -211,7 +222,7 @@ func (p *LimitingBucketedPool[S, E]) Put(s S, tracker *limiter.MemoryConsumption
211222
}
212223
}
213224

214-
tracker.DecreaseMemoryConsumption(uint64(cap(s)) * p.elementSize)
225+
tracker.DecreaseMemoryConsumption(uint64(cap(s))*p.elementSize, p.source)
215226
p.inner.Put(s)
216227
}
217228

0 commit comments

Comments (0)