Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporterqueue] Bare minimum frame of queue batcher + unit test. #11532

Merged
merged 10 commits into from
Oct 26, 2024
78 changes: 78 additions & 0 deletions exporter/internal/queue/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/internal"
)

type batch struct {
ctx context.Context
req internal.Request
idxList []uint64
}

// Batcher is in charge of reading items from the queue and send them out asynchronously.
type Batcher interface {
component.Component
}

type BaseBatcher struct {
batchCfg exporterbatcher.Config
queue Queue[internal.Request]
maxWorkers int
stopWG sync.WaitGroup
}

func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) Batcher {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it return error instead of panic.

if maxWorkers != 0 {
panic("not implemented")

Check warning on line 35 in exporter/internal/queue/batcher.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/batcher.go#L35

Added line #L35 was not covered by tests
}

if batchCfg.Enabled {
panic("not implemented")

Check warning on line 39 in exporter/internal/queue/batcher.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/batcher.go#L39

Added line #L39 was not covered by tests
}

return &DisabledBatcher{
BaseBatcher{
batchCfg: batchCfg,
queue: queue,
maxWorkers: maxWorkers,
stopWG: sync.WaitGroup{},
},
}
}

// flush exports the incoming batch synchronously.
func (qb *BaseBatcher) flush(batchToFlush batch) {
err := batchToFlush.req.Export(batchToFlush.ctx)
for _, idx := range batchToFlush.idxList {
qb.queue.OnProcessingFinished(idx, err)
}
}

// flushAsync starts a goroutine that calls flushIfNecessary. It blocks until a worker is available.
func (qb *BaseBatcher) flushAsync(batchToFlush batch) {
// maxWorker = 0 means we don't limit the number of flushers.
if qb.maxWorkers == 0 {
qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
qb.flush(batchToFlush)
}()
return
}
panic("not implemented")

Check warning on line 71 in exporter/internal/queue/batcher.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/batcher.go#L71

Added line #L71 was not covered by tests
}

// Shutdown ensures that queue and all Batcher are stopped.
func (qb *BaseBatcher) Shutdown(ctx context.Context) error {
qb.stopWG.Wait()
return nil
}
38 changes: 38 additions & 0 deletions exporter/internal/queue/disabled_batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
"context"

"go.opentelemetry.io/collector/component"
)

// DisabledBatcher is a special-case of Batcher that has no size limit for sending. Any items read from the queue will
// be sent out (asynchronously) immediately regardless of the size.
type DisabledBatcher struct {
BaseBatcher
}

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
// This goroutine reads and then flushes.
// 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped.
// 2. flushAsync() blocks until there are idle workers in the worker pool.
qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
for {
idx, _, req, ok := qb.queue.Read(context.Background())
if !ok {
return
}
qb.flushAsync(batch{
req: req,
ctx: context.Background(),
idxList: []uint64{idx}})
}
}()
return nil
}
58 changes: 58 additions & 0 deletions exporter/internal/queue/disabled_batcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/internal"
)

func TestDisabledBatcher_InfiniteWorkerPool(t *testing.T) {
cfg := exporterbatcher.NewDefaultConfig()
cfg.Enabled = false

q := NewBoundedMemoryQueue[internal.Request](
MemoryQueueSettings[internal.Request]{
Sizer: &RequestSizer[internal.Request]{},
Capacity: 10,
})

maxWorkers := 0
ba := NewBatcher(cfg, q, maxWorkers)

require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
require.NoError(t, q.Shutdown(context.Background()))
require.NoError(t, ba.Shutdown(context.Background()))
})

sink := newFakeRequestSink()

require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink}))
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink}))
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8
}, 20*time.Millisecond, 10*time.Millisecond)

require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink}))
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 25
}, 20*time.Millisecond, 10*time.Millisecond)

require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink}))

assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 38
}, 20*time.Millisecond, 10*time.Millisecond)
}
63 changes: 63 additions & 0 deletions exporter/internal/queue/fake_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
"context"
"errors"
"sync/atomic"
"time"

"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/internal"
)

type fakeRequestSink struct {
requestsCount *atomic.Int64
itemsCount *atomic.Int64
}

func newFakeRequestSink() *fakeRequestSink {
return &fakeRequestSink{
requestsCount: new(atomic.Int64),
itemsCount: new(atomic.Int64),
}
}

type fakeRequest struct {
items int
exportErr error
delay time.Duration
sink *fakeRequestSink
}

func (r *fakeRequest) Export(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()

Check warning on line 38 in exporter/internal/queue/fake_request.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/fake_request.go#L37-L38

Added lines #L37 - L38 were not covered by tests
case <-time.After(r.delay):
}
if r.exportErr != nil {
return r.exportErr
}
if r.sink != nil {
r.sink.requestsCount.Add(1)
r.sink.itemsCount.Add(int64(r.items))
}
return nil
}

func (r *fakeRequest) ItemsCount() int {
return r.items

Check warning on line 52 in exporter/internal/queue/fake_request.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/fake_request.go#L51-L52

Added lines #L51 - L52 were not covered by tests
}

func (r *fakeRequest) Merge(_ context.Context,
_ internal.Request) (internal.Request, error) {
return nil, errors.New("not implemented")

Check warning on line 57 in exporter/internal/queue/fake_request.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/fake_request.go#L56-L57

Added lines #L56 - L57 were not covered by tests
}

func (r *fakeRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig,
_ internal.Request) ([]internal.Request, error) {
return nil, errors.New("not implemented")

Check warning on line 62 in exporter/internal/queue/fake_request.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/fake_request.go#L61-L62

Added lines #L61 - L62 were not covered by tests
}
Loading