Skip to content

Commit f72cb9b

Browse files
committed
[chore] [exporterhelper] Add an option for items based queue sizing
1 parent 1c84578 commit f72cb9b

File tree

7 files changed

+525
-167
lines changed

7 files changed

+525
-167
lines changed

exporter/exporterhelper/internal/bounded_memory_queue.go

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,26 @@ import (
1616
// the producer are dropped.
1717
type boundedMemoryQueue[T any] struct {
1818
component.StartFunc
19+
*queueCapacityLimiter[T]
1920
items chan queueRequest[T]
2021
}
2122

2223
// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
2324
// callback for dropped items (e.g. useful to emit metrics).
24-
func NewBoundedMemoryQueue[T any](capacity int) Queue[T] {
25+
func NewBoundedMemoryQueue[T any](sizer Sizer[T], capacity int) Queue[T] {
2526
return &boundedMemoryQueue[T]{
26-
items: make(chan queueRequest[T], capacity),
27+
queueCapacityLimiter: newQueueCapacityLimiter[T](sizer, capacity),
28+
items: make(chan queueRequest[T], capacity),
2729
}
2830
}
2931

3032
// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
3133
func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
32-
select {
33-
case q.items <- queueRequest[T]{ctx: ctx, req: req}:
34-
return nil
35-
default:
34+
if !q.queueCapacityLimiter.claim(req) {
3635
return ErrQueueIsFull
3736
}
37+
q.items <- queueRequest[T]{ctx: ctx, req: req}
38+
return nil
3839
}
3940

4041
// Consume applies the provided function on the head of queue.
@@ -45,6 +46,7 @@ func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) err
4546
if !ok {
4647
return false
4748
}
49+
q.queueCapacityLimiter.release(item.req)
4850
// the memory queue doesn't handle consume errors
4951
_ = consumeFunc(item.ctx, item.req)
5052
return true
@@ -56,15 +58,6 @@ func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
5658
return nil
5759
}
5860

