[exporterqueue] Bare minimum frame of queue batcher + unit test. #11532

Merged
merged 10 commits on Oct 26, 2024
74 changes: 74 additions & 0 deletions exporter/internal/queue/batcher.go
@@ -0,0 +1,74 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
    "context"
    "errors"
    "sync"

    "go.opentelemetry.io/collector/component"
    "go.opentelemetry.io/collector/exporter/exporterbatcher"
    "go.opentelemetry.io/collector/exporter/internal"
)

type batch struct {
    ctx     context.Context
    req     internal.Request
    idxList []uint64
}

// Batcher is in charge of reading items from the queue and sending them out asynchronously.
type Batcher interface {
    component.Component
}

type BaseBatcher struct {
    batchCfg   exporterbatcher.Config
    queue      Queue[internal.Request]
    maxWorkers int
    stopWG     sync.WaitGroup
}

// NewBatcher creates a Batcher. Only the disabled-batching path with an
// unlimited worker pool (maxWorkers == 0) is implemented so far.
func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) (Batcher, error) {
    if maxWorkers != 0 {
        return nil, errors.ErrUnsupported
    }

    if batchCfg.Enabled {
        return nil, errors.ErrUnsupported
    }

    return &DisabledBatcher{
        BaseBatcher{
            batchCfg:   batchCfg,
            queue:      queue,
            maxWorkers: maxWorkers,
            stopWG:     sync.WaitGroup{},
        },
    }, nil
}

// flush exports the incoming batch synchronously.
func (qb *BaseBatcher) flush(batchToFlush batch) {
    err := batchToFlush.req.Export(batchToFlush.ctx)
    for _, idx := range batchToFlush.idxList {
        qb.queue.OnProcessingFinished(idx, err)
    }
}

// flushAsync starts a goroutine that calls flush. It blocks until a worker is available.
func (qb *BaseBatcher) flushAsync(batchToFlush batch) {
    qb.stopWG.Add(1)
    go func() {
        defer qb.stopWG.Done()
        qb.flush(batchToFlush)
    }()
}

// Shutdown ensures that all Batcher goroutines are stopped.
func (qb *BaseBatcher) Shutdown(ctx context.Context) error {
    qb.stopWG.Wait()
    return nil
}
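
For orientation, here is a minimal wiring sketch of the only configuration this constructor currently accepts: batching disabled and maxWorkers == 0 (an unlimited worker pool). It mirrors the setup the unit test further below uses; all identifiers come from this package, and error handling is elided for brevity:

cfg := exporterbatcher.NewDefaultConfig()
cfg.Enabled = false // batching itself is not implemented yet

q := NewBoundedMemoryQueue[internal.Request](
    MemoryQueueSettings[internal.Request]{
        Sizer:    &RequestSizer[internal.Request]{},
        Capacity: 10,
    })

ba, _ := NewBatcher(cfg, q, 0) // 0 selects the unlimited worker pool
_ = q.Start(context.Background(), componenttest.NewNopHost())
_ = ba.Start(context.Background(), componenttest.NewNopHost())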
38 changes: 38 additions & 0 deletions exporter/internal/queue/disabled_batcher.go
@@ -0,0 +1,38 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
    "context"

    "go.opentelemetry.io/collector/component"
)

// DisabledBatcher is a special case of Batcher that imposes no size limit on sending. Any items read from
// the queue are sent out (asynchronously) immediately, regardless of size.
type DisabledBatcher struct {
    BaseBatcher
}

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
    // This goroutine reads and then flushes.
    // 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped.
    // 2. flushAsync() blocks until there are idle workers in the worker pool.
    qb.stopWG.Add(1)
    go func() {
        defer qb.stopWG.Done()
        for {
            idx, _, req, ok := qb.queue.Read(context.Background())
            if !ok {
                return
            }
            qb.flushAsync(batch{
                req:     req,
                ctx:     context.Background(),
                idxList: []uint64{idx},
            })
        }
    }()
    return nil
}
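
Note the shutdown contract this loop implies: the reader goroutine exits only when Read reports ok == false, which happens once the queue is stopped. Callers therefore shut the queue down before the batcher, as the test's Cleanup below does. A sketch of the expected ordering, with q and ba as in the sketch above:

// Stopping the queue first makes Read return ok == false, so the reader
// goroutine exits; Batcher.Shutdown then waits on stopWG for any
// in-flight flushes to drain.
_ = q.Shutdown(context.Background())
_ = ba.Shutdown(context.Background())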
89 changes: 89 additions & 0 deletions exporter/internal/queue/disabled_batcher_test.go
@@ -0,0 +1,89 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue

import (
    "context"
    "errors"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"

    "go.opentelemetry.io/collector/component/componenttest"
    "go.opentelemetry.io/collector/exporter/exporterbatcher"
    "go.opentelemetry.io/collector/exporter/internal"
)

func TestDisabledBatcher_InfiniteWorkerPool(t *testing.T) {
    cfg := exporterbatcher.NewDefaultConfig()
    cfg.Enabled = false

    q := NewBoundedMemoryQueue[internal.Request](
        MemoryQueueSettings[internal.Request]{
            Sizer:    &RequestSizer[internal.Request]{},
            Capacity: 10,
        })

    maxWorkers := 0
    ba, err := NewBatcher(cfg, q, maxWorkers)
    require.NoError(t, err)

    require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
    require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
    t.Cleanup(func() {
        require.NoError(t, q.Shutdown(context.Background()))
        require.NoError(t, ba.Shutdown(context.Background()))
    })

    sink := newFakeRequestSink()

    require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink}))
    require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink}))
    assert.Eventually(t, func() bool {
        return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8
    }, 20*time.Millisecond, 10*time.Millisecond)

    require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink}))
    assert.Eventually(t, func() bool {
        return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 25
    }, 20*time.Millisecond, 10*time.Millisecond)

    require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink}))
    assert.Eventually(t, func() bool {
        return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 38
    }, 20*time.Millisecond, 10*time.Millisecond)
}

func TestDisabledBatcher_LimitedWorkerNotImplemented(t *testing.T) {
    cfg := exporterbatcher.NewDefaultConfig()
    cfg.Enabled = false
    maxWorkers := 1

    q := NewBoundedMemoryQueue[internal.Request](
        MemoryQueueSettings[internal.Request]{
            Sizer:    &RequestSizer[internal.Request]{},
            Capacity: 10,
        })

    _, err := NewBatcher(cfg, q, maxWorkers)
    require.Error(t, err)
}

func TestDisabledBatcher_BatchingNotImplemented(t *testing.T) {
    cfg := exporterbatcher.NewDefaultConfig()
    cfg.Enabled = true
    maxWorkers := 0

    q := NewBoundedMemoryQueue[internal.Request](
        MemoryQueueSettings[internal.Request]{
            Sizer:    &RequestSizer[internal.Request]{},
            Capacity: 10,
        })

    _, err := NewBatcher(cfg, q, maxWorkers)
    require.Error(t, err)
}
63 changes: 63 additions & 0 deletions exporter/internal/queue/fake_request.go
@@ -0,0 +1,63 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
    "context"
    "errors"
    "sync/atomic"
    "time"

    "go.opentelemetry.io/collector/exporter/exporterbatcher"
    "go.opentelemetry.io/collector/exporter/internal"
)

type fakeRequestSink struct {
    requestsCount *atomic.Int64
    itemsCount    *atomic.Int64
}

func newFakeRequestSink() *fakeRequestSink {
    return &fakeRequestSink{
        requestsCount: new(atomic.Int64),
        itemsCount:    new(atomic.Int64),
    }
}

type fakeRequest struct {
    items     int
    exportErr error
    delay     time.Duration
    sink      *fakeRequestSink
}

func (r *fakeRequest) Export(ctx context.Context) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(r.delay):
    }
    if r.exportErr != nil {
        return r.exportErr
    }
    if r.sink != nil {
        r.sink.requestsCount.Add(1)
        r.sink.itemsCount.Add(int64(r.items))
    }
    return nil
}

func (r *fakeRequest) ItemsCount() int {
    return r.items
}

func (r *fakeRequest) Merge(_ context.Context, _ internal.Request) (internal.Request, error) {
    return nil, errors.New("not implemented")
}

func (r *fakeRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ internal.Request) ([]internal.Request, error) {
    return nil, errors.New("not implemented")
}
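
One detail worth noting: a fakeRequest that fails never touches its sink, because Export returns the error before the counters are incremented. That is why the first assertion in TestDisabledBatcher_InfiniteWorkerPool above expects requestsCount == 1 after two offers, one of which fails. A standalone sketch exercising Export directly (hypothetical usage, not part of the test):

sink := newFakeRequestSink()
good := &fakeRequest{items: 8, sink: sink}
bad := &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink}

_ = good.Export(context.Background()) // sink now counts 1 request, 8 items
_ = bad.Export(context.Background())  // returns the error; sink is unchanged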