Batcher just the frame
sfc-gh-sili committed Oct 23, 2024
1 parent 866c7fc commit 7365e27
Showing 3 changed files with 262 additions and 0 deletions.
141 changes: 141 additions & 0 deletions exporter/internal/queue/batcher.go
@@ -0,0 +1,141 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
"context"
"math"
"sync"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/internal"
)

type batch struct {
ctx context.Context
req internal.Request
idxList []uint64
}

type Batcher struct {
batchCfg exporterbatcher.Config

queue Queue[internal.Request]
maxWorkers int

exportFunc func(context.Context, internal.Request) error

readingBatch *batch
timer *time.Timer
shutdownCh chan bool

stopWG sync.WaitGroup
}

func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request],
maxWorkers int, exportFunc func(context.Context, internal.Request) error) *Batcher {
return &Batcher{
batchCfg: batchCfg,
queue: queue,
maxWorkers: maxWorkers,
exportFunc: exportFunc,
stopWG: sync.WaitGroup{},
shutdownCh: make(chan bool, 1),
}
}

// flush() exports the given batch and reports the result back to the queue for each item in the batch.
func (qb *Batcher) flush(batchToFlush batch) {
err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req)
for _, idx := range batchToFlush.idxList {
qb.queue.OnProcessingFinished(idx, err)
}
}

// allocateFlusher() starts a goroutine that calls flush() for the given batch. When the number of workers is limited, it blocks until a worker is available.
func (qb *Batcher) allocateFlusher(batchToFlush batch) {
// maxWorkers == 0 means we don't limit the number of flushers.
if qb.maxWorkers == 0 {
qb.stopWG.Add(1)
go func() {
qb.flush(batchToFlush)
qb.stopWG.Done()
}()
return
}
panic("not implemented")
}

// Start ensures that the queue and all consumer goroutines are started.
func (qb *Batcher) Start(ctx context.Context, host component.Host) error {
if err := qb.queue.Start(ctx, host); err != nil {
return err
}

if qb.batchCfg.Enabled {
panic("not implemented")
}

// The timer doesn't do anything yet; it exists only so the compiler won't complain about qb.timer.C.
qb.timer = time.NewTimer(math.MaxInt)
qb.timer.Stop()

if qb.maxWorkers != 0 {
panic("not implemented")
}

// This goroutine keeps reading from the queue and dispatches a flush for each request until the queue is shut down.
qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
for {
idx, _, req, ok := qb.queue.Read(context.Background())

if !ok {
qb.shutdownCh <- true
if qb.readingBatch != nil {
panic("batching is supported yet so reading batch should always be nil")
}

return
}
if !qb.batchCfg.Enabled {
qb.readingBatch = &batch{
req: req,
ctx: context.Background(),
idxList: []uint64{idx}}
qb.allocateFlusher(*qb.readingBatch)
qb.readingBatch = nil
} else {
panic("not implemented")
}
}
}()

qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
for {
select {
case <-qb.shutdownCh:
return
case <-qb.timer.C:
panic("batching is not yet implemented. Having timer here just so compiler won't complain")
}

}
}()
return nil
}

// Shutdown ensures that the queue and all Batcher goroutines are stopped.
func (qb *Batcher) Shutdown(ctx context.Context) error {
if err := qb.queue.Shutdown(ctx); err != nil {
return err
}
qb.stopWG.Wait()
return nil
}
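
The maxWorkers != 0 paths above are left as panic("not implemented") in this commit. One common way to enforce such a limit is a buffered channel used as a counting semaphore; the self-contained sketch below illustrates that pattern only, under the assumption that the eventual implementation bounds the number of concurrent flush goroutines. The names boundedFlusher, newBoundedFlusher, and doFlush are hypothetical and do not appear in the commit.

// Illustrative sketch only, not part of this commit: a buffered channel used
// as a counting semaphore to bound the number of concurrent flush goroutines.
package main

import (
	"fmt"
	"sync"
	"time"
)

type boundedFlusher struct {
	sem    chan struct{} // counting semaphore; capacity == maxWorkers
	stopWG sync.WaitGroup
}

func newBoundedFlusher(maxWorkers int) *boundedFlusher {
	return &boundedFlusher{sem: make(chan struct{}, maxWorkers)}
}

// allocate blocks until a worker slot is free, then runs doFlush in a goroutine.
func (f *boundedFlusher) allocate(doFlush func()) {
	f.sem <- struct{}{} // acquire a slot; blocks while maxWorkers flushes are in flight
	f.stopWG.Add(1)
	go func() {
		defer func() {
			<-f.sem // release the slot
			f.stopWG.Done()
		}()
		doFlush()
	}()
}

func main() {
	f := newBoundedFlusher(2) // at most 2 concurrent flushes
	for i := 0; i < 5; i++ {
		i := i
		f.allocate(func() {
			time.Sleep(10 * time.Millisecond)
			fmt.Println("flushed batch", i)
		})
	}
	f.stopWG.Wait()
}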
58 changes: 58 additions & 0 deletions exporter/internal/queue/batcher_test.go
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/internal"
)

func testExportFunc(ctx context.Context, req internal.Request) error {
return req.Export(ctx)
}

func TestBatcher_BatchNotEnabled_InfiniteWorkerPool(t *testing.T) {
cfg := exporterbatcher.NewDefaultConfig()
cfg.Enabled = false

q := NewBoundedMemoryQueue[internal.Request](
MemoryQueueSettings[internal.Request]{
Sizer: &RequestSizer[internal.Request]{},
Capacity: 10,
})

maxWorkers := 0
ba := NewBatcher(cfg, q, maxWorkers, testExportFunc)

require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
require.NoError(t, ba.Shutdown(context.Background()))
})

sink := newFakeRequestSink()

require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink}))
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8
}, 20*time.Millisecond, 10*time.Millisecond)

require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink}))
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 25
}, 20*time.Millisecond, 10*time.Millisecond)

require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink}))

assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 38
}, 20*time.Millisecond, 10*time.Millisecond)
}
63 changes: 63 additions & 0 deletions exporter/internal/queue/fake_request.go
@@ -0,0 +1,63 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
"context"
"errors"
"sync/atomic"
"time"

"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/internal"
)

type fakeRequestSink struct {
requestsCount *atomic.Int64
itemsCount *atomic.Int64
}

func newFakeRequestSink() *fakeRequestSink {
return &fakeRequestSink{
requestsCount: new(atomic.Int64),
itemsCount: new(atomic.Int64),
}
}

type fakeRequest struct {
items int
exportErr error
delay time.Duration
sink *fakeRequestSink
}

func (r *fakeRequest) Export(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(r.delay):
}
if r.exportErr != nil {
return r.exportErr
}
if r.sink != nil {
r.sink.requestsCount.Add(1)
r.sink.itemsCount.Add(int64(r.items))
}
return nil
}

func (r *fakeRequest) ItemsCount() int {
return r.items
}

func (r *fakeRequest) Merge(_ context.Context,
_ internal.Request) (internal.Request, error) {
return nil, errors.New("not implemented")
}

func (r *fakeRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig,
_ internal.Request) ([]internal.Request, error) {
return nil, errors.New("not implemented")
}