59-
// Size returns the current size of the queue
60-
func (q *boundedMemoryQueue[T]) Size() int {
61-
return len(q.items)
62-
}
63-
64-
func (q *boundedMemoryQueue[T]) Capacity() int {
65-
return cap(q.items)
66-
}
67-
6861
type queueRequest[T any] struct {
6962
req T
7063
ctx context.Context

exporter/exporterhelper/internal/bounded_memory_queue_test.go

Lines changed: 33 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package internal
88
import (
99
"context"
1010
"errors"
11-
"fmt"
1211
"strconv"
1312
"sync"
1413
"testing"
@@ -23,7 +22,7 @@ import (
2322
// We want to test the overflow behavior, so we block the consumer
2423
// by holding a startLock before submitting items to the queue.
2524
func TestBoundedQueue(t *testing.T) {
26-
q := NewBoundedMemoryQueue[string](1)
25+
q := NewBoundedMemoryQueue[string](&RequestSizer[string]{}, 1)
2726

2827
assert.NoError(t, q.Offer(context.Background(), "a"))
2928

@@ -73,7 +72,7 @@ func TestBoundedQueue(t *testing.T) {
7372
// only after Stop will mean the consumers are still locked while
7473
// trying to perform the final consumptions.
7574
func TestShutdownWhileNotEmpty(t *testing.T) {
76-
q := NewBoundedMemoryQueue[string](1000)
75+
q := NewBoundedMemoryQueue[string](&RequestSizer[string]{}, 1000)
7776

7877
assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
7978
for i := 0; i < 10; i++ {
@@ -98,75 +97,70 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
9897
}))
9998
}
10099

101-
func Benchmark_QueueUsage_10000_1_50000(b *testing.B) {
102-
benchmarkQueueUsage(b, 10000, 1, 50000)
100+
func Benchmark_QueueUsage_10000_requests_1_50000(b *testing.B) {
101+
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 1, 50000)
103102
}
104103

105-
func Benchmark_QueueUsage_10000_2_50000(b *testing.B) {
106-
benchmarkQueueUsage(b, 10000, 2, 50000)
107-
}
108-
func Benchmark_QueueUsage_10000_5_50000(b *testing.B) {
109-
benchmarkQueueUsage(b, 10000, 5, 50000)
110-
}
111-
func Benchmark_QueueUsage_10000_10_50000(b *testing.B) {
112-
benchmarkQueueUsage(b, 10000, 10, 50000)
104+
func Benchmark_QueueUsage_10000_requests_10_50000(b *testing.B) {
105+
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 10, 50000)
113106
}
114107

115-
func Benchmark_QueueUsage_50000_1_50000(b *testing.B) {
116-
benchmarkQueueUsage(b, 50000, 1, 50000)
108+
func Benchmark_QueueUsage_50000_requests_1_50000(b *testing.B) {
109+
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 50000, 1, 50000)
117110
}
118111

119-
func Benchmark_QueueUsage_50000_2_50000(b *testing.B) {
120-
benchmarkQueueUsage(b, 50000, 2, 50000)
121-
}
122-
func Benchmark_QueueUsage_50000_5_50000(b *testing.B) {
123-
benchmarkQueueUsage(b, 50000, 5, 50000)
112+
func Benchmark_QueueUsage_50000_requests_10_50000(b *testing.B) {
113+
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 50000, 10, 50000)
124114
}
125-
func Benchmark_QueueUsage_50000_10_50000(b *testing.B) {
126-
benchmarkQueueUsage(b, 50000, 10, 50000)
115+
116+
func Benchmark_QueueUsage_10000_requests_1_250000(b *testing.B) {
117+
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 1, 250000)
127118
}
128119

129-
func Benchmark_QueueUsage_10000_1_250000(b *testing.B) {
130-
benchmarkQueueUsage(b, 10000, 1, 250000)
120+
func Benchmark_QueueUsage_10000_requests_10_250000(b *testing.B) {
121+
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 10, 250000)
131122
}
132123

133-
func Benchmark_QueueUsage_10000_2_250000(b *testing.B) {
134-
benchmarkQueueUsage(b, 10000, 2, 250000)
124+
func Benchmark_QueueUsage_1M_items_10_250k(b *testing.B) {
125+
benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 1000000, 10, 250000)
135126
}
136-
func Benchmark_QueueUsage_10000_5_250000(b *testing.B) {
137-
benchmarkQueueUsage(b, 10000, 5, 250000)
127+
128+
func Benchmark_QueueUsage_1M_items_10_1M(b *testing.B) {
129+
benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 1000000, 10, 1000000)
138130
}
139-
func Benchmark_QueueUsage_10000_10_250000(b *testing.B) {
140-
benchmarkQueueUsage(b, 10000, 10, 250000)
131+
132+
func Benchmark_QueueUsage_100M_items_10_10M(b *testing.B) {
133+
benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 100000000, 10, 10000000)
141134
}
142135

143136
func TestQueueUsage(t *testing.T) {
144137
t.Run("with enough workers", func(t *testing.T) {
145-
queueUsage(t, 10000, 5, 1000)
138+
queueUsage(t, &RequestSizer[fakeReq]{}, 10000, 5, 1000)
146139
})
147140
t.Run("past capacity", func(t *testing.T) {
148-
queueUsage(t, 10000, 2, 50000)
141+
queueUsage(t, &RequestSizer[fakeReq]{}, 10000, 2, 50000)
149142
})
150143
}
151144

152-
func benchmarkQueueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int) {
145+
func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], capacity int, numConsumers int,
146+
numberOfItems int) {
153147
b.ReportAllocs()
154148
for i := 0; i < b.N; i++ {
155-
queueUsage(b, capacity, numConsumers, numberOfItems)
149+
queueUsage(b, sizer, capacity, numConsumers, numberOfItems)
156150
}
157151
}
158152

159-
func queueUsage(tb testing.TB, capacity int, numConsumers int, numberOfItems int) {
153+
func queueUsage(tb testing.TB, sizer Sizer[fakeReq], capacity int, numConsumers int, numberOfItems int) {
160154
var wg sync.WaitGroup
161155
wg.Add(numberOfItems)
162-
q := NewBoundedMemoryQueue[string](capacity)
163-
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) error {
156+
q := NewBoundedMemoryQueue[fakeReq](sizer, capacity)
157+
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, fakeReq) error {
164158
wg.Done()
165159
return nil
166160
})
167161
require.NoError(tb, consumers.Start(context.Background(), componenttest.NewNopHost()))
168162
for j := 0; j < numberOfItems; j++ {
169-
if err := q.Offer(context.Background(), fmt.Sprintf("%d", j)); errors.Is(err, ErrQueueIsFull) {
163+
if err := q.Offer(context.Background(), fakeReq{10}); errors.Is(err, ErrQueueIsFull) {
170164
wg.Done()
171165
}
172166
}
@@ -176,7 +170,7 @@ func queueUsage(tb testing.TB, capacity int, numConsumers int, numberOfItems int
176170
}
177171

178172
func TestZeroSizeNoConsumers(t *testing.T) {
179-
q := NewBoundedMemoryQueue[string](0)
173+
q := NewBoundedMemoryQueue[string](&RequestSizer[string]{}, 0)
180174

181175
err := q.Start(context.Background(), componenttest.NewNopHost())
182176
assert.NoError(t, err)

0 commit comments

Comments
 (0)