8 changes: 4 additions & 4 deletions pkg/ingester/client/streaming.go

@@ -38,8 +38,8 @@ type StreamingSeriesSource struct {
 }

 type memoryConsumptionTracker interface {
-	IncreaseMemoryConsumption(b uint64) error
-	DecreaseMemoryConsumption(b uint64)
+	IncreaseMemoryConsumption(b uint64, source limiter.MemoryConsumptionSource) error
+	DecreaseMemoryConsumption(b uint64, source limiter.MemoryConsumptionSource)
 }

 func NewSeriesChunksStreamReader(ctx context.Context, client Ingester_QueryStreamClient, ingesterName string, expectedSeriesCount int, queryLimiter *limiter.QueryLimiter, memoryTracker memoryConsumptionTracker, cleanup func(), log log.Logger) *SeriesChunksStreamReader {
@@ -97,7 +97,7 @@ func (s *SeriesChunksStreamReader) Close() {
 // It is safe to call FreeBuffer multiple times, or to alternate GetChunks and FreeBuffer calls.
 func (s *SeriesChunksStreamReader) FreeBuffer() {
 	if s.lastMessage != nil {
-		s.memoryTracker.DecreaseMemoryConsumption(uint64(s.lastMessage.Size()))
+		s.memoryTracker.DecreaseMemoryConsumption(uint64(s.lastMessage.Size()), limiter.IngesterChunks)
 		s.lastMessage.FreeBuffer()
 		s.lastMessage = nil
 	}
@@ -110,7 +110,7 @@ func (s *SeriesChunksStreamReader) setLastMessage(msg *QueryStreamResponse) error {
 	if s.lastMessage != nil {
 		return fmt.Errorf("must call FreeBuffer() before storing the next message - this indicates a bug")
 	}
-	if err := s.memoryTracker.IncreaseMemoryConsumption(uint64(msg.Size())); err != nil {
+	if err := s.memoryTracker.IncreaseMemoryConsumption(uint64(msg.Size()), limiter.IngesterChunks); err != nil {
 		return err
 	}
 	s.lastMessage = msg
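The interface change above obliges every tracker implementation to accept a source tag on both calls. Below is a minimal sketch of a conforming tracker that keeps a per-source breakdown so limit errors can say what consumed the memory; the type and constant names are stand-ins for Mimir's limiter package, not its actual implementation.

package main

import (
	"fmt"
	"sync"
)

// MemoryConsumptionSource stands in for limiter.MemoryConsumptionSource.
type MemoryConsumptionSource int

const (
	IngesterChunks MemoryConsumptionSource = iota
	StoreGatewayChunks
)

// fakeTracker satisfies the updated memoryConsumptionTracker interface:
// every increase and decrease is tagged with the source that caused it.
type fakeTracker struct {
	mu       sync.Mutex
	limit    uint64 // 0 means unlimited
	total    uint64
	bySource map[MemoryConsumptionSource]uint64
}

func (t *fakeTracker) IncreaseMemoryConsumption(b uint64, source MemoryConsumptionSource) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.limit > 0 && t.total+b > t.limit {
		return fmt.Errorf("memory limit exceeded: source %d requested %d more bytes", source, b)
	}
	t.total += b
	if t.bySource == nil {
		t.bySource = map[MemoryConsumptionSource]uint64{}
	}
	t.bySource[source] += b
	return nil
}

func (t *fakeTracker) DecreaseMemoryConsumption(b uint64, source MemoryConsumptionSource) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.total -= b
	t.bySource[source] -= b
}

func main() {
	t := &fakeTracker{limit: 1024}
	_ = t.IncreaseMemoryConsumption(512, IngesterChunks)
	t.DecreaseMemoryConsumption(512, IngesterChunks)
	fmt.Println(t.total, t.bySource[IngesterChunks]) // 0 0
}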
8 changes: 4 additions & 4 deletions pkg/querier/block_streaming.go

@@ -144,8 +144,8 @@ func (bqs *blockStreamingQuerierSeries) Iterator(reuse chunkenc.Iterator) chunkenc.Iterator {
 }

 type memoryConsumptionTracker interface {
-	IncreaseMemoryConsumption(b uint64) error
-	DecreaseMemoryConsumption(b uint64)
+	IncreaseMemoryConsumption(b uint64, source limiter.MemoryConsumptionSource) error
+	DecreaseMemoryConsumption(b uint64, source limiter.MemoryConsumptionSource)
 }

 // storeGatewayStreamReader is responsible for managing the streaming of chunks from a storegateway and buffering
@@ -196,7 +196,7 @@ func (s *storeGatewayStreamReader) Close() {
 // It is safe to call FreeBuffer multiple times, or to alternate GetChunks and FreeBuffer calls.
 func (s *storeGatewayStreamReader) FreeBuffer() {
 	if s.lastMessage != nil {
-		s.memoryTracker.DecreaseMemoryConsumption(uint64(s.lastMessage.Size()))
+		s.memoryTracker.DecreaseMemoryConsumption(uint64(s.lastMessage.Size()), limiter.StoreGatewayChunks)
 		s.lastMessage.FreeBuffer()
 		s.lastMessage = nil
 	}
@@ -209,7 +209,7 @@ func (s *storeGatewayStreamReader) setLastMessage(msg *storepb.SeriesResponse) error {
 	if s.lastMessage != nil {
 		return fmt.Errorf("must call FreeBuffer() before storing the next message - this indicates a bug")
 	}
-	if err := s.memoryTracker.IncreaseMemoryConsumption(uint64(msg.Size())); err != nil {
+	if err := s.memoryTracker.IncreaseMemoryConsumption(uint64(msg.Size()), limiter.StoreGatewayChunks); err != nil {
 		return err
 	}
 	s.lastMessage = msg
1 change: 1 addition & 0 deletions pkg/streamingpromql/operators/aggregations/quantile.go

@@ -125,6 +125,7 @@ var qGroupPool = types.NewLimitingBucketedPool(
 	pool.NewBucketedPool(maxExpectedQuantileGroups, func(size int) []qGroup {
 		return make([]qGroup, 0, size)
 	}),
+	limiter.QuantileGroupSlices,
 	uint64(unsafe.Sizeof(qGroup{})),
 	false,
 	nil,
@@ -317,6 +317,7 @@ var instantQuerySeriesSlicePool = types.NewLimitingBucketedPool(
 	pool.NewBucketedPool(types.MaxExpectedSeriesPerResult, func(size int) []instantQuerySeries {
 		return make([]instantQuerySeries, 0, size)
 	}),
+	limiter.TopKBottomKInstantQuerySeriesSlices,
 	uint64(unsafe.Sizeof(instantQuerySeries{})),
 	true,
 	nil,
@@ -494,6 +494,7 @@ var rangeQuerySeriesSlicePool = types.NewLimitingBucketedPool(
 	pool.NewBucketedPool(types.MaxExpectedSeriesPerResult, func(size int) []rangeQuerySeries {
 		return make([]rangeQuerySeries, 0, size)
 	}),
+	limiter.TopKBottomKRangeQuerySeriesSlices,
 	uint64(unsafe.Sizeof(rangeQuerySeries{})),
 	true,
 	nil,
@@ -503,6 +504,7 @@ var intSliceSlicePool = types.NewLimitingBucketedPool(
 	pool.NewBucketedPool(types.MaxExpectedPointsPerSeries, func(size int) [][]int {
 		return make([][]int, 0, size)
 	}),
+	limiter.IntSliceSlice,
 	uint64(unsafe.Sizeof([][]int{})),
 	true,
 	nil,
@@ -207,7 +207,8 @@ func TestOneToOneVectorVectorBinaryOperation_SeriesMerging(t *testing.T) {
 			}
 			for _, s := range testCase.input {
 				// Count the memory for the given floats + histograms
-				require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*uint64(len(s.Floats))+types.HPointSize*uint64(len(s.Histograms))))
+				require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*uint64(len(s.Floats)), limiter.FPointSlices))
+				require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.HPointSize*uint64(len(s.Histograms)), limiter.HPointSlices))
 			}

 			result, err := o.mergeSingleSide(testCase.input, testCase.sourceSeriesIndices, testCase.sourceSeriesMetadata, "right")
6 changes: 4 additions & 2 deletions pkg/streamingpromql/operators/functions/common_test.go

@@ -45,7 +45,8 @@ func TestFloatTransformationFunc(t *testing.T) {
 		},
 	}
 	// Increase the memory tracker for 2 FPoints, and 1 HPoint
-	require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*2+types.HPointSize*1))
+	require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*2, limiter.FPointSlices))
+	require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.HPointSize*1, limiter.HPointSlices))

 	expected := types.InstantVectorSeriesData{
 		Floats: []promql.FPoint{
@@ -78,7 +79,8 @@ func TestFloatTransformationDropHistogramsFunc(t *testing.T) {
 		},
 	}
 	// Increase the memory tracker for 2 FPoints, and 1 HPoint
-	require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*2+types.HPointSize*1))
+	require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*2, limiter.FPointSlices))
+	require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.HPointSize*1, limiter.HPointSlices))

 	expected := types.InstantVectorSeriesData{
 		Floats: []promql.FPoint{
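These tests previously charged the tracker once for floats and histograms together; with per-source accounting the charge must be split, because each slice's eventual release is tagged with that slice's own source and must balance its increase exactly. A toy, self-contained illustration of that bookkeeping (names and sizes are invented for the example, not Mimir's constants):

package main

import "fmt"

func main() {
	type source string
	usage := map[source]uint64{}
	increase := func(b uint64, s source) { usage[s] += b }
	decrease := func(b uint64, s source) { usage[s] -= b }

	const fPointSize, hPointSize = 16, 24 // illustrative element sizes

	// Charge each point type under its own source, as the updated tests do...
	increase(fPointSize*2, "fpoint-slices")
	increase(hPointSize*1, "hpoint-slices")

	// ...so the per-source releases performed later balance to zero.
	decrease(fPointSize*2, "fpoint-slices")
	decrease(hPointSize*1, "hpoint-slices")
	fmt.Println(usage) // map[fpoint-slices:0 hpoint-slices:0]
}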
@@ -79,6 +79,7 @@ var pointBucketPool = types.NewLimitingBucketedPool(
 	pool.NewBucketedPool(types.MaxExpectedPointsPerSeries, func(size int) []promql.Buckets {
 		return make([]promql.Buckets, 0, size)
 	}),
+	limiter.BucketSlices,
 	uint64(unsafe.Sizeof(promql.Buckets{})),
 	true,
 	mangleBuckets,
@@ -98,6 +99,7 @@ var bucketSliceBucketedPool = types.NewLimitingBucketedPool(
 	pool.NewBucketedPool(maxExpectedBucketsPerHistogram, func(size int) []promql.Bucket {
 		return make([]promql.Bucket, 0, size)
 	}),
+	limiter.BucketSlices,
 	uint64(unsafe.Sizeof(promql.Bucket{})),
 	true,
 	nil,
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operators/operator_buffer_test.go

@@ -45,7 +45,7 @@ func TestInstantVectorOperatorBuffer_BufferingSubsetOfInputSeries(t *testing.T) {

 	seriesUsed := []bool{true, false, true, true, true}
 	memoryConsumptionTracker := limiter.NewMemoryConsumptionTracker(0, nil, "")
-	require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*6)) // We have 6 FPoints from the inner series.
+	require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*6, limiter.FPointSlices)) // We have 6 FPoints from the inner series.
 	buffer := NewInstantVectorOperatorBuffer(inner, seriesUsed, 4, memoryConsumptionTracker)
 	ctx := context.Background()

@@ -115,7 +115,7 @@ func TestInstantVectorOperatorBuffer_BufferingAllInputSeries(t *testing.T) {
 	}

 	memoryConsumptionTracker := limiter.NewMemoryConsumptionTracker(0, nil, "")
-	require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*6)) // We have 6 FPoints from the inner series.
+	require.NoError(t, memoryConsumptionTracker.IncreaseMemoryConsumption(types.FPointSize*6, limiter.FPointSlices)) // We have 6 FPoints from the inner series.
 	buffer := NewInstantVectorOperatorBuffer(inner, nil, 6, memoryConsumptionTracker)
 	ctx := context.Background()
17 changes: 14 additions & 3 deletions pkg/streamingpromql/types/limiting_pool.go

@@ -45,6 +45,7 @@ var (
 		pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []promql.FPoint {
 			return make([]promql.FPoint, 0, size)
 		}),
+		limiter.FPointSlices,
 		FPointSize,
 		false,
 		nil,
@@ -54,6 +55,7 @@
 		pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []promql.HPoint {
 			return make([]promql.HPoint, 0, size)
 		}),
+		limiter.HPointSlices,
 		HPointSize,
 		false,
 		func(point promql.HPoint) promql.HPoint {
@@ -66,6 +68,7 @@ var (
 		pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) promql.Vector {
 			return make(promql.Vector, 0, size)
 		}),
+		limiter.Vectors,
 		VectorSampleSize,
 		false,
 		nil,
@@ -75,6 +78,7 @@ var (
 		pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []float64 {
 			return make([]float64, 0, size)
 		}),
+		limiter.Float64Slices,
 		Float64Size,
 		true,
 		nil,
@@ -84,6 +88,7 @@ var (
 		pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []int {
 			return make([]int, 0, size)
 		}),
+		limiter.IntSlices,
 		IntSize,
 		true,
 		nil,
@@ -93,6 +98,7 @@ var (
 		pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []int64 {
 			return make([]int64, 0, size)
 		}),
+		limiter.Int64Slices,
 		Int64Size,
 		true,
 		nil,
@@ -102,6 +108,7 @@ var (
 		pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []bool {
 			return make([]bool, 0, size)
 		}),
+		limiter.BoolSlices,
 		BoolSize,
 		true,
 		nil,
@@ -111,6 +118,7 @@ var (
 		pool.NewBucketedPool(MaxExpectedPointsPerSeries, func(size int) []*histogram.FloatHistogram {
 			return make([]*histogram.FloatHistogram, 0, size)
 		}),
+		limiter.HistogramPointerSlices,
 		HistogramPointerSize,
 		true,
 		mangleHistogram,
@@ -120,6 +128,7 @@ var (
 		pool.NewBucketedPool(MaxExpectedSeriesPerResult, func(size int) []SeriesMetadata {
 			return make([]SeriesMetadata, 0, size)
 		}),
+		limiter.SeriesMetadataSlices,
 		SeriesMetadataSize,
 		true,
 		nil,
@@ -156,14 +165,16 @@ func mangleHistogram(h *histogram.FloatHistogram) *histogram.FloatHistogram {
 // assumes all native histograms are the same size, and assumes all elements of a promql.Vector are float samples.
 type LimitingBucketedPool[S ~[]E, E any] struct {
 	inner       *pool.BucketedPool[S, E]
+	source      limiter.MemoryConsumptionSource
 	elementSize uint64
 	clearOnGet  bool
 	mangle      func(E) E
 }

-func NewLimitingBucketedPool[S ~[]E, E any](inner *pool.BucketedPool[S, E], elementSize uint64, clearOnGet bool, mangle func(E) E) *LimitingBucketedPool[S, E] {
+func NewLimitingBucketedPool[S ~[]E, E any](inner *pool.BucketedPool[S, E], source limiter.MemoryConsumptionSource, elementSize uint64, clearOnGet bool, mangle func(E) E) *LimitingBucketedPool[S, E] {
 	return &LimitingBucketedPool[S, E]{
 		inner:       inner,
+		source:      source,
 		elementSize: elementSize,
 		clearOnGet:  clearOnGet,
 		mangle:      mangle,
@@ -187,7 +198,7 @@ func (p *LimitingBucketedPool[S, E]) Get(size int, tracker *limiter.MemoryConsumptionTracker) (S, error) {
 	// - there's no guarantee the slice will have size 'size' when it's returned to us in putWithElementSize, so using 'size' would make the accounting below impossible
 	estimatedBytes := uint64(cap(s)) * p.elementSize

-	if err := tracker.IncreaseMemoryConsumption(estimatedBytes); err != nil {
+	if err := tracker.IncreaseMemoryConsumption(estimatedBytes, p.source); err != nil {
 		p.inner.Put(s)
 		return nil, err
 	}
@@ -211,7 +222,7 @@ func (p *LimitingBucketedPool[S, E]) Put(s S, tracker *limiter.MemoryConsumptionTracker) {
 		}
 	}

-	tracker.DecreaseMemoryConsumption(uint64(cap(s)) * p.elementSize)
+	tracker.DecreaseMemoryConsumption(uint64(cap(s))*p.elementSize, p.source)
 	p.inner.Put(s)
 }
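Get charges the tracker for the slice's capacity — not the requested size — under the pool's source, and Put credits back the identical amount recomputed from cap, so each source's counter returns to zero once everything is released. A compact sketch of that pattern under stand-in types (this is not the real pool.BucketedPool or limiter API):

package main

import "fmt"

type Source int

type Tracker struct {
	limit, total uint64 // limit == 0 means unlimited
}

func (t *Tracker) Increase(b uint64, src Source) error {
	if t.limit > 0 && t.total+b > t.limit {
		return fmt.Errorf("limit exceeded while charging source %d", src)
	}
	t.total += b
	return nil
}

func (t *Tracker) Decrease(b uint64, src Source) { t.total -= b }

// LimitingPool mirrors the shape of LimitingBucketedPool: it wraps an inner
// pool (a plain free list here) and tags all accounting with its source.
type LimitingPool[E any] struct {
	source      Source
	elementSize uint64
	free        [][]E
}

func (p *LimitingPool[E]) Get(size int, tracker *Tracker) ([]E, error) {
	var s []E
	if n := len(p.free); n > 0 {
		s, p.free = p.free[n-1], p.free[:n-1]
	} else {
		s = make([]E, 0, size)
	}
	// Charge for capacity: that is the quantity we can recompute in Put.
	if err := tracker.Increase(uint64(cap(s))*p.elementSize, p.source); err != nil {
		p.free = append(p.free, s) // hand the slice back before failing
		return nil, err
	}
	return s, nil
}

func (p *LimitingPool[E]) Put(s []E, tracker *Tracker) {
	tracker.Decrease(uint64(cap(s))*p.elementSize, p.source)
	p.free = append(p.free, s[:0])
}

func main() {
	tracker := &Tracker{limit: 1 << 20}
	p := &LimitingPool[float64]{source: 1, elementSize: 8}
	s, err := p.Get(100, tracker)
	if err != nil {
		panic(err)
	}
	p.Put(s, tracker)
	fmt.Println(tracker.total) // 0
}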
3 changes: 3 additions & 0 deletions pkg/streamingpromql/types/limiting_pool_test.go

@@ -26,6 +26,7 @@ func TestLimitingBucketedPool_Unlimited(t *testing.T) {

 	p := NewLimitingBucketedPool(
 		pool.NewBucketedPool(1024, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }),
+		limiter.FPointSlices,
 		FPointSize,
 		false,
 		nil,
@@ -79,6 +80,7 @@ func TestLimitingPool_Limited(t *testing.T) {

 	p := NewLimitingBucketedPool(
 		pool.NewBucketedPool(1024, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }),
+		limiter.FPointSlices,
 		FPointSize,
 		false,
 		nil,
@@ -204,6 +206,7 @@ func TestLimitingPool_Mangling(t *testing.T) {

 	p := NewLimitingBucketedPool(
 		pool.NewBucketedPool(1024, func(size int) []int { return make([]int, 0, size) }),
+		limiter.IntSlices,
 		1,
 		false,
 		func(_ int) int { return 123 },