From 7be491d83dfcb7d7fa62f4ade651fafd51154a31 Mon Sep 17 00:00:00 2001
From: Sindy Li
Date: Wed, 23 Oct 2024 15:28:56 -0700
Subject: [PATCH] Batcher just the frame

---
 exporter/internal/queue/batcher.go      | 141 ++++++++++++++++++++++++
 exporter/internal/queue/batcher_test.go |  60 ++++++++++
 exporter/internal/queue/fake_request.go |  63 +++++++++++
 3 files changed, 264 insertions(+)
 create mode 100644 exporter/internal/queue/batcher.go
 create mode 100644 exporter/internal/queue/batcher_test.go
 create mode 100644 exporter/internal/queue/fake_request.go

diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go
new file mode 100644
index 00000000000..1d87002e7ae
--- /dev/null
+++ b/exporter/internal/queue/batcher.go
@@ -0,0 +1,141 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
+
+import (
+	"context"
+	"math"
+	"sync"
+	"time"
+
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
+	"go.opentelemetry.io/collector/exporter/internal"
+)
+
+type batch struct {
+	ctx     context.Context
+	req     internal.Request
+	idxList []uint64
+}
+
+type Batcher struct {
+	batchCfg exporterbatcher.Config
+
+	queue      Queue[internal.Request]
+	maxWorkers int
+
+	exportFunc func(context.Context, internal.Request) error
+
+	readingBatch *batch
+	timer        *time.Timer
+	shutdownCh   chan bool
+
+	stopWG sync.WaitGroup
+}
+
+func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request],
+	maxWorkers int, exportFunc func(context.Context, internal.Request) error) *Batcher {
+	return &Batcher{
+		batchCfg:   batchCfg,
+		queue:      queue,
+		maxWorkers: maxWorkers,
+		exportFunc: exportFunc,
+		stopWG:     sync.WaitGroup{},
+		shutdownCh: make(chan bool, 1),
+	}
+}
+
+// If preconditions pass, flush() takes an item from the head of the batch list and exports it.
+func (qb *Batcher) flush(batchToFlush batch) {
+	err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req)
+	for _, idx := range batchToFlush.idxList {
+		qb.queue.OnProcessingFinished(idx, err)
+	}
+}
+
+// allocateFlusher() starts a goroutine that calls flush(). It blocks until a worker is available.
+func (qb *Batcher) allocateFlusher(batchToFlush batch) {
+	// maxWorkers = 0 means we don't limit the number of flushers.
+	if qb.maxWorkers == 0 {
+		qb.stopWG.Add(1)
+		go func() {
+			qb.flush(batchToFlush)
+			qb.stopWG.Done()
+		}()
+		return
+	}
+	panic("not implemented")
+}
+
+// Start ensures that queue and all consumers are started.
+func (qb *Batcher) Start(ctx context.Context, host component.Host) error {
+	if err := qb.queue.Start(ctx, host); err != nil {
+		return err
+	}
+
+	if qb.batchCfg.Enabled {
+		panic("not implemented")
+	}
+
+	// Timer doesn't do anything yet, but adding it so compiler won't complain about qb.timer.C
+	qb.timer = time.NewTimer(math.MaxInt)
+	qb.timer.Stop()
+
+	if qb.maxWorkers != 0 {
+		panic("not implemented")
+	}
+
+	// This goroutine keeps reading until flush is triggered because of request size.
+	qb.stopWG.Add(1)
+	go func() {
+		defer qb.stopWG.Done()
+		for {
+			idx, _, req, ok := qb.queue.Read(context.Background())
+
+			if !ok {
+				qb.shutdownCh <- true
+				if qb.readingBatch != nil {
+					panic("batching is not supported yet so reading batch should always be nil")
+				}
+
+				return
+			}
+			if !qb.batchCfg.Enabled {
+				qb.readingBatch = &batch{
+					req:     req,
+					ctx:     context.Background(),
+					idxList: []uint64{idx}}
+				qb.allocateFlusher(*qb.readingBatch)
+				qb.readingBatch = nil
+			} else {
+				panic("not implemented")
+			}
+		}
+	}()
+
+	qb.stopWG.Add(1)
+	go func() {
+		defer qb.stopWG.Done()
+		for {
+			select {
+			case <-qb.shutdownCh:
+				return
+			case <-qb.timer.C:
+				panic("batching is not yet implemented. Having timer here just so compiler won't complain")
+			}
+
+		}
+	}()
+	return nil
+}
+
+// Shutdown ensures that queue and all Batcher goroutines are stopped.
+func (qb *Batcher) Shutdown(ctx context.Context) error {
+	if err := qb.queue.Shutdown(ctx); err != nil {
+		return err
+	}
+	qb.stopWG.Wait()
+	return nil
+}
diff --git a/exporter/internal/queue/batcher_test.go b/exporter/internal/queue/batcher_test.go
new file mode 100644
index 00000000000..0b1379938dc
--- /dev/null
+++ b/exporter/internal/queue/batcher_test.go
@@ -0,0 +1,60 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queue
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
+	"go.opentelemetry.io/collector/exporter/internal"
+)
+
+func testExportFunc(ctx context.Context, req internal.Request) error {
+	return req.Export(ctx)
+}
+
+func TestBatcher_BatchNotEnabled_InfiniteWorkerPool(t *testing.T) {
+	cfg := exporterbatcher.NewDefaultConfig()
+	cfg.Enabled = false
+
+	q := NewBoundedMemoryQueue[internal.Request](
+		MemoryQueueSettings[internal.Request]{
+			Sizer:    &RequestSizer[internal.Request]{},
+			Capacity: 10,
+		})
+
+	maxWorkers := 0
+	ba := NewBatcher(cfg, q, maxWorkers, testExportFunc)
+
+	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
+	t.Cleanup(func() {
+		require.NoError(t, ba.Shutdown(context.Background()))
+	})
+
+	sink := newFakeRequestSink()
+
+	require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink}))
+	require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink}))
+	assert.Eventually(t, func() bool {
+		return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8
+	}, 20*time.Millisecond, 10*time.Millisecond)
+
+	require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink}))
+	assert.Eventually(t, func() bool {
+		return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 25
+	}, 20*time.Millisecond, 10*time.Millisecond)
+
+	require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink}))
+
+	assert.Eventually(t, func() bool {
+		return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 38
+	}, 20*time.Millisecond, 10*time.Millisecond)
+}
diff --git a/exporter/internal/queue/fake_request.go b/exporter/internal/queue/fake_request.go
new file mode 100644
index 00000000000..a0983db6d35
--- /dev/null
+++ b/exporter/internal/queue/fake_request.go
@@ -0,0 +1,63 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
+
+import (
+	"context"
+	"errors"
+	"sync/atomic"
+	"time"
+
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
+	"go.opentelemetry.io/collector/exporter/internal"
+)
+
+type fakeRequestSink struct {
+	requestsCount *atomic.Int64
+	itemsCount    *atomic.Int64
+}
+
+func newFakeRequestSink() *fakeRequestSink {
+	return &fakeRequestSink{
+		requestsCount: new(atomic.Int64),
+		itemsCount:    new(atomic.Int64),
+	}
+}
+
+type fakeRequest struct {
+	items     int
+	exportErr error
+	delay     time.Duration
+	sink      *fakeRequestSink
+}
+
+func (r *fakeRequest) Export(ctx context.Context) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-time.After(r.delay):
+	}
+	if r.exportErr != nil {
+		return r.exportErr
+	}
+	if r.sink != nil {
+		r.sink.requestsCount.Add(1)
+		r.sink.itemsCount.Add(int64(r.items))
+	}
+	return nil
+}
+
+func (r *fakeRequest) ItemsCount() int {
+	return r.items
+}
+
+func (r *fakeRequest) Merge(_ context.Context,
+	_ internal.Request) (internal.Request, error) {
+	return nil, errors.New("not implemented")
+}
+
+func (r *fakeRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig,
+	_ internal.Request) ([]internal.Request, error) {
+	return nil, errors.New("not implemented")
+}