Skip to content

Commit

Permalink
[exporterqueue] Limited worker pool support for queue batcher (#11540)
Browse files Browse the repository at this point in the history
#### Description

This PR follows
#11532 and
implements support for a limited worker pool for the queue batcher.

Design doc:

https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing

#### Link to tracking issue

#8122
#10368
  • Loading branch information
sfc-gh-sili authored Oct 29, 2024
1 parent 78036de commit f1de0ff
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 57 deletions.
24 changes: 20 additions & 4 deletions exporter/internal/queue/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,11 @@ type BaseBatcher struct {
batchCfg exporterbatcher.Config
queue Queue[internal.Request]
maxWorkers int
workerPool chan bool
stopWG sync.WaitGroup
}

func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) (Batcher, error) {
if maxWorkers != 0 {
return nil, errors.ErrUnsupported
}

if batchCfg.Enabled {
return nil, errors.ErrUnsupported
}
Expand All @@ -50,6 +47,16 @@ func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request],
}, nil
}

// startWorkerPool initializes the semaphore channel that caps the number of
// concurrently running flush goroutines. A maxWorkers of 0 means "unlimited",
// in which case no pool is created and flushAsync spawns goroutines freely.
func (qb *BaseBatcher) startWorkerPool() {
	if qb.maxWorkers == 0 {
		return
	}
	// Pre-fill the pool with one token per allowed worker; flushAsync
	// acquires a token before spawning and returns it when done.
	pool := make(chan bool, qb.maxWorkers)
	for len(pool) < cap(pool) {
		pool <- true
	}
	qb.workerPool = pool
}

// flush exports the incoming batch synchronously.
func (qb *BaseBatcher) flush(batchToFlush batch) {
err := batchToFlush.req.Export(batchToFlush.ctx)
Expand All @@ -61,9 +68,18 @@ func (qb *BaseBatcher) flush(batchToFlush batch) {
// flushAsync flushes the given batch on a new goroutine. When a worker pool
// is configured (maxWorkers != 0) it blocks until a worker token is
// available, bounding the number of concurrent flushes.
// NOTE(review): the previous comment mentioned flushIfNecessary, but the
// goroutine calls flush directly — confirm intended wording.
func (qb *BaseBatcher) flushAsync(batchToFlush batch) {
	qb.stopWG.Add(1)
	limited := qb.maxWorkers != 0
	if limited {
		// Acquire a worker token; blocks while every worker is busy.
		<-qb.workerPool
	}
	go func() {
		defer qb.stopWG.Done()
		qb.flush(batchToFlush)
		if limited {
			// Return the token so a waiting flushAsync can proceed.
			qb.workerPool <- true
		}
	}()
}

Expand Down
2 changes: 2 additions & 0 deletions exporter/internal/queue/disabled_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type DisabledBatcher struct {

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
qb.startWorkerPool()

// This goroutine reads and then flushes.
// 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped.
// 2. flushAsync() blocks until there are idle workers in the worker pool.
Expand Down
104 changes: 51 additions & 53 deletions exporter/internal/queue/disabled_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,60 +17,58 @@ import (
"go.opentelemetry.io/collector/exporter/internal"
)

// TestDisabledBatcher_InfiniteWorkerPool verifies that with batching disabled
// and an unlimited worker pool (maxWorkers == 0), each offered request is
// flushed individually and failed exports do not reach the sink.
func TestDisabledBatcher_InfiniteWorkerPool(t *testing.T) {
cfg := exporterbatcher.NewDefaultConfig()
cfg.Enabled = false

q := NewBoundedMemoryQueue[internal.Request](
MemoryQueueSettings[internal.Request]{
Sizer: &RequestSizer[internal.Request]{},
Capacity: 10,
})

// maxWorkers == 0 selects the unlimited-worker path.
maxWorkers := 0
ba, err := NewBatcher(cfg, q, maxWorkers)
require.NoError(t, err)

require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
require.NoError(t, q.Shutdown(context.Background()))
require.NoError(t, ba.Shutdown(context.Background()))
})

sink := newFakeRequestSink()

// The second request fails with exportErr, so only the first (8 items)
// is counted by the sink.
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink}))
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink}))
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8
}, 30*time.Millisecond, 10*time.Millisecond)

require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink}))
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 25
}, 30*time.Millisecond, 10*time.Millisecond)

require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink}))

assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 38
}, 30*time.Millisecond, 10*time.Millisecond)
}

func TestDisabledBatcher_LimitedWorkerNotImplemented(t *testing.T) {
cfg := exporterbatcher.NewDefaultConfig()
cfg.Enabled = false
maxWorkers := 1

q := NewBoundedMemoryQueue[internal.Request](
MemoryQueueSettings[internal.Request]{
Sizer: &RequestSizer[internal.Request]{},
Capacity: 10,
// TestDisabledBatcher_Basic verifies that with batching disabled every
// successfully exported request reaches the sink exactly once, both for the
// unlimited worker pool (maxWorkers == 0) and for limited pools of various
// sizes.
func TestDisabledBatcher_Basic(t *testing.T) {
	tests := []struct {
		name       string
		maxWorkers int
	}{
		{
			// Fixed typo: was "infinate_workers".
			name:       "infinite_workers",
			maxWorkers: 0,
		},
		{
			name:       "one_worker",
			maxWorkers: 1,
		},
		{
			name:       "three_workers",
			maxWorkers: 3,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			cfg := exporterbatcher.NewDefaultConfig()
			cfg.Enabled = false

			q := NewBoundedMemoryQueue[internal.Request](
				MemoryQueueSettings[internal.Request]{
					Sizer:    &RequestSizer[internal.Request]{},
					Capacity: 10,
				})

			ba, err := NewBatcher(cfg, q, tt.maxWorkers)
			require.NoError(t, err)

			require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
			t.Cleanup(func() {
				require.NoError(t, q.Shutdown(context.Background()))
				require.NoError(t, ba.Shutdown(context.Background()))
			})

			sink := newFakeRequestSink()

			// Six requests are offered; the one with exportErr fails, so the
			// sink should end up with 5 requests totaling 8+17+13+35+2 = 75 items.
			require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink}))
			require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink}))
			require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink}))
			require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink}))
			require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 35, sink: sink}))
			require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 2, sink: sink}))
			assert.Eventually(t, func() bool {
				return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 75
			}, 30*time.Millisecond, 10*time.Millisecond)
		})
	}
}

func TestDisabledBatcher_BatchingNotImplemented(t *testing.T) {
Expand Down

0 comments on commit f1de0ff

Please sign in to comment.