From 1d05504c28dfc8ab8a41bba834f948e0427ce969 Mon Sep 17 00:00:00 2001 From: wmdanor Date: Sat, 5 Apr 2025 22:50:58 +0100 Subject: [PATCH 1/3] prototype --- sdk/log/batch.go | 94 ---------- sdk/log/batch_test.go | 122 ------------- sdk/log/batchv2.go | 343 ++++++++++++++++++++++++++++++++++++ sdk/log/batchv2_test.go | 381 ++++++++++++++++++++++++++++++++++++++++ sdk/log/bench_test.go | 17 ++ sdk/log/benchv2_test.go | 115 ++++++++++++ sdk/log/queue.go | 115 ++++++++++++ sdk/log/queue_test.go | 136 ++++++++++++++ 8 files changed, 1107 insertions(+), 216 deletions(-) create mode 100644 sdk/log/batchv2.go create mode 100644 sdk/log/batchv2_test.go create mode 100644 sdk/log/benchv2_test.go create mode 100644 sdk/log/queue.go create mode 100644 sdk/log/queue_test.go diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 28c969262b4..6ef2cfa63c5 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -7,7 +7,6 @@ import ( "context" "errors" "slices" - "sync" "sync/atomic" "time" @@ -251,99 +250,6 @@ func (b *BatchProcessor) ForceFlush(ctx context.Context) error { return errors.Join(err, b.exporter.ForceFlush(ctx)) } -// queue holds a queue of logging records. -// -// When the queue becomes full, the oldest records in the queue are -// overwritten. -type queue struct { - sync.Mutex - - dropped atomic.Uint64 - cap, len int - read, write *ring -} - -func newQueue(size int) *queue { - r := newRing(size) - return &queue{ - cap: size, - read: r, - write: r, - } -} - -// Dropped returns the number of Records dropped during enqueueing since the -// last time Dropped was called. -func (q *queue) Dropped() uint64 { - return q.dropped.Swap(0) -} - -// Enqueue adds r to the queue. The queue size, including the addition of r, is -// returned. -// -// If enqueueing r will exceed the capacity of q, the oldest Record held in q -// will be dropped and r retained. -func (q *queue) Enqueue(r Record) int { - q.Lock() - defer q.Unlock() - - q.write.Value = r - q.write = q.write.Next() - - q.len++ - if q.len > q.cap { - // Overflow. Advance read to be the new "oldest". - q.len = q.cap - q.read = q.read.Next() - q.dropped.Add(1) - } - return q.len -} - -// TryDequeue attempts to dequeue up to len(buf) Records. The available Records -// will be assigned into buf and passed to write. If write fails, returning -// false, the Records will not be removed from the queue. If write succeeds, -// returning true, the dequeued Records are removed from the queue. The number -// of Records remaining in the queue are returned. -// -// When write is called the lock of q is held. The write function must not call -// other methods of this q that acquire the lock. -func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int { - q.Lock() - defer q.Unlock() - - origRead := q.read - - n := min(len(buf), q.len) - for i := 0; i < n; i++ { - buf[i] = q.read.Value - q.read = q.read.Next() - } - - if write(buf[:n]) { - q.len -= n - } else { - q.read = origRead - } - return q.len -} - -// Flush returns all the Records held in the queue and resets it to be -// empty. 
-func (q *queue) Flush() []Record { - q.Lock() - defer q.Unlock() - - out := make([]Record, q.len) - for i := range out { - out[i] = q.read.Value - q.read = q.read.Next() - } - q.len = 0 - - return out -} - type batchConfig struct { maxQSize setting[int] expInterval setting[time.Duration] diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index b2e993a5bfa..828337421d1 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -7,7 +7,6 @@ import ( "bytes" "context" stdlog "log" - "slices" "strconv" "strings" "sync" @@ -522,127 +521,6 @@ func TestBatchProcessor(t *testing.T) { }) } -func TestQueue(t *testing.T) { - var r Record - r.SetBody(log.BoolValue(true)) - - t.Run("newQueue", func(t *testing.T) { - const size = 1 - q := newQueue(size) - assert.Equal(t, 0, q.len) - assert.Equal(t, size, q.cap, "capacity") - assert.Equal(t, size, q.read.Len(), "read ring") - assert.Same(t, q.read, q.write, "different rings") - }) - - t.Run("Enqueue", func(t *testing.T) { - const size = 2 - q := newQueue(size) - - var notR Record - notR.SetBody(log.IntValue(10)) - - assert.Equal(t, 1, q.Enqueue(notR), "incomplete batch") - assert.Equal(t, 1, q.len, "length") - assert.Equal(t, size, q.cap, "capacity") - - assert.Equal(t, 2, q.Enqueue(r), "complete batch") - assert.Equal(t, 2, q.len, "length") - assert.Equal(t, size, q.cap, "capacity") - - assert.Equal(t, 2, q.Enqueue(r), "overflow batch") - assert.Equal(t, 2, q.len, "length") - assert.Equal(t, size, q.cap, "capacity") - - assert.Equal(t, []Record{r, r}, q.Flush(), "flushed Records") - }) - - t.Run("Dropped", func(t *testing.T) { - q := newQueue(1) - - _ = q.Enqueue(r) - _ = q.Enqueue(r) - assert.Equal(t, uint64(1), q.Dropped(), "fist") - - _ = q.Enqueue(r) - _ = q.Enqueue(r) - assert.Equal(t, uint64(2), q.Dropped(), "second") - }) - - t.Run("Flush", func(t *testing.T) { - const size = 2 - q := newQueue(size) - q.write.Value = r - q.write = q.write.Next() - q.len = 1 - - assert.Equal(t, []Record{r}, q.Flush(), "flushed") - }) - - t.Run("TryFlush", func(t *testing.T) { - const size = 3 - q := newQueue(size) - for i := 0; i < size-1; i++ { - q.write.Value = r - q.write = q.write.Next() - q.len++ - } - - buf := make([]Record, 1) - f := func([]Record) bool { return false } - assert.Equal(t, size-1, q.TryDequeue(buf, f), "not flushed") - require.Equal(t, size-1, q.len, "length") - require.NotSame(t, q.read, q.write, "read ring advanced") - - var flushed []Record - f = func(r []Record) bool { - flushed = append(flushed, r...) - return true - } - if assert.Equal(t, size-2, q.TryDequeue(buf, f), "did not flush len(buf)") { - assert.Equal(t, []Record{r}, flushed, "Records") - } - - buf = slices.Grow(buf, size) - flushed = flushed[:0] - if assert.Equal(t, 0, q.TryDequeue(buf, f), "did not flush len(queue)") { - assert.Equal(t, []Record{r}, flushed, "Records") - } - }) - - t.Run("ConcurrentSafe", func(t *testing.T) { - const goRoutines = 10 - - flushed := make(chan []Record, goRoutines) - out := make([]Record, 0, goRoutines) - done := make(chan struct{}) - go func() { - defer close(done) - for recs := range flushed { - out = append(out, recs...) 
- } - }() - - var wg sync.WaitGroup - wg.Add(goRoutines) - - b := newQueue(goRoutines) - for i := 0; i < goRoutines; i++ { - go func() { - defer wg.Done() - b.Enqueue(Record{}) - flushed <- b.Flush() - }() - } - - wg.Wait() - close(flushed) - <-done - - assert.Len(t, out, goRoutines, "flushed Records") - }) -} - func BenchmarkBatchProcessorOnEmit(b *testing.B) { r := new(Record) body := log.BoolValue(true) diff --git a/sdk/log/batchv2.go b/sdk/log/batchv2.go new file mode 100644 index 00000000000..ebc3fee8ce3 --- /dev/null +++ b/sdk/log/batchv2.go @@ -0,0 +1,343 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package log // import "go.opentelemetry.io/otel/sdk/log" + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/internal/global" +) + +type recordsBufferPool struct { + pool sync.Pool +} + +func newRecordsBufferPool(bufferSize int) *recordsBufferPool { + return &recordsBufferPool{ + pool: sync.Pool{ + New: func() any { + slice := make([]Record, bufferSize) + return &slice + }, + }, + } +} + +func (p *recordsBufferPool) Get() *[]Record { + return p.pool.Get().(*[]Record) +} + +func (p *recordsBufferPool) Put(recordsBuffer *[]Record) { + p.pool.Put(recordsBuffer) +} + +type recordsChunk struct { + ctx context.Context + buf *[]Record + records int + + respCh chan<- error +} + +// Compile-time check BatchProcessor implements Processor. +var _ Processor = (*BatchProcessorV2)(nil) + +// BatchProcessorV2 is a processor that exports batches of log records. +// +// Use [NewBatchProcessorV2] to create a BatchProcessor. An empty BatchProcessor +// is shut down by default, no records will be batched or exported. +type BatchProcessorV2 struct { + // The BatchProcessor is designed to provide the highest throughput of + // log records possible while being compatible with OpenTelemetry. The + // entry point of log records is the OnEmit method. This method is designed + // to receive records as fast as possible while still honoring shutdown + // commands. All records received are enqueued to queue. + // + // In order to block OnEmit as little as possible, a separate "poll" + // goroutine is spawned at the creation of a BatchProcessor. This + // goroutine is responsible for batching the queue at regular polled + // intervals, or when it is directly signaled to. + // + // To keep the polling goroutine from backing up, all batches it makes are + // exported with a bufferedExporter. This exporter allows the poll + // goroutine to enqueue an export payload that will be handled in a + // separate goroutine dedicated to the export. This asynchronous behavior + // allows the poll goroutine to maintain accurate interval polling. + // + // ___BatchProcessor____ __Poll Goroutine__ __Export Goroutine__ + // || || || || || || + // || ********** || || || || ********** || + // || Records=>* OnEmit * || || | - ticker || || * export * || + // || ********** || || | - trigger || || ********** || + // || || || || | || || || || + // || || || || | || || || || + // || __________\/___ || || |*********** || || ______/\_______ || + // || (____queue______)>=||=||===|* batch *===||=||=>[_export_buffer_] || + // || || || |*********** || || || + // ||_____________________|| ||__________________|| ||____________________|| + // + // + // The "release valve" in this processing is the record queue. This queue + // is a ring buffer. 
It will overwrite the oldest records first when writes + // to OnEmit are made faster than the queue can be flushed. If batches + // cannot be flushed to the export buffer, the records will remain in the + // queue. + + exporter Exporter + + // queue is the active queue of records that have not yet been exported. + queue *queue + // batchSize is the minimum number of records needed before an export is + // triggered (unless the interval expires). + batchSize int + + // pollTrigger triggers the poll goroutine to flush a batch from the queue. + // This is sent to when it is known that the queue contains at least one + // complete batch. + // + // When a send is made to the channel, the poll loop will be reset after + // the flush. If there is still enough records in the queue for another + // batch the reset of the poll loop will automatically re-trigger itself. + // There is no need for the original sender to monitor and resend. + pollTrigger chan struct{} + // pollKill kills the poll goroutine. This is only expected to be closed + // once by the Shutdown method. + pollKill chan struct{} + // pollDone signals the poll goroutine has completed. + pollDone chan struct{} + + // stopped holds the stopped state of the BatchProcessor. + stopped atomic.Bool + + queuedChunks chan recordsChunk + queuedChunksClosed atomic.Bool + queuedChunksMu sync.Mutex + queuedChunksDone chan struct{} + recordsBufPool *recordsBufferPool + + noCmp [0]func() //nolint: unused // This is indeed used. +} + +// NewBatchProcessorV2 decorates the provided exporter +// so that the log records are batched before exporting. +// +// All of the exporter's methods are called synchronously. +func NewBatchProcessorV2(exporter Exporter, opts ...BatchProcessorOption) *BatchProcessorV2 { + cfg := newBatchConfig(opts) + if exporter == nil { + // Do not panic on nil export. + exporter = defaultNoopExporter + } + // Order is important here. Wrap the timeoutExporter with the chunkExporter + // to ensure each export completes in timeout (instead of all chunked + // exports). + exporter = newTimeoutExporter(exporter, cfg.expTimeout.Value) + // Use a chunkExporter to ensure ForceFlush and Shutdown calls are batched + // appropriately on export. + // exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value) + + b := &BatchProcessorV2{ + exporter: exporter, + + queue: newQueue(cfg.maxQSize.Value), + batchSize: cfg.expMaxBatchSize.Value, + pollTrigger: make(chan struct{}, 1), + pollKill: make(chan struct{}), + + queuedChunks: make(chan recordsChunk, cfg.expBufferSize.Value), + recordsBufPool: newRecordsBufferPool(cfg.expMaxBatchSize.Value), + } + b.pollDone = b.poll(cfg.expInterval.Value) + b.queuedChunksDone = b.processQueuedChunks() + return b +} + +// poll spawns a goroutine to handle interval polling and batch exporting. The +// returned done chan is closed when the spawned goroutine completes. +func (b *BatchProcessorV2) poll(interval time.Duration) (done chan struct{}) { + done = make(chan struct{}) + + ticker := time.NewTicker(interval) + go func() { + defer close(done) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + case <-b.pollTrigger: + ticker.Reset(interval) + case <-b.pollKill: + return + } + + if d := b.queue.Dropped(); d > 0 { + global.Warn("dropped log records", "dropped", d) + } + + enqueued, recordsInQueue := b.enqueueExport(nil) + if !enqueued || recordsInQueue >= b.batchSize { + // There is another full batch ready. Immediately trigger + // another export attempt. 
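For orientation, a minimal usage sketch of the prototype processor introduced in this first commit (the second commit folds the same design into NewBatchProcessor). The option values and the stdoutlog exporter are illustrative stand-ins, not part of this patch:

package main

import (
	"context"
	"time"

	"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
	"go.opentelemetry.io/otel/log"
	sdklog "go.opentelemetry.io/otel/sdk/log"
)

func main() {
	ctx := context.Background()

	// Any Exporter works here; stdoutlog is only a stand-in.
	exp, err := stdoutlog.New()
	if err != nil {
		panic(err)
	}

	// The options below are the same ones exercised by the tests in this patch.
	bp := sdklog.NewBatchProcessorV2(
		exp,
		sdklog.WithMaxQueueSize(2048),
		sdklog.WithExportMaxBatchSize(512),
		sdklog.WithExportInterval(time.Second),
		sdklog.WithExportTimeout(30*time.Second),
	)

	provider := sdklog.NewLoggerProvider(sdklog.WithProcessor(bp))
	defer func() { _ = provider.Shutdown(ctx) }()

	logger := provider.Logger("example")
	var r log.Record
	r.SetBody(log.StringValue("hello from the batch processor"))
	logger.Emit(ctx, r)
}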
+ select { + case b.pollTrigger <- struct{}{}: + default: + // Another flush signal already received. + } + } + } + }() + + return done +} + +func (b *BatchProcessorV2) processQueuedChunks() (done chan struct{}) { + done = make(chan struct{}) + + go func() { + defer close(done) + for chunk := range b.queuedChunks { + err := b.exporter.Export(chunk.ctx, (*chunk.buf)[:chunk.records]) + + b.recordsBufPool.Put(chunk.buf) + + select { + case chunk.respCh <- err: + default: + // e.respCh is nil or busy, default to otel.Handler. + if err != nil { + otel.Handle(err) + } + } + } + }() + + return done +} + +func (b *BatchProcessorV2) enqueueExport(respCh chan<- error) (enqueued bool, queueLen int) { + b.queuedChunksMu.Lock() + defer b.queuedChunksMu.Unlock() + + if len(b.queuedChunks) == cap(b.queuedChunks) { + if respCh != nil { + respCh <- nil + } + return false, 0 + } + + buf := b.recordsBufPool.Get() + + queueLen, n := b.queue.Dequeue(*buf) + + data := recordsChunk{ctx: context.Background(), respCh: respCh, buf: buf, records: n} + + if b.queuedChunksClosed.Load() { + if respCh != nil { + respCh <- nil + } + return true, 0 + } + + // done in sync as available space is guaranteed by len check + b.queuedChunks <- data + + return true, queueLen +} + +// OnEmit batches provided log record. +func (b *BatchProcessorV2) OnEmit(_ context.Context, r *Record) error { + if b.stopped.Load() || b.queue == nil { + return nil + } + // The record is cloned so that changes done by subsequent processors + // are not going to lead to a data race. + if n := b.queue.Enqueue(r.Clone()); n >= b.batchSize { + select { + case b.pollTrigger <- struct{}{}: + default: + // Flush chan full. The poll goroutine will handle this by + // re-sending any trigger until the queue has less than batchSize + // records. + } + } + return nil +} + +// Shutdown flushes queued log records and shuts down the decorated exporter. +func (b *BatchProcessorV2) Shutdown(ctx context.Context) error { + if b.stopped.Swap(true) || b.queue == nil { + return nil + } + + // Stop the poll goroutine. + close(b.pollKill) + select { + case <-b.pollDone: + case <-ctx.Done(): + // Out of time. + return errors.Join(ctx.Err(), b.exporter.Shutdown(ctx)) + } + + err := b.flush(ctx) + if err != nil { + return errors.Join(err, b.exporter.Shutdown(ctx)) + } + + b.queuedChunksClosed.Store(true) + b.queuedChunksMu.Lock() + defer b.queuedChunksMu.Unlock() + close(b.queuedChunks) + select { + case <-b.queuedChunksDone: + case <-ctx.Done(): + // Out of time. + return errors.Join(ctx.Err(), b.exporter.Shutdown(ctx)) + } + + return errors.Join(err, b.exporter.Shutdown(ctx)) +} + +// ForceFlush flushes queued log records and flushes the decorated exporter. 
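The recordsBufferPool defined at the top of this file exists so that each export batch reuses a fixed-size []Record buffer instead of allocating a fresh one per export. A self-contained sketch of the same sync.Pool pattern with generic types (the int element type and the 512 size are illustrative, not SDK values):

package main

import (
	"fmt"
	"sync"
)

// bufPool hands out pointers to fixed-capacity slices. Storing *[]int rather
// than []int avoids an extra allocation on every Put, because converting a
// slice header to an interface value would otherwise escape to the heap.
var bufPool = sync.Pool{
	New: func() any {
		buf := make([]int, 512)
		return &buf
	},
}

func process(values []int) {
	buf := bufPool.Get().(*[]int)
	defer bufPool.Put(buf)

	// Copy into the reused buffer and work on the filled prefix only.
	n := copy(*buf, values)
	fmt.Println("processing", (*buf)[:n])
}

func main() {
	process([]int{1, 2, 3})
	process([]int{4, 5})
}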
+func (b *BatchProcessorV2) ForceFlush(ctx context.Context) error { + if b.stopped.Load() || b.queue == nil { + return nil + } + + err := b.flush(ctx) + + return errors.Join(err, b.exporter.ForceFlush(ctx)) +} + +func (b *BatchProcessorV2) flush(ctx context.Context) error { + var err error + for { + respCh := make(chan error, 1) + enqueued, queueLen := b.enqueueExport(respCh) + + select { + case respErr := <-respCh: + if respErr != nil { + err = errors.Join(respErr, errPartialFlush) + } + case <-ctx.Done(): + err = errors.Join(ctxErr(ctx), errPartialFlush) + } + + if err != nil { + break + } + + if enqueued && queueLen == 0 { + break + } + } + + return err +} diff --git a/sdk/log/batchv2_test.go b/sdk/log/batchv2_test.go new file mode 100644 index 00000000000..008cfd5a2e0 --- /dev/null +++ b/sdk/log/batchv2_test.go @@ -0,0 +1,381 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package log // import "go.opentelemetry.io/otel/sdk/log" + +import ( + "context" + stdlog "log" + "strings" + "sync" + "testing" + "time" + "unsafe" + + "github.com/go-logr/stdr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/log" +) + +func TestBatchProcessorV2(t *testing.T) { + ctx := context.Background() + + t.Run("NilExporter", func(t *testing.T) { + assert.NotPanics(t, func() { NewBatchProcessorV2(nil) }) + }) + + t.Run("Polling", func(t *testing.T) { + e := newTestExporter(nil) + const size = 15 + b := NewBatchProcessorV2( + e, + WithMaxQueueSize(2*size), + WithExportMaxBatchSize(2*size), + WithExportInterval(time.Nanosecond), + WithExportTimeout(time.Hour), + ) + for i := 0; i < size; i++ { + assert.NoError(t, b.OnEmit(ctx, new(Record))) + } + var got []Record + assert.Eventually(t, func() bool { + for _, r := range e.Records() { + got = append(got, r...) 
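The flush method above hands a buffered respCh to enqueueExport and then waits on either the export result or context cancellation. A standalone sketch of that reply-channel pattern with generic names (none of these identifiers come from the SDK):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// submit hands work to a worker together with a buffered reply channel, so
// the worker can report a result without ever blocking on the send.
func submit(work chan<- chan<- error) <-chan error {
	resp := make(chan error, 1)
	work <- resp
	return resp
}

func main() {
	work := make(chan chan<- error, 1)

	// Worker: reports each outcome on the per-request reply channel.
	go func() {
		for resp := range work {
			resp <- errors.New("export failed") // Simulated export outcome.
		}
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Caller: waits for the reply or gives up when the context is done,
	// mirroring how flush joins the respCh error with the context error.
	select {
	case err := <-submit(work):
		fmt.Println("worker replied:", err)
	case <-ctx.Done():
		fmt.Println("gave up waiting:", ctx.Err())
	}
}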
+ } + return len(got) == size + }, 2*time.Second, time.Microsecond) + _ = b.Shutdown(ctx) + }) + + t.Run("OnEmit", func(t *testing.T) { + const batch = 10 + e := newTestExporter(nil) + b := NewBatchProcessorV2( + e, + WithMaxQueueSize(10*batch), + WithExportMaxBatchSize(batch), + WithExportInterval(time.Hour), + WithExportTimeout(time.Hour), + ) + for i := 0; i < 10*batch; i++ { + assert.NoError(t, b.OnEmit(ctx, new(Record))) + } + assert.Eventually(t, func() bool { + return e.ExportN() > 1 + }, 2*time.Second, time.Microsecond, "multi-batch flush") + + assert.NoError(t, b.Shutdown(ctx)) + assert.GreaterOrEqual(t, e.ExportN(), 10) + }) + + t.Run("RetriggerFlushNonBlocking", func(t *testing.T) { + e := newTestExporter(nil) + e.ExportTrigger = make(chan struct{}) + + const batch = 10 + b := NewBatchProcessorV2( + e, + WithMaxQueueSize(3*batch), + WithExportMaxBatchSize(batch), + WithExportInterval(time.Hour), + WithExportTimeout(time.Hour), + ) + for i := 0; i < 2*batch; i++ { + assert.NoError(t, b.OnEmit(ctx, new(Record))) + } + + var n int + require.Eventually(t, func() bool { + n = e.ExportN() + return n > 0 + }, 2*time.Second, time.Microsecond, "blocked export not attempted") + + var err error + require.Eventually(t, func() bool { + err = b.OnEmit(ctx, new(Record)) + return true + }, time.Second, time.Microsecond, "OnEmit blocked") + assert.NoError(t, err) + + e.ExportTrigger <- struct{}{} + assert.Eventually(t, func() bool { + return e.ExportN() > n + }, 2*time.Second, time.Microsecond, "flush not retriggered") + + close(e.ExportTrigger) + assert.NoError(t, b.Shutdown(ctx)) + assert.Equal(t, 3, e.ExportN()) + }) + + t.Run("Shutdown", func(t *testing.T) { + t.Run("Error", func(t *testing.T) { + e := newTestExporter(assert.AnError) + b := NewBatchProcessorV2(e) + assert.ErrorIs(t, b.Shutdown(ctx), assert.AnError, "exporter error not returned") + assert.NoError(t, b.Shutdown(ctx)) + }) + + t.Run("Multiple", func(t *testing.T) { + e := newTestExporter(nil) + b := NewBatchProcessorV2(e) + + const shutdowns = 3 + for i := 0; i < shutdowns; i++ { + assert.NoError(t, b.Shutdown(ctx)) + } + assert.Equal(t, 1, e.ShutdownN(), "exporter Shutdown calls") + }) + + t.Run("OnEmit", func(t *testing.T) { + e := newTestExporter(nil) + b := NewBatchProcessorV2(e) + assert.NoError(t, b.Shutdown(ctx)) + + want := e.ExportN() + assert.NoError(t, b.OnEmit(ctx, new(Record))) + assert.Equal(t, want, e.ExportN(), "Export called after shutdown") + }) + + t.Run("ForceFlush", func(t *testing.T) { + e := newTestExporter(nil) + b := NewBatchProcessorV2(e) + + assert.NoError(t, b.OnEmit(ctx, new(Record))) + assert.NoError(t, b.Shutdown(ctx)) + + assert.NoError(t, b.ForceFlush(ctx)) + assert.Equal(t, 0, e.ForceFlushN(), "ForceFlush called after shutdown") + }) + + t.Run("CanceledContext", func(t *testing.T) { + e := newTestExporter(nil) + e.ExportTrigger = make(chan struct{}) + t.Cleanup(func() { close(e.ExportTrigger) }) + b := NewBatchProcessorV2(e) + + ctx := context.Background() + c, cancel := context.WithCancel(ctx) + cancel() + + assert.ErrorIs(t, b.Shutdown(c), context.Canceled) + }) + }) + + t.Run("ForceFlush", func(t *testing.T) { + t.Run("Flush", func(t *testing.T) { + e := newTestExporter(assert.AnError) + b := NewBatchProcessorV2( + e, + WithMaxQueueSize(100), + WithExportMaxBatchSize(10), + WithExportInterval(time.Hour), + WithExportTimeout(time.Hour), + ) + t.Cleanup(func() { _ = b.Shutdown(ctx) }) + + r := new(Record) + r.SetBody(log.BoolValue(true)) + require.NoError(t, b.OnEmit(ctx, r)) + + 
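The tests drive the processor through newTestExporter and, in bench_test.go further down, mockDelayExporter. For readers outside the test files, a minimal Exporter implementation is only three methods; this sketch (errExporter is a made-up name) mirrors how newTestExporter(assert.AnError) behaves when Export fails:

package log

import (
	"context"
	"errors"
)

// errExporter is a sketch of the smallest possible Exporter: it keeps no
// state and returns a fixed error from Export.
type errExporter struct{ err error }

func (e errExporter) Export(context.Context, []Record) error { return e.err }
func (e errExporter) Shutdown(context.Context) error         { return nil }
func (e errExporter) ForceFlush(context.Context) error       { return nil }

// Compile-time check errExporter implements Exporter.
var _ Exporter = errExporter{err: errors.New("export failed")}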
assert.ErrorIs(t, b.ForceFlush(ctx), assert.AnError, "exporter error not returned") + assert.Equal(t, 1, e.ForceFlushN(), "exporter ForceFlush calls") + if assert.Equal(t, 1, e.ExportN(), "exporter Export calls") { + got := e.Records() + if assert.Len(t, got[0], 1, "records received") { + assert.Equal(t, *r, got[0][0]) + } + } + }) + + t.Run("ErrorPartialFlush", func(t *testing.T) { + e := newTestExporter(nil) + e.ExportTrigger = make(chan struct{}) + + ctxErrCalled := make(chan struct{}) + orig := ctxErr + ctxErr = func(ctx context.Context) error { + close(ctxErrCalled) + return orig(ctx) + } + t.Cleanup(func() { ctxErr = orig }) + + const batch = 1 + b := NewBatchProcessorV2( + e, + WithMaxQueueSize(10*batch), + WithExportMaxBatchSize(batch), + WithExportInterval(time.Hour), + WithExportTimeout(time.Hour), + ) + + // Enqueue 10 x "batch size" amount of records. + for i := 0; i < 10*batch; i++ { + require.NoError(t, b.OnEmit(ctx, new(Record))) + } + assert.Eventually(t, func() bool { + return e.ExportN() > 0 && len(b.queuedChunks) == cap(b.queuedChunks) + }, 2*time.Second, time.Microsecond) + // 1 export being performed, 1 export in buffer chan, >1 batch + // still in queue that an attempt to flush will be made on. + // + // Stop the poll routine to prevent contention with the queue lock. + // This is outside of "normal" operations, but we are testing if + // ForceFlush will return the correct error when an EnqueueExport + // fails and not if ForceFlush will ever get the queue lock in high + // throughput situations. + close(b.pollDone) + <-b.pollDone + + // Cancel the flush ctx from the start so errPartialFlush is + // returned right away. + fCtx, cancel := context.WithCancel(ctx) + cancel() + + errCh := make(chan error, 1) + go func() { + errCh <- b.ForceFlush(fCtx) + close(errCh) + }() + // Wait for ctxErrCalled to close before closing ExportTrigger so + // we know the errPartialFlush will be returned in ForceFlush. + <-ctxErrCalled + close(e.ExportTrigger) + + err := <-errCh + assert.ErrorIs(t, err, errPartialFlush, "partial flush error") + assert.ErrorIs(t, err, context.Canceled, "ctx canceled error") + }) + + t.Run("CanceledContext", func(t *testing.T) { + e := newTestExporter(nil) + e.ExportTrigger = make(chan struct{}) + b := NewBatchProcessorV2(e) + t.Cleanup(func() { _ = b.Shutdown(ctx) }) + + r := new(Record) + r.SetBody(log.BoolValue(true)) + _ = b.OnEmit(ctx, r) + t.Cleanup(func() { _ = b.Shutdown(ctx) }) + t.Cleanup(func() { close(e.ExportTrigger) }) + + c, cancel := context.WithCancel(ctx) + cancel() + assert.ErrorIs(t, b.ForceFlush(c), context.Canceled) + }) + }) + + t.Run("DroppedLogs", func(t *testing.T) { + orig := global.GetLogger() + t.Cleanup(func() { global.SetLogger(orig) }) + // Use concurrentBuffer for concurrent-safe reading. 
+ buf := new(concurrentBuffer) + stdr.SetVerbosity(1) + global.SetLogger(stdr.New(stdlog.New(buf, "", 0))) + + e := newTestExporter(nil) + e.ExportTrigger = make(chan struct{}) + + b := NewBatchProcessorV2( + e, + WithMaxQueueSize(1), + WithExportMaxBatchSize(1), + WithExportInterval(time.Hour), + WithExportTimeout(time.Hour), + ) + r := new(Record) + // First record will be blocked by testExporter.Export + assert.NoError(t, b.OnEmit(ctx, r), "exported record") + require.Eventually(t, func() bool { + return e.ExportN() > 0 + }, 2*time.Second, time.Microsecond, "blocked export not attempted") + + // Second record will be written to export queue + assert.NoError(t, b.OnEmit(ctx, r), "export queue record") + require.Eventually(t, func() bool { + return len(b.queuedChunks) == cap(b.queuedChunks) + }, 2*time.Second, time.Microsecond, "blocked queue read not attempted") + + // Third record will be written to BatchProcessor.q + assert.NoError(t, b.OnEmit(ctx, r), "first queued") + // The previous record will be dropped, as the new one will be written to BatchProcessor.q + assert.NoError(t, b.OnEmit(ctx, r), "second queued") + + wantMsg := `"level"=1 "msg"="dropped log records" "dropped"=1` + assert.Eventually(t, func() bool { + return strings.Contains(buf.String(), wantMsg) + }, 2*time.Second, time.Microsecond) + + close(e.ExportTrigger) + _ = b.Shutdown(ctx) + }) + + t.Run("ConcurrentSafe", func(t *testing.T) { + const goRoutines = 10 + + e := newTestExporter(nil) + b := NewBatchProcessorV2(e) + + ctx, cancel := context.WithCancel(ctx) + var wg sync.WaitGroup + for i := 0; i < goRoutines-1; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + assert.NoError(t, b.OnEmit(ctx, new(Record))) + // Ignore partial flush errors. 
+ _ = b.ForceFlush(ctx) + } + } + }() + } + + require.Eventually(t, func() bool { + return e.ExportN() > 0 + }, 2*time.Second, time.Microsecond, "export before shutdown") + + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, b.Shutdown(ctx)) + cancel() + }() + + wg.Wait() + }) +} + +func BenchmarkBatchProcessorV2OnEmit(b *testing.B) { + r := new(Record) + body := log.BoolValue(true) + r.SetBody(body) + + rSize := unsafe.Sizeof(r) + unsafe.Sizeof(body) + ctx := context.Background() + bp := NewBatchProcessorV2( + defaultNoopExporter, + WithMaxQueueSize(b.N+1), + WithExportMaxBatchSize(b.N+1), + WithExportInterval(time.Hour), + WithExportTimeout(time.Hour), + ) + b.Cleanup(func() { _ = bp.Shutdown(ctx) }) + + b.SetBytes(int64(rSize)) + b.ReportAllocs() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + var err error + for pb.Next() { + err = bp.OnEmit(ctx, r) + } + _ = err + }) +} diff --git a/sdk/log/bench_test.go b/sdk/log/bench_test.go index 835f68c7aba..74689d7f387 100644 --- a/sdk/log/bench_test.go +++ b/sdk/log/bench_test.go @@ -13,6 +13,17 @@ import ( "github.com/stretchr/testify/assert" ) +type mockDelayExporter struct{} + +func (mockDelayExporter) Export(context.Context, []Record) error { + time.Sleep(time.Millisecond * 5) + return nil +} + +func (mockDelayExporter) Shutdown(context.Context) error { return nil } + +func (mockDelayExporter) ForceFlush(context.Context) error { return nil } + func BenchmarkProcessor(b *testing.B) { for _, tc := range []struct { name string @@ -30,6 +41,12 @@ func BenchmarkProcessor(b *testing.B) { return []LoggerProviderOption{WithProcessor(NewBatchProcessor(noopExporter{}))} }, }, + { + name: "BatchSimulateExport", + f: func() []LoggerProviderOption { + return []LoggerProviderOption{WithProcessor(NewBatchProcessor(mockDelayExporter{}))} + }, + }, { name: "SetTimestampSimple", f: func() []LoggerProviderOption { diff --git a/sdk/log/benchv2_test.go b/sdk/log/benchv2_test.go new file mode 100644 index 00000000000..d8590318eda --- /dev/null +++ b/sdk/log/benchv2_test.go @@ -0,0 +1,115 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package log + +import ( + "context" + "testing" + + "go.opentelemetry.io/otel/log" + + "github.com/stretchr/testify/assert" +) + +func BenchmarkV2Processor(b *testing.B) { + for _, tc := range []struct { + name string + f func() []LoggerProviderOption + }{ + { + name: "Simple", + f: func() []LoggerProviderOption { + return []LoggerProviderOption{WithProcessor(NewSimpleProcessor(noopExporter{}))} + }, + }, + { + name: "Batch", + f: func() []LoggerProviderOption { + return []LoggerProviderOption{WithProcessor(NewBatchProcessorV2(noopExporter{}))} + }, + }, + { + name: "BatchSimulateExport", + f: func() []LoggerProviderOption { + return []LoggerProviderOption{WithProcessor(NewBatchProcessorV2(mockDelayExporter{}))} + }, + }, + { + name: "SetTimestampSimple", + f: func() []LoggerProviderOption { + return []LoggerProviderOption{ + WithProcessor(timestampProcessor{}), + WithProcessor(NewSimpleProcessor(noopExporter{})), + } + }, + }, + { + name: "SetTimestampBatch", + f: func() []LoggerProviderOption { + return []LoggerProviderOption{ + WithProcessor(timestampProcessor{}), + WithProcessor(NewBatchProcessorV2(noopExporter{})), + } + }, + }, + { + name: "AddAttributesSimple", + f: func() []LoggerProviderOption { + return []LoggerProviderOption{ + WithProcessor(attrAddProcessor{}), + WithProcessor(NewSimpleProcessor(noopExporter{})), + } + }, + }, + { + name: "AddAttributesBatch", 
+ f: func() []LoggerProviderOption { + return []LoggerProviderOption{ + WithProcessor(attrAddProcessor{}), + WithProcessor(NewBatchProcessorV2(noopExporter{})), + } + }, + }, + { + name: "SetAttributesSimple", + f: func() []LoggerProviderOption { + return []LoggerProviderOption{ + WithProcessor(attrSetDecorator{}), + WithProcessor(NewSimpleProcessor(noopExporter{})), + } + }, + }, + { + name: "SetAttributesBatch", + f: func() []LoggerProviderOption { + return []LoggerProviderOption{ + WithProcessor(attrSetDecorator{}), + WithProcessor(NewBatchProcessorV2(noopExporter{})), + } + }, + }, + } { + b.Run(tc.name, func(b *testing.B) { + provider := NewLoggerProvider(tc.f()...) + b.Cleanup(func() { assert.NoError(b, provider.Shutdown(context.Background())) }) + logger := provider.Logger(b.Name()) + + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + r := log.Record{} + r.SetBody(log.StringValue("message")) + r.SetSeverity(log.SeverityInfo) + r.AddAttributes( + log.String("foo", "bar"), + log.Float64("float", 3.14), + log.Int("int", 123), + log.Bool("bool", true), + ) + logger.Emit(context.Background(), r) + } + }) + }) + } +} diff --git a/sdk/log/queue.go b/sdk/log/queue.go new file mode 100644 index 00000000000..16ce9d836c6 --- /dev/null +++ b/sdk/log/queue.go @@ -0,0 +1,115 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package log // import "go.opentelemetry.io/otel/sdk/log" + +import ( + "sync" + "sync/atomic" +) + +// queue holds a queue of logging records. +// When the queue becomes full, the oldest records in the queue are +// overwritten. +type queue struct { + sync.Mutex + + dropped atomic.Uint64 + cap, len int + read, write *ring +} + +func newQueue(size int) *queue { + r := newRing(size) + return &queue{ + cap: size, + read: r, + write: r, + } +} + +// Dropped returns the number of Records dropped during enqueueing since the +// last time Dropped was called. +func (q *queue) Dropped() uint64 { + return q.dropped.Swap(0) +} + +// Enqueue adds r to the queue. The queue size, including the addition of r, is +// returned. +// +// If enqueueing r will exceed the capacity of q, the oldest Record held in q +// will be dropped and r retained. +func (q *queue) Enqueue(r Record) int { + q.Lock() + defer q.Unlock() + + q.write.Value = r + q.write = q.write.Next() + + q.len++ + if q.len > q.cap { + // Overflow. Advance read to be the new "oldest". + q.len = q.cap + q.read = q.read.Next() + q.dropped.Add(1) + } + return q.len +} + +// TryDequeue attempts to dequeue up to len(buf) Records. The available Records +// will be assigned into buf and passed to write. If write fails, returning +// false, the Records will not be removed from the queue. If write succeeds, +// returning true, the dequeued Records are removed from the queue. The number +// of Records remaining in the queue are returned. +// +// When write is called the lock of q is held. The write function must not call +// other methods of this q that acquire the lock. 
+func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int { + q.Lock() + defer q.Unlock() + + origRead := q.read + + n := min(len(buf), q.len) + for i := 0; i < n; i++ { + buf[i] = q.read.Value + q.read = q.read.Next() + } + + if write(buf[:n]) { + q.len -= n + } else { + q.read = origRead + } + return q.len +} + +func (q *queue) Dequeue(buf []Record) (queueLen, written int) { + q.Lock() + defer q.Unlock() + + n := min(len(buf), q.len) + for i := 0; i < n; i++ { + buf[i] = q.read.Value + q.read = q.read.Next() + } + + q.len -= n + return q.len, n +} + +// Flush returns all the Records held in the queue and resets it to be +// empty. +func (q *queue) Flush() []Record { + q.Lock() + defer q.Unlock() + + out := make([]Record, q.len) + for i := range out { + out[i] = q.read.Value + q.read = q.read.Next() + } + q.len = 0 + + return out +} diff --git a/sdk/log/queue_test.go b/sdk/log/queue_test.go new file mode 100644 index 00000000000..7552353eb42 --- /dev/null +++ b/sdk/log/queue_test.go @@ -0,0 +1,136 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package log // import "go.opentelemetry.io/otel/sdk/log" + +import ( + "slices" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/log" +) + +func TestQueue(t *testing.T) { + var r Record + r.SetBody(log.BoolValue(true)) + + t.Run("newQueue", func(t *testing.T) { + const size = 1 + q := newQueue(size) + assert.Equal(t, 0, q.len) + assert.Equal(t, size, q.cap, "capacity") + assert.Equal(t, size, q.read.Len(), "read ring") + assert.Same(t, q.read, q.write, "different rings") + }) + + t.Run("Enqueue", func(t *testing.T) { + const size = 2 + q := newQueue(size) + + var notR Record + notR.SetBody(log.IntValue(10)) + + assert.Equal(t, 1, q.Enqueue(notR), "incomplete batch") + assert.Equal(t, 1, q.len, "length") + assert.Equal(t, size, q.cap, "capacity") + + assert.Equal(t, 2, q.Enqueue(r), "complete batch") + assert.Equal(t, 2, q.len, "length") + assert.Equal(t, size, q.cap, "capacity") + + assert.Equal(t, 2, q.Enqueue(r), "overflow batch") + assert.Equal(t, 2, q.len, "length") + assert.Equal(t, size, q.cap, "capacity") + + assert.Equal(t, []Record{r, r}, q.Flush(), "flushed Records") + }) + + t.Run("Dropped", func(t *testing.T) { + q := newQueue(1) + + _ = q.Enqueue(r) + _ = q.Enqueue(r) + assert.Equal(t, uint64(1), q.Dropped(), "fist") + + _ = q.Enqueue(r) + _ = q.Enqueue(r) + assert.Equal(t, uint64(2), q.Dropped(), "second") + }) + + t.Run("Flush", func(t *testing.T) { + const size = 2 + q := newQueue(size) + q.write.Value = r + q.write = q.write.Next() + q.len = 1 + + assert.Equal(t, []Record{r}, q.Flush(), "flushed") + }) + + t.Run("TryFlush", func(t *testing.T) { + const size = 3 + q := newQueue(size) + for i := 0; i < size-1; i++ { + q.write.Value = r + q.write = q.write.Next() + q.len++ + } + + buf := make([]Record, 1) + f := func([]Record) bool { return false } + assert.Equal(t, size-1, q.TryDequeue(buf, f), "not flushed") + require.Equal(t, size-1, q.len, "length") + require.NotSame(t, q.read, q.write, "read ring advanced") + + var flushed []Record + f = func(r []Record) bool { + flushed = append(flushed, r...) 
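Since queue and its methods are unexported, the following sketch would have to live in package log alongside the tests above; it condenses the overflow, Dropped, and Dequeue behavior that the surrounding assertions verify:

package log

import (
	"testing"

	"go.opentelemetry.io/otel/log"
)

func TestQueueSemanticsSketch(t *testing.T) {
	q := newQueue(2)

	var r Record
	r.SetBody(log.StringValue("x"))

	// The third Enqueue overflows the ring and overwrites the oldest record.
	q.Enqueue(r)
	q.Enqueue(r)
	q.Enqueue(r)

	if got := q.Dropped(); got != 1 {
		t.Errorf("Dropped() = %d, want 1", got)
	}

	// Dequeue removes up to len(buf) records and reports what remains.
	buf := make([]Record, 2)
	remaining, written := q.Dequeue(buf)
	if remaining != 0 || written != 2 {
		t.Errorf("Dequeue() = (%d, %d), want (0, 2)", remaining, written)
	}
}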
+ return true + } + if assert.Equal(t, size-2, q.TryDequeue(buf, f), "did not flush len(buf)") { + assert.Equal(t, []Record{r}, flushed, "Records") + } + + buf = slices.Grow(buf, size) + flushed = flushed[:0] + if assert.Equal(t, 0, q.TryDequeue(buf, f), "did not flush len(queue)") { + assert.Equal(t, []Record{r}, flushed, "Records") + } + }) + + t.Run("ConcurrentSafe", func(t *testing.T) { + const goRoutines = 10 + + flushed := make(chan []Record, goRoutines) + out := make([]Record, 0, goRoutines) + done := make(chan struct{}) + go func() { + defer close(done) + for recs := range flushed { + out = append(out, recs...) + } + }() + + var wg sync.WaitGroup + wg.Add(goRoutines) + + b := newQueue(goRoutines) + for i := 0; i < goRoutines; i++ { + go func() { + defer wg.Done() + b.Enqueue(Record{}) + flushed <- b.Flush() + }() + } + + wg.Wait() + close(flushed) + <-done + + assert.Len(t, out, goRoutines, "flushed Records") + }) +} From 14f15e77bb12780f5cd1f1911637211525c3ea29 Mon Sep 17 00:00:00 2001 From: wmdanor Date: Thu, 10 Apr 2025 22:11:52 +0100 Subject: [PATCH 2/3] improve --- sdk/log/batch.go | 211 +++++++++++++++++----- sdk/log/batch_test.go | 4 +- sdk/log/batchv2.go | 343 ------------------------------------ sdk/log/batchv2_test.go | 381 ---------------------------------------- sdk/log/bench_test.go | 8 +- sdk/log/benchv2_test.go | 115 ------------ 6 files changed, 178 insertions(+), 884 deletions(-) delete mode 100644 sdk/log/batchv2.go delete mode 100644 sdk/log/batchv2_test.go delete mode 100644 sdk/log/benchv2_test.go diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 6ef2cfa63c5..92161b841eb 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -6,10 +6,11 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "context" "errors" - "slices" + "sync" "sync/atomic" "time" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/internal/global" ) @@ -29,6 +30,40 @@ const ( // Compile-time check BatchProcessor implements Processor. var _ Processor = (*BatchProcessor)(nil) +type recordsBufferPool struct { + pool sync.Pool +} + +func newRecordsBufferPool(bufferSize int) *recordsBufferPool { + return &recordsBufferPool{ + pool: sync.Pool{ + New: func() any { + slice := make([]Record, bufferSize) + return &slice + }, + }, + } +} + +func (p *recordsBufferPool) Get() *[]Record { + return p.pool.Get().(*[]Record) +} + +func (p *recordsBufferPool) Put(recordsBuffer *[]Record) { + p.pool.Put(recordsBuffer) +} + +type recordsBatch struct { + ctx context.Context + buf *[]Record + records int + + respCh chan<- error +} + +// Compile-time check BatchProcessor implements Processor. +var _ Processor = (*BatchProcessor)(nil) + // BatchProcessor is a processor that exports batches of log records. // // Use [NewBatchProcessor] to create a BatchProcessor. An empty BatchProcessor @@ -70,11 +105,10 @@ type BatchProcessor struct { // cannot be flushed to the export buffer, the records will remain in the // queue. - // exporter is the bufferedExporter all batches are exported with. - exporter *bufferExporter + exporter Exporter - // q is the active queue of records that have not yet been exported. - q *queue + // queue is the active queue of records that have not yet been exported. + queue *queue // batchSize is the minimum number of records needed before an export is // triggered (unless the interval expires). batchSize int @@ -97,6 +131,13 @@ type BatchProcessor struct { // stopped holds the stopped state of the BatchProcessor. 
stopped atomic.Bool + recordsBatches chan recordsBatch + recordsBatchesClosed bool + recordsBatchesMu sync.Mutex + recordsBatchesDone chan struct{} + + recordsBufPool *recordsBufferPool + noCmp [0]func() //nolint: unused // This is indeed used. } @@ -116,17 +157,21 @@ func NewBatchProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchPr exporter = newTimeoutExporter(exporter, cfg.expTimeout.Value) // Use a chunkExporter to ensure ForceFlush and Shutdown calls are batched // appropriately on export. - exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value) + // exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value) b := &BatchProcessor{ - exporter: newBufferExporter(exporter, cfg.expBufferSize.Value), + exporter: exporter, - q: newQueue(cfg.maxQSize.Value), + queue: newQueue(cfg.maxQSize.Value), batchSize: cfg.expMaxBatchSize.Value, pollTrigger: make(chan struct{}, 1), pollKill: make(chan struct{}), + + recordsBatches: make(chan recordsBatch, cfg.expBufferSize.Value), + recordsBufPool: newRecordsBufferPool(cfg.expMaxBatchSize.Value), } b.pollDone = b.poll(cfg.expInterval.Value) + b.recordsBatchesDone = b.processRecordsBatches() return b } @@ -136,8 +181,6 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { done = make(chan struct{}) ticker := time.NewTicker(interval) - // TODO: investigate using a sync.Pool instead of cloning. - buf := make([]Record, b.batchSize) go func() { defer close(done) defer ticker.Stop() @@ -151,18 +194,12 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { return } - if d := b.q.Dropped(); d > 0 { + if d := b.queue.Dropped(); d > 0 { global.Warn("dropped log records", "dropped", d) } - qLen := b.q.TryDequeue(buf, func(r []Record) bool { - ok := b.exporter.EnqueueExport(r) - if ok { - buf = slices.Clone(buf) - } - return ok - }) - if qLen >= b.batchSize { + ok, recordsInQueue := b.tryDequeue(nil) + if !ok || recordsInQueue >= b.batchSize { // There is another full batch ready. Immediately trigger // another export attempt. select { @@ -173,17 +210,82 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { } } }() + + return done +} + +func (b *BatchProcessor) processRecordsBatches() (done chan struct{}) { + done = make(chan struct{}) + + go func() { + defer close(done) + for chunk := range b.recordsBatches { + err := b.exporter.Export(chunk.ctx, (*chunk.buf)[:chunk.records]) + + b.recordsBufPool.Put(chunk.buf) + + select { + case chunk.respCh <- err: + default: + // e.respCh is nil or busy, default to otel.Handler. + if err != nil { + otel.Handle(err) + } + } + } + }() + return done } +// Tries to write records batch from the queue to records batches channel. +// If success, ok is true and queueLen is number of records remaining in the records queue. +// If failure, ok is false and queueLen value does not have any meaning. 
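processRecordsBatches follows the same lifecycle pattern as poll: the spawner returns a done channel that is closed when the worker goroutine exits, and Shutdown later drains work by closing the input channel and waiting on done. A minimal standalone sketch of that pattern (generic names, not SDK types):

package main

import "fmt"

// startWorker consumes jobs until the channel is closed and signals that it
// has exited by closing the returned done channel.
func startWorker(jobs <-chan int) (done chan struct{}) {
	done = make(chan struct{})
	go func() {
		defer close(done)
		for j := range jobs {
			fmt.Println("exported batch", j)
		}
	}()
	return done
}

func main() {
	jobs := make(chan int, 4)
	done := startWorker(jobs)

	jobs <- 1
	jobs <- 2

	// Shutdown: stop accepting new work, then wait for in-flight work to drain.
	close(jobs)
	<-done
}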
+func (b *BatchProcessor) tryDequeue(respCh chan<- error) (ok bool, queueLen int) { + b.recordsBatchesMu.Lock() + defer b.recordsBatchesMu.Unlock() + + if b.recordsBatchesClosed { + if respCh != nil { + respCh <- nil + } + return true, 0 + } + + if len(b.recordsBatches) == cap(b.recordsBatches) { + if respCh != nil { + respCh <- nil + } + return false, 0 + } + + buf := b.recordsBufPool.Get() + + queueLen, n := b.queue.Dequeue(*buf) + if n == 0 { + b.recordsBufPool.Put(buf) + if respCh != nil { + respCh <- nil + } + return true, 0 + } + + data := recordsBatch{ctx: context.Background(), respCh: respCh, buf: buf, records: n} + + // push in sync as available space is guaranteed by len check and mutex + b.recordsBatches <- data + + return true, queueLen +} + // OnEmit batches provided log record. func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error { - if b.stopped.Load() || b.q == nil { + if b.stopped.Load() || b.queue == nil { return nil } // The record is cloned so that changes done by subsequent processors // are not going to lead to a data race. - if n := b.q.Enqueue(r.Clone()); n >= b.batchSize { + if n := b.queue.Enqueue(r.Clone()); n >= b.batchSize { select { case b.pollTrigger <- struct{}{}: default: @@ -197,7 +299,7 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error { // Shutdown flushes queued log records and shuts down the decorated exporter. func (b *BatchProcessor) Shutdown(ctx context.Context) error { - if b.stopped.Swap(true) || b.q == nil { + if b.stopped.Swap(true) || b.queue == nil { return nil } @@ -205,14 +307,29 @@ func (b *BatchProcessor) Shutdown(ctx context.Context) error { close(b.pollKill) select { case <-b.pollDone: + case <-ctx.Done(): + // Out of time. + return errors.Join(ctx.Err(), b.shutdownExporter(ctx)) + } + + err := b.flush(ctx) + + return errors.Join(err, b.shutdownExporter(ctx)) +} + +func (b *BatchProcessor) shutdownExporter(ctx context.Context) error { + b.recordsBatchesMu.Lock() + defer b.recordsBatchesMu.Unlock() + b.recordsBatchesClosed = true + close(b.recordsBatches) + select { + case <-b.recordsBatchesDone: case <-ctx.Done(): // Out of time. return errors.Join(ctx.Err(), b.exporter.Shutdown(ctx)) } - // Flush remaining queued before exporter shutdown. - err := b.exporter.Export(ctx, b.q.Flush()) - return errors.Join(err, b.exporter.Shutdown(ctx)) + return b.exporter.Shutdown(ctx) } var errPartialFlush = errors.New("partial flush: export buffer full") @@ -224,30 +341,40 @@ var ctxErr = func(ctx context.Context) error { // ForceFlush flushes queued log records and flushes the decorated exporter. func (b *BatchProcessor) ForceFlush(ctx context.Context) error { - if b.stopped.Load() || b.q == nil { + if b.stopped.Load() || b.queue == nil { return nil } - buf := make([]Record, b.q.cap) - notFlushed := func() bool { - var flushed bool - _ = b.q.TryDequeue(buf, func(r []Record) bool { - flushed = b.exporter.EnqueueExport(r) - return flushed - }) - return !flushed - } + err := b.flush(ctx) + + return errors.Join(err, b.exporter.ForceFlush(ctx)) +} + +func (b *BatchProcessor) flush(ctx context.Context) error { var err error - // For as long as ctx allows, try to make a single flush of the queue. - for notFlushed() { - // Use ctxErr instead of calling ctx.Err directly so we can test - // the partial error return. 
- if e := ctxErr(ctx); e != nil { - err = errors.Join(e, errPartialFlush) + for { + respCh := make(chan error, 1) + ok, queueLen := b.tryDequeue(respCh) + + select { + case respErr := <-respCh: + if respErr != nil { + err = errors.Join(respErr, errPartialFlush) + } + case <-ctx.Done(): + err = errors.Join(ctxErr(ctx), errPartialFlush) + } + + if err != nil { + break + } + + if ok && queueLen == 0 { break } } - return errors.Join(err, b.exporter.ForceFlush(ctx)) + + return err } type batchConfig struct { diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index 828337421d1..3c700160e06 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -385,7 +385,7 @@ func TestBatchProcessor(t *testing.T) { require.NoError(t, b.OnEmit(ctx, new(Record))) } assert.Eventually(t, func() bool { - return e.ExportN() > 0 && len(b.exporter.input) == cap(b.exporter.input) + return e.ExportN() > 0 && len(b.recordsBatches) == cap(b.recordsBatches) }, 2*time.Second, time.Microsecond) // 1 export being performed, 1 export in buffer chan, >1 batch // still in queue that an attempt to flush will be made on. @@ -464,7 +464,7 @@ func TestBatchProcessor(t *testing.T) { // Second record will be written to export queue assert.NoError(t, b.OnEmit(ctx, r), "export queue record") require.Eventually(t, func() bool { - return len(b.exporter.input) == cap(b.exporter.input) + return len(b.recordsBatches) == cap(b.recordsBatches) }, 2*time.Second, time.Microsecond, "blocked queue read not attempted") // Third record will be written to BatchProcessor.q diff --git a/sdk/log/batchv2.go b/sdk/log/batchv2.go deleted file mode 100644 index ebc3fee8ce3..00000000000 --- a/sdk/log/batchv2.go +++ /dev/null @@ -1,343 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package log // import "go.opentelemetry.io/otel/sdk/log" - -import ( - "context" - "errors" - "sync" - "sync/atomic" - "time" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/internal/global" -) - -type recordsBufferPool struct { - pool sync.Pool -} - -func newRecordsBufferPool(bufferSize int) *recordsBufferPool { - return &recordsBufferPool{ - pool: sync.Pool{ - New: func() any { - slice := make([]Record, bufferSize) - return &slice - }, - }, - } -} - -func (p *recordsBufferPool) Get() *[]Record { - return p.pool.Get().(*[]Record) -} - -func (p *recordsBufferPool) Put(recordsBuffer *[]Record) { - p.pool.Put(recordsBuffer) -} - -type recordsChunk struct { - ctx context.Context - buf *[]Record - records int - - respCh chan<- error -} - -// Compile-time check BatchProcessor implements Processor. -var _ Processor = (*BatchProcessorV2)(nil) - -// BatchProcessorV2 is a processor that exports batches of log records. -// -// Use [NewBatchProcessorV2] to create a BatchProcessor. An empty BatchProcessor -// is shut down by default, no records will be batched or exported. -type BatchProcessorV2 struct { - // The BatchProcessor is designed to provide the highest throughput of - // log records possible while being compatible with OpenTelemetry. The - // entry point of log records is the OnEmit method. This method is designed - // to receive records as fast as possible while still honoring shutdown - // commands. All records received are enqueued to queue. - // - // In order to block OnEmit as little as possible, a separate "poll" - // goroutine is spawned at the creation of a BatchProcessor. This - // goroutine is responsible for batching the queue at regular polled - // intervals, or when it is directly signaled to. 
- // - // To keep the polling goroutine from backing up, all batches it makes are - // exported with a bufferedExporter. This exporter allows the poll - // goroutine to enqueue an export payload that will be handled in a - // separate goroutine dedicated to the export. This asynchronous behavior - // allows the poll goroutine to maintain accurate interval polling. - // - // ___BatchProcessor____ __Poll Goroutine__ __Export Goroutine__ - // || || || || || || - // || ********** || || || || ********** || - // || Records=>* OnEmit * || || | - ticker || || * export * || - // || ********** || || | - trigger || || ********** || - // || || || || | || || || || - // || || || || | || || || || - // || __________\/___ || || |*********** || || ______/\_______ || - // || (____queue______)>=||=||===|* batch *===||=||=>[_export_buffer_] || - // || || || |*********** || || || - // ||_____________________|| ||__________________|| ||____________________|| - // - // - // The "release valve" in this processing is the record queue. This queue - // is a ring buffer. It will overwrite the oldest records first when writes - // to OnEmit are made faster than the queue can be flushed. If batches - // cannot be flushed to the export buffer, the records will remain in the - // queue. - - exporter Exporter - - // queue is the active queue of records that have not yet been exported. - queue *queue - // batchSize is the minimum number of records needed before an export is - // triggered (unless the interval expires). - batchSize int - - // pollTrigger triggers the poll goroutine to flush a batch from the queue. - // This is sent to when it is known that the queue contains at least one - // complete batch. - // - // When a send is made to the channel, the poll loop will be reset after - // the flush. If there is still enough records in the queue for another - // batch the reset of the poll loop will automatically re-trigger itself. - // There is no need for the original sender to monitor and resend. - pollTrigger chan struct{} - // pollKill kills the poll goroutine. This is only expected to be closed - // once by the Shutdown method. - pollKill chan struct{} - // pollDone signals the poll goroutine has completed. - pollDone chan struct{} - - // stopped holds the stopped state of the BatchProcessor. - stopped atomic.Bool - - queuedChunks chan recordsChunk - queuedChunksClosed atomic.Bool - queuedChunksMu sync.Mutex - queuedChunksDone chan struct{} - recordsBufPool *recordsBufferPool - - noCmp [0]func() //nolint: unused // This is indeed used. -} - -// NewBatchProcessorV2 decorates the provided exporter -// so that the log records are batched before exporting. -// -// All of the exporter's methods are called synchronously. -func NewBatchProcessorV2(exporter Exporter, opts ...BatchProcessorOption) *BatchProcessorV2 { - cfg := newBatchConfig(opts) - if exporter == nil { - // Do not panic on nil export. - exporter = defaultNoopExporter - } - // Order is important here. Wrap the timeoutExporter with the chunkExporter - // to ensure each export completes in timeout (instead of all chunked - // exports). - exporter = newTimeoutExporter(exporter, cfg.expTimeout.Value) - // Use a chunkExporter to ensure ForceFlush and Shutdown calls are batched - // appropriately on export. 
- // exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value) - - b := &BatchProcessorV2{ - exporter: exporter, - - queue: newQueue(cfg.maxQSize.Value), - batchSize: cfg.expMaxBatchSize.Value, - pollTrigger: make(chan struct{}, 1), - pollKill: make(chan struct{}), - - queuedChunks: make(chan recordsChunk, cfg.expBufferSize.Value), - recordsBufPool: newRecordsBufferPool(cfg.expMaxBatchSize.Value), - } - b.pollDone = b.poll(cfg.expInterval.Value) - b.queuedChunksDone = b.processQueuedChunks() - return b -} - -// poll spawns a goroutine to handle interval polling and batch exporting. The -// returned done chan is closed when the spawned goroutine completes. -func (b *BatchProcessorV2) poll(interval time.Duration) (done chan struct{}) { - done = make(chan struct{}) - - ticker := time.NewTicker(interval) - go func() { - defer close(done) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - case <-b.pollTrigger: - ticker.Reset(interval) - case <-b.pollKill: - return - } - - if d := b.queue.Dropped(); d > 0 { - global.Warn("dropped log records", "dropped", d) - } - - enqueued, recordsInQueue := b.enqueueExport(nil) - if !enqueued || recordsInQueue >= b.batchSize { - // There is another full batch ready. Immediately trigger - // another export attempt. - select { - case b.pollTrigger <- struct{}{}: - default: - // Another flush signal already received. - } - } - } - }() - - return done -} - -func (b *BatchProcessorV2) processQueuedChunks() (done chan struct{}) { - done = make(chan struct{}) - - go func() { - defer close(done) - for chunk := range b.queuedChunks { - err := b.exporter.Export(chunk.ctx, (*chunk.buf)[:chunk.records]) - - b.recordsBufPool.Put(chunk.buf) - - select { - case chunk.respCh <- err: - default: - // e.respCh is nil or busy, default to otel.Handler. - if err != nil { - otel.Handle(err) - } - } - } - }() - - return done -} - -func (b *BatchProcessorV2) enqueueExport(respCh chan<- error) (enqueued bool, queueLen int) { - b.queuedChunksMu.Lock() - defer b.queuedChunksMu.Unlock() - - if len(b.queuedChunks) == cap(b.queuedChunks) { - if respCh != nil { - respCh <- nil - } - return false, 0 - } - - buf := b.recordsBufPool.Get() - - queueLen, n := b.queue.Dequeue(*buf) - - data := recordsChunk{ctx: context.Background(), respCh: respCh, buf: buf, records: n} - - if b.queuedChunksClosed.Load() { - if respCh != nil { - respCh <- nil - } - return true, 0 - } - - // done in sync as available space is guaranteed by len check - b.queuedChunks <- data - - return true, queueLen -} - -// OnEmit batches provided log record. -func (b *BatchProcessorV2) OnEmit(_ context.Context, r *Record) error { - if b.stopped.Load() || b.queue == nil { - return nil - } - // The record is cloned so that changes done by subsequent processors - // are not going to lead to a data race. - if n := b.queue.Enqueue(r.Clone()); n >= b.batchSize { - select { - case b.pollTrigger <- struct{}{}: - default: - // Flush chan full. The poll goroutine will handle this by - // re-sending any trigger until the queue has less than batchSize - // records. - } - } - return nil -} - -// Shutdown flushes queued log records and shuts down the decorated exporter. -func (b *BatchProcessorV2) Shutdown(ctx context.Context) error { - if b.stopped.Swap(true) || b.queue == nil { - return nil - } - - // Stop the poll goroutine. - close(b.pollKill) - select { - case <-b.pollDone: - case <-ctx.Done(): - // Out of time. 
- return errors.Join(ctx.Err(), b.exporter.Shutdown(ctx)) - } - - err := b.flush(ctx) - if err != nil { - return errors.Join(err, b.exporter.Shutdown(ctx)) - } - - b.queuedChunksClosed.Store(true) - b.queuedChunksMu.Lock() - defer b.queuedChunksMu.Unlock() - close(b.queuedChunks) - select { - case <-b.queuedChunksDone: - case <-ctx.Done(): - // Out of time. - return errors.Join(ctx.Err(), b.exporter.Shutdown(ctx)) - } - - return errors.Join(err, b.exporter.Shutdown(ctx)) -} - -// ForceFlush flushes queued log records and flushes the decorated exporter. -func (b *BatchProcessorV2) ForceFlush(ctx context.Context) error { - if b.stopped.Load() || b.queue == nil { - return nil - } - - err := b.flush(ctx) - - return errors.Join(err, b.exporter.ForceFlush(ctx)) -} - -func (b *BatchProcessorV2) flush(ctx context.Context) error { - var err error - for { - respCh := make(chan error, 1) - enqueued, queueLen := b.enqueueExport(respCh) - - select { - case respErr := <-respCh: - if respErr != nil { - err = errors.Join(respErr, errPartialFlush) - } - case <-ctx.Done(): - err = errors.Join(ctxErr(ctx), errPartialFlush) - } - - if err != nil { - break - } - - if enqueued && queueLen == 0 { - break - } - } - - return err -} diff --git a/sdk/log/batchv2_test.go b/sdk/log/batchv2_test.go deleted file mode 100644 index 008cfd5a2e0..00000000000 --- a/sdk/log/batchv2_test.go +++ /dev/null @@ -1,381 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package log // import "go.opentelemetry.io/otel/sdk/log" - -import ( - "context" - stdlog "log" - "strings" - "sync" - "testing" - "time" - "unsafe" - - "github.com/go-logr/stdr" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/otel/internal/global" - "go.opentelemetry.io/otel/log" -) - -func TestBatchProcessorV2(t *testing.T) { - ctx := context.Background() - - t.Run("NilExporter", func(t *testing.T) { - assert.NotPanics(t, func() { NewBatchProcessorV2(nil) }) - }) - - t.Run("Polling", func(t *testing.T) { - e := newTestExporter(nil) - const size = 15 - b := NewBatchProcessorV2( - e, - WithMaxQueueSize(2*size), - WithExportMaxBatchSize(2*size), - WithExportInterval(time.Nanosecond), - WithExportTimeout(time.Hour), - ) - for i := 0; i < size; i++ { - assert.NoError(t, b.OnEmit(ctx, new(Record))) - } - var got []Record - assert.Eventually(t, func() bool { - for _, r := range e.Records() { - got = append(got, r...) 
- } - return len(got) == size - }, 2*time.Second, time.Microsecond) - _ = b.Shutdown(ctx) - }) - - t.Run("OnEmit", func(t *testing.T) { - const batch = 10 - e := newTestExporter(nil) - b := NewBatchProcessorV2( - e, - WithMaxQueueSize(10*batch), - WithExportMaxBatchSize(batch), - WithExportInterval(time.Hour), - WithExportTimeout(time.Hour), - ) - for i := 0; i < 10*batch; i++ { - assert.NoError(t, b.OnEmit(ctx, new(Record))) - } - assert.Eventually(t, func() bool { - return e.ExportN() > 1 - }, 2*time.Second, time.Microsecond, "multi-batch flush") - - assert.NoError(t, b.Shutdown(ctx)) - assert.GreaterOrEqual(t, e.ExportN(), 10) - }) - - t.Run("RetriggerFlushNonBlocking", func(t *testing.T) { - e := newTestExporter(nil) - e.ExportTrigger = make(chan struct{}) - - const batch = 10 - b := NewBatchProcessorV2( - e, - WithMaxQueueSize(3*batch), - WithExportMaxBatchSize(batch), - WithExportInterval(time.Hour), - WithExportTimeout(time.Hour), - ) - for i := 0; i < 2*batch; i++ { - assert.NoError(t, b.OnEmit(ctx, new(Record))) - } - - var n int - require.Eventually(t, func() bool { - n = e.ExportN() - return n > 0 - }, 2*time.Second, time.Microsecond, "blocked export not attempted") - - var err error - require.Eventually(t, func() bool { - err = b.OnEmit(ctx, new(Record)) - return true - }, time.Second, time.Microsecond, "OnEmit blocked") - assert.NoError(t, err) - - e.ExportTrigger <- struct{}{} - assert.Eventually(t, func() bool { - return e.ExportN() > n - }, 2*time.Second, time.Microsecond, "flush not retriggered") - - close(e.ExportTrigger) - assert.NoError(t, b.Shutdown(ctx)) - assert.Equal(t, 3, e.ExportN()) - }) - - t.Run("Shutdown", func(t *testing.T) { - t.Run("Error", func(t *testing.T) { - e := newTestExporter(assert.AnError) - b := NewBatchProcessorV2(e) - assert.ErrorIs(t, b.Shutdown(ctx), assert.AnError, "exporter error not returned") - assert.NoError(t, b.Shutdown(ctx)) - }) - - t.Run("Multiple", func(t *testing.T) { - e := newTestExporter(nil) - b := NewBatchProcessorV2(e) - - const shutdowns = 3 - for i := 0; i < shutdowns; i++ { - assert.NoError(t, b.Shutdown(ctx)) - } - assert.Equal(t, 1, e.ShutdownN(), "exporter Shutdown calls") - }) - - t.Run("OnEmit", func(t *testing.T) { - e := newTestExporter(nil) - b := NewBatchProcessorV2(e) - assert.NoError(t, b.Shutdown(ctx)) - - want := e.ExportN() - assert.NoError(t, b.OnEmit(ctx, new(Record))) - assert.Equal(t, want, e.ExportN(), "Export called after shutdown") - }) - - t.Run("ForceFlush", func(t *testing.T) { - e := newTestExporter(nil) - b := NewBatchProcessorV2(e) - - assert.NoError(t, b.OnEmit(ctx, new(Record))) - assert.NoError(t, b.Shutdown(ctx)) - - assert.NoError(t, b.ForceFlush(ctx)) - assert.Equal(t, 0, e.ForceFlushN(), "ForceFlush called after shutdown") - }) - - t.Run("CanceledContext", func(t *testing.T) { - e := newTestExporter(nil) - e.ExportTrigger = make(chan struct{}) - t.Cleanup(func() { close(e.ExportTrigger) }) - b := NewBatchProcessorV2(e) - - ctx := context.Background() - c, cancel := context.WithCancel(ctx) - cancel() - - assert.ErrorIs(t, b.Shutdown(c), context.Canceled) - }) - }) - - t.Run("ForceFlush", func(t *testing.T) { - t.Run("Flush", func(t *testing.T) { - e := newTestExporter(assert.AnError) - b := NewBatchProcessorV2( - e, - WithMaxQueueSize(100), - WithExportMaxBatchSize(10), - WithExportInterval(time.Hour), - WithExportTimeout(time.Hour), - ) - t.Cleanup(func() { _ = b.Shutdown(ctx) }) - - r := new(Record) - r.SetBody(log.BoolValue(true)) - require.NoError(t, b.OnEmit(ctx, r)) - - 
assert.ErrorIs(t, b.ForceFlush(ctx), assert.AnError, "exporter error not returned") - assert.Equal(t, 1, e.ForceFlushN(), "exporter ForceFlush calls") - if assert.Equal(t, 1, e.ExportN(), "exporter Export calls") { - got := e.Records() - if assert.Len(t, got[0], 1, "records received") { - assert.Equal(t, *r, got[0][0]) - } - } - }) - - t.Run("ErrorPartialFlush", func(t *testing.T) { - e := newTestExporter(nil) - e.ExportTrigger = make(chan struct{}) - - ctxErrCalled := make(chan struct{}) - orig := ctxErr - ctxErr = func(ctx context.Context) error { - close(ctxErrCalled) - return orig(ctx) - } - t.Cleanup(func() { ctxErr = orig }) - - const batch = 1 - b := NewBatchProcessorV2( - e, - WithMaxQueueSize(10*batch), - WithExportMaxBatchSize(batch), - WithExportInterval(time.Hour), - WithExportTimeout(time.Hour), - ) - - // Enqueue 10 x "batch size" amount of records. - for i := 0; i < 10*batch; i++ { - require.NoError(t, b.OnEmit(ctx, new(Record))) - } - assert.Eventually(t, func() bool { - return e.ExportN() > 0 && len(b.queuedChunks) == cap(b.queuedChunks) - }, 2*time.Second, time.Microsecond) - // 1 export being performed, 1 export in buffer chan, >1 batch - // still in queue that an attempt to flush will be made on. - // - // Stop the poll routine to prevent contention with the queue lock. - // This is outside of "normal" operations, but we are testing if - // ForceFlush will return the correct error when an EnqueueExport - // fails and not if ForceFlush will ever get the queue lock in high - // throughput situations. - close(b.pollDone) - <-b.pollDone - - // Cancel the flush ctx from the start so errPartialFlush is - // returned right away. - fCtx, cancel := context.WithCancel(ctx) - cancel() - - errCh := make(chan error, 1) - go func() { - errCh <- b.ForceFlush(fCtx) - close(errCh) - }() - // Wait for ctxErrCalled to close before closing ExportTrigger so - // we know the errPartialFlush will be returned in ForceFlush. - <-ctxErrCalled - close(e.ExportTrigger) - - err := <-errCh - assert.ErrorIs(t, err, errPartialFlush, "partial flush error") - assert.ErrorIs(t, err, context.Canceled, "ctx canceled error") - }) - - t.Run("CanceledContext", func(t *testing.T) { - e := newTestExporter(nil) - e.ExportTrigger = make(chan struct{}) - b := NewBatchProcessorV2(e) - t.Cleanup(func() { _ = b.Shutdown(ctx) }) - - r := new(Record) - r.SetBody(log.BoolValue(true)) - _ = b.OnEmit(ctx, r) - t.Cleanup(func() { _ = b.Shutdown(ctx) }) - t.Cleanup(func() { close(e.ExportTrigger) }) - - c, cancel := context.WithCancel(ctx) - cancel() - assert.ErrorIs(t, b.ForceFlush(c), context.Canceled) - }) - }) - - t.Run("DroppedLogs", func(t *testing.T) { - orig := global.GetLogger() - t.Cleanup(func() { global.SetLogger(orig) }) - // Use concurrentBuffer for concurrent-safe reading. 
- buf := new(concurrentBuffer) - stdr.SetVerbosity(1) - global.SetLogger(stdr.New(stdlog.New(buf, "", 0))) - - e := newTestExporter(nil) - e.ExportTrigger = make(chan struct{}) - - b := NewBatchProcessorV2( - e, - WithMaxQueueSize(1), - WithExportMaxBatchSize(1), - WithExportInterval(time.Hour), - WithExportTimeout(time.Hour), - ) - r := new(Record) - // First record will be blocked by testExporter.Export - assert.NoError(t, b.OnEmit(ctx, r), "exported record") - require.Eventually(t, func() bool { - return e.ExportN() > 0 - }, 2*time.Second, time.Microsecond, "blocked export not attempted") - - // Second record will be written to export queue - assert.NoError(t, b.OnEmit(ctx, r), "export queue record") - require.Eventually(t, func() bool { - return len(b.queuedChunks) == cap(b.queuedChunks) - }, 2*time.Second, time.Microsecond, "blocked queue read not attempted") - - // Third record will be written to BatchProcessor.q - assert.NoError(t, b.OnEmit(ctx, r), "first queued") - // The previous record will be dropped, as the new one will be written to BatchProcessor.q - assert.NoError(t, b.OnEmit(ctx, r), "second queued") - - wantMsg := `"level"=1 "msg"="dropped log records" "dropped"=1` - assert.Eventually(t, func() bool { - return strings.Contains(buf.String(), wantMsg) - }, 2*time.Second, time.Microsecond) - - close(e.ExportTrigger) - _ = b.Shutdown(ctx) - }) - - t.Run("ConcurrentSafe", func(t *testing.T) { - const goRoutines = 10 - - e := newTestExporter(nil) - b := NewBatchProcessorV2(e) - - ctx, cancel := context.WithCancel(ctx) - var wg sync.WaitGroup - for i := 0; i < goRoutines-1; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case <-ctx.Done(): - return - default: - assert.NoError(t, b.OnEmit(ctx, new(Record))) - // Ignore partial flush errors. 
- _ = b.ForceFlush(ctx) - } - } - }() - } - - require.Eventually(t, func() bool { - return e.ExportN() > 0 - }, 2*time.Second, time.Microsecond, "export before shutdown") - - wg.Add(1) - go func() { - defer wg.Done() - assert.NoError(t, b.Shutdown(ctx)) - cancel() - }() - - wg.Wait() - }) -} - -func BenchmarkBatchProcessorV2OnEmit(b *testing.B) { - r := new(Record) - body := log.BoolValue(true) - r.SetBody(body) - - rSize := unsafe.Sizeof(r) + unsafe.Sizeof(body) - ctx := context.Background() - bp := NewBatchProcessorV2( - defaultNoopExporter, - WithMaxQueueSize(b.N+1), - WithExportMaxBatchSize(b.N+1), - WithExportInterval(time.Hour), - WithExportTimeout(time.Hour), - ) - b.Cleanup(func() { _ = bp.Shutdown(ctx) }) - - b.SetBytes(int64(rSize)) - b.ReportAllocs() - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - var err error - for pb.Next() { - err = bp.OnEmit(ctx, r) - } - _ = err - }) -} diff --git a/sdk/log/bench_test.go b/sdk/log/bench_test.go index 74689d7f387..2a11f899e33 100644 --- a/sdk/log/bench_test.go +++ b/sdk/log/bench_test.go @@ -35,6 +35,12 @@ func BenchmarkProcessor(b *testing.B) { return []LoggerProviderOption{WithProcessor(NewSimpleProcessor(noopExporter{}))} }, }, + { + name: "SimpleDelayExporter", + f: func() []LoggerProviderOption { + return []LoggerProviderOption{WithProcessor(NewSimpleProcessor(mockDelayExporter{}))} + }, + }, { name: "Batch", f: func() []LoggerProviderOption { @@ -42,7 +48,7 @@ func BenchmarkProcessor(b *testing.B) { }, }, { - name: "BatchSimulateExport", + name: "BatchDelayExporter", f: func() []LoggerProviderOption { return []LoggerProviderOption{WithProcessor(NewBatchProcessor(mockDelayExporter{}))} }, diff --git a/sdk/log/benchv2_test.go b/sdk/log/benchv2_test.go deleted file mode 100644 index d8590318eda..00000000000 --- a/sdk/log/benchv2_test.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package log - -import ( - "context" - "testing" - - "go.opentelemetry.io/otel/log" - - "github.com/stretchr/testify/assert" -) - -func BenchmarkV2Processor(b *testing.B) { - for _, tc := range []struct { - name string - f func() []LoggerProviderOption - }{ - { - name: "Simple", - f: func() []LoggerProviderOption { - return []LoggerProviderOption{WithProcessor(NewSimpleProcessor(noopExporter{}))} - }, - }, - { - name: "Batch", - f: func() []LoggerProviderOption { - return []LoggerProviderOption{WithProcessor(NewBatchProcessorV2(noopExporter{}))} - }, - }, - { - name: "BatchSimulateExport", - f: func() []LoggerProviderOption { - return []LoggerProviderOption{WithProcessor(NewBatchProcessorV2(mockDelayExporter{}))} - }, - }, - { - name: "SetTimestampSimple", - f: func() []LoggerProviderOption { - return []LoggerProviderOption{ - WithProcessor(timestampProcessor{}), - WithProcessor(NewSimpleProcessor(noopExporter{})), - } - }, - }, - { - name: "SetTimestampBatch", - f: func() []LoggerProviderOption { - return []LoggerProviderOption{ - WithProcessor(timestampProcessor{}), - WithProcessor(NewBatchProcessorV2(noopExporter{})), - } - }, - }, - { - name: "AddAttributesSimple", - f: func() []LoggerProviderOption { - return []LoggerProviderOption{ - WithProcessor(attrAddProcessor{}), - WithProcessor(NewSimpleProcessor(noopExporter{})), - } - }, - }, - { - name: "AddAttributesBatch", - f: func() []LoggerProviderOption { - return []LoggerProviderOption{ - WithProcessor(attrAddProcessor{}), - WithProcessor(NewBatchProcessorV2(noopExporter{})), - } - }, - }, - { - name: "SetAttributesSimple", - f: 
func() []LoggerProviderOption { - return []LoggerProviderOption{ - WithProcessor(attrSetDecorator{}), - WithProcessor(NewSimpleProcessor(noopExporter{})), - } - }, - }, - { - name: "SetAttributesBatch", - f: func() []LoggerProviderOption { - return []LoggerProviderOption{ - WithProcessor(attrSetDecorator{}), - WithProcessor(NewBatchProcessorV2(noopExporter{})), - } - }, - }, - } { - b.Run(tc.name, func(b *testing.B) { - provider := NewLoggerProvider(tc.f()...) - b.Cleanup(func() { assert.NoError(b, provider.Shutdown(context.Background())) }) - logger := provider.Logger(b.Name()) - - b.ReportAllocs() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - r := log.Record{} - r.SetBody(log.StringValue("message")) - r.SetSeverity(log.SeverityInfo) - r.AddAttributes( - log.String("foo", "bar"), - log.Float64("float", 3.14), - log.Int("int", 123), - log.Bool("bool", true), - ) - logger.Emit(context.Background(), r) - } - }) - }) - } -} From ea73c6b5f1ce01b1313f4816736efde3e7f2e974 Mon Sep 17 00:00:00 2001 From: wmdanor Date: Thu, 10 Apr 2025 22:15:46 +0100 Subject: [PATCH 3/3] remove unused exp --- sdk/log/exporter.go | 233 --------------------- sdk/log/exporter_test.go | 437 --------------------------------------- 2 files changed, 670 deletions(-) diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index e4e3c5402bf..fbffac639c8 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -5,13 +5,7 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "context" - "errors" - "fmt" - "sync" - "sync/atomic" "time" - - "go.opentelemetry.io/otel" ) // Exporter handles the delivery of log records to external receivers. @@ -67,37 +61,6 @@ func (noopExporter) Shutdown(context.Context) error { return nil } func (noopExporter) ForceFlush(context.Context) error { return nil } -// chunkExporter wraps an Exporter's Export method so it is called with -// appropriately sized export payloads. Any payload larger than a defined size -// is chunked into smaller payloads and exported sequentially. -type chunkExporter struct { - Exporter - - // size is the maximum batch size exported. - size int -} - -// newChunkExporter wraps exporter. Calls to the Export will have their records -// payload chunked so they do not exceed size. If size is less than or equal -// to 0, exporter is returned directly. -func newChunkExporter(exporter Exporter, size int) Exporter { - if size <= 0 { - return exporter - } - return &chunkExporter{Exporter: exporter, size: size} -} - -// Export exports records in chunks no larger than c.size. -func (c chunkExporter) Export(ctx context.Context, records []Record) error { - n := len(records) - for i, j := 0, min(c.size, n); i < n; i, j = i+c.size, min(j+c.size, n) { - if err := c.Exporter.Export(ctx, records[i:j]); err != nil { - return err - } - } - return nil -} - // timeoutExporter wraps an Exporter and ensures any call to Export will have a // timeout for the context. type timeoutExporter struct { @@ -123,199 +86,3 @@ func (e *timeoutExporter) Export(ctx context.Context, records []Record) error { defer cancel() return e.Exporter.Export(ctx, records) } - -// exportSync exports all data from input using exporter in a spawned -// goroutine. The returned chan will be closed when the spawned goroutine -// completes. 
-func exportSync(input <-chan exportData, exporter Exporter) (done chan struct{}) { - done = make(chan struct{}) - go func() { - defer close(done) - for data := range input { - data.DoExport(exporter.Export) - } - }() - return done -} - -// exportData is data related to an export. -type exportData struct { - ctx context.Context - records []Record - - // respCh is the channel any error returned from the export will be sent - // on. If this is nil, and the export error is non-nil, the error will - // passed to the OTel error handler. - respCh chan<- error -} - -// DoExport calls exportFn with the data contained in e. The error response -// will be returned on e's respCh if not nil. The error will be handled by the -// default OTel error handle if it is not nil and respCh is nil or full. -func (e exportData) DoExport(exportFn func(context.Context, []Record) error) { - if len(e.records) == 0 { - e.respond(nil) - return - } - - e.respond(exportFn(e.ctx, e.records)) -} - -func (e exportData) respond(err error) { - select { - case e.respCh <- err: - default: - // e.respCh is nil or busy, default to otel.Handler. - if err != nil { - otel.Handle(err) - } - } -} - -// bufferExporter provides asynchronous and synchronous export functionality by -// buffering export requests. -type bufferExporter struct { - Exporter - - input chan exportData - inputMu sync.Mutex - - done chan struct{} - stopped atomic.Bool -} - -// newBufferExporter returns a new bufferExporter that wraps exporter. The -// returned bufferExporter will buffer at most size number of export requests. -// If size is less than zero, zero will be used (i.e. only synchronous -// exporting will be supported). -func newBufferExporter(exporter Exporter, size int) *bufferExporter { - if size < 0 { - size = 0 - } - input := make(chan exportData, size) - return &bufferExporter{ - Exporter: exporter, - - input: input, - done: exportSync(input, exporter), - } -} - -var errStopped = errors.New("exporter stopped") - -func (e *bufferExporter) enqueue(ctx context.Context, records []Record, rCh chan<- error) error { - data := exportData{ctx, records, rCh} - - e.inputMu.Lock() - defer e.inputMu.Unlock() - - // Check stopped before enqueueing now that e.inputMu is held. This - // prevents sends on a closed chan when Shutdown is called concurrently. - if e.stopped.Load() { - return errStopped - } - - select { - case e.input <- data: - case <-ctx.Done(): - return ctx.Err() - } - return nil -} - -// EnqueueExport enqueues an export of records in the context of ctx to be -// performed asynchronously. This will return true if the records are -// successfully enqueued (or the bufferExporter is shut down), false otherwise. -// -// The passed records are held after this call returns. -func (e *bufferExporter) EnqueueExport(records []Record) bool { - if len(records) == 0 { - // Nothing to enqueue, do not waste input space. - return true - } - - data := exportData{ctx: context.Background(), records: records} - - e.inputMu.Lock() - defer e.inputMu.Unlock() - - // Check stopped before enqueueing now that e.inputMu is held. This - // prevents sends on a closed chan when Shutdown is called concurrently. - if e.stopped.Load() { - return true - } - - select { - case e.input <- data: - return true - default: - return false - } -} - -// Export synchronously exports records in the context of ctx. This will not -// return until the export has been completed. 
-func (e *bufferExporter) Export(ctx context.Context, records []Record) error { - if len(records) == 0 { - return nil - } - - resp := make(chan error, 1) - err := e.enqueue(ctx, records, resp) - if err != nil { - if errors.Is(err, errStopped) { - return nil - } - return fmt.Errorf("%w: dropping %d records", err, len(records)) - } - - select { - case err := <-resp: - return err - case <-ctx.Done(): - return ctx.Err() - } -} - -// ForceFlush flushes buffered exports. Any existing exports that is buffered -// is flushed before this returns. -func (e *bufferExporter) ForceFlush(ctx context.Context) error { - resp := make(chan error, 1) - err := e.enqueue(ctx, nil, resp) - if err != nil { - if errors.Is(err, errStopped) { - return nil - } - return err - } - - select { - case <-resp: - case <-ctx.Done(): - return ctx.Err() - } - return e.Exporter.ForceFlush(ctx) -} - -// Shutdown shuts down e. -// -// Any buffered exports are flushed before this returns. -// -// All calls to EnqueueExport or Exporter will return nil without any export -// after this is called. -func (e *bufferExporter) Shutdown(ctx context.Context) error { - if e.stopped.Swap(true) { - return nil - } - e.inputMu.Lock() - defer e.inputMu.Unlock() - - // No more sends will be made. - close(e.input) - select { - case <-e.done: - case <-ctx.Done(): - return errors.Join(ctx.Err(), e.Exporter.Shutdown(ctx)) - } - return e.Exporter.Shutdown(ctx) -} diff --git a/sdk/log/exporter_test.go b/sdk/log/exporter_test.go index 25f05832087..987ee43235b 100644 --- a/sdk/log/exporter_test.go +++ b/sdk/log/exporter_test.go @@ -5,8 +5,6 @@ package log import ( "context" - "io" - stdlog "log" "slices" "sync" "sync/atomic" @@ -14,10 +12,6 @@ import ( "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/log" ) type instruction struct { @@ -130,164 +124,6 @@ func (e *testExporter) ForceFlushN() int { return int(atomic.LoadInt32(e.forceFlushN)) } -func TestChunker(t *testing.T) { - t.Run("ZeroSize", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - c := newChunkExporter(exp, 0) - const size = 100 - _ = c.Export(context.Background(), make([]Record, size)) - - assert.Equal(t, 1, exp.ExportN()) - records := exp.Records() - assert.Len(t, records, 1) - assert.Len(t, records[0], size) - }) - - t.Run("ForceFlush", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - c := newChunkExporter(exp, 0) - _ = c.ForceFlush(context.Background()) - assert.Equal(t, 1, exp.ForceFlushN(), "ForceFlush not passed through") - }) - - t.Run("Shutdown", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - c := newChunkExporter(exp, 0) - _ = c.Shutdown(context.Background()) - assert.Equal(t, 1, exp.ShutdownN(), "Shutdown not passed through") - }) - - t.Run("Chunk", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - c := newChunkExporter(exp, 10) - assert.NoError(t, c.Export(context.Background(), make([]Record, 5))) - assert.NoError(t, c.Export(context.Background(), make([]Record, 25))) - - wantLens := []int{5, 10, 10, 5} - records := exp.Records() - require.Len(t, records, len(wantLens), "chunks") - for i, n := range wantLens { - assert.Lenf(t, records[i], n, "chunk %d", i) - } - }) - - t.Run("ExportError", func(t *testing.T) { - exp := newTestExporter(assert.AnError) - t.Cleanup(exp.Stop) - c := newChunkExporter(exp, 0) - ctx := context.Background() - records := make([]Record, 25) - err := 
c.Export(ctx, records) - assert.ErrorIs(t, err, assert.AnError, "no chunking") - - c = newChunkExporter(exp, 10) - err = c.Export(ctx, records) - assert.ErrorIs(t, err, assert.AnError, "with chunking") - }) -} - -func TestExportSync(t *testing.T) { - eventuallyDone := func(t *testing.T, done chan struct{}) { - assert.Eventually(t, func() bool { - select { - case <-done: - return true - default: - return false - } - }, 2*time.Second, time.Microsecond) - } - - t.Run("ErrorHandler", func(t *testing.T) { - var got error - handler := otel.ErrorHandlerFunc(func(err error) { got = err }) - otel.SetErrorHandler(handler) - t.Cleanup(func() { - l := stdlog.New(io.Discard, "", stdlog.LstdFlags) - otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { - l.Print(err) - })) - }) - - in := make(chan exportData, 1) - exp := newTestExporter(assert.AnError) - t.Cleanup(exp.Stop) - done := exportSync(in, exp) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - - in <- exportData{ - ctx: context.Background(), - records: make([]Record, 1), - } - }() - - wg.Wait() - close(in) - eventuallyDone(t, done) - - assert.ErrorIs(t, got, assert.AnError, "error not passed to ErrorHandler") - }) - - t.Run("ConcurrentSafe", func(t *testing.T) { - in := make(chan exportData, 1) - exp := newTestExporter(assert.AnError) - t.Cleanup(exp.Stop) - done := exportSync(in, exp) - - const goRoutines = 10 - var wg sync.WaitGroup - wg.Add(goRoutines) - for i := 0; i < goRoutines; i++ { - go func(n int) { - defer wg.Done() - - var r Record - r.SetBody(log.IntValue(n)) - - resp := make(chan error, 1) - in <- exportData{ - ctx: context.Background(), - records: []Record{r}, - respCh: resp, - } - - assert.ErrorIs(t, <-resp, assert.AnError) - }(i) - } - - // Empty records should be ignored. 
- in <- exportData{ctx: context.Background()} - - wg.Wait() - - close(in) - eventuallyDone(t, done) - - assert.Equal(t, goRoutines, exp.ExportN(), "Export calls") - - want := make([]log.Value, goRoutines) - for i := range want { - want[i] = log.IntValue(i) - } - records := exp.Records() - got := make([]log.Value, len(records)) - for i := range got { - if assert.Len(t, records[i], 1, "number of records exported") { - got[i] = records[i][0].Body() - } - } - assert.ElementsMatch(t, want, got, "record bodies") - }) -} - func TestTimeoutExporter(t *testing.T) { t.Run("ZeroTimeout", func(t *testing.T) { exp := newTestExporter(nil) @@ -324,276 +160,3 @@ func TestTimeoutExporter(t *testing.T) { close(out) }) } - -func TestBufferExporter(t *testing.T) { - t.Run("ConcurrentSafe", func(t *testing.T) { - const goRoutines = 10 - - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - e := newBufferExporter(exp, goRoutines) - - ctx := context.Background() - records := make([]Record, 10) - - stop := make(chan struct{}) - var wg sync.WaitGroup - for i := 0; i < goRoutines; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case <-stop: - return - default: - _ = e.EnqueueExport(records) - _ = e.Export(ctx, records) - _ = e.ForceFlush(ctx) - } - } - }() - } - - assert.Eventually(t, func() bool { - return exp.ExportN() > 0 - }, 2*time.Second, time.Microsecond) - - assert.NoError(t, e.Shutdown(ctx)) - close(stop) - wg.Wait() - }) - - t.Run("Shutdown", func(t *testing.T) { - t.Run("Multiple", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - e := newBufferExporter(exp, 1) - - assert.NoError(t, e.Shutdown(context.Background())) - assert.Equal(t, 1, exp.ShutdownN(), "first Shutdown") - - assert.NoError(t, e.Shutdown(context.Background())) - assert.Equal(t, 1, exp.ShutdownN(), "second Shutdown") - }) - - t.Run("ContextCancelled", func(t *testing.T) { - // Discard error logs. - defer func(orig otel.ErrorHandler) { - otel.SetErrorHandler(orig) - }(otel.GetErrorHandler()) - handler := otel.ErrorHandlerFunc(func(err error) {}) - otel.SetErrorHandler(handler) - - exp := newTestExporter(assert.AnError) - t.Cleanup(exp.Stop) - - trigger := make(chan struct{}) - exp.ExportTrigger = trigger - t.Cleanup(func() { close(trigger) }) - e := newBufferExporter(exp, 1) - - // Make sure there is something to flush. - require.True(t, e.EnqueueExport(make([]Record, 1))) - - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - err := e.Shutdown(ctx) - assert.ErrorIs(t, err, context.Canceled) - assert.ErrorIs(t, err, assert.AnError) - }) - - t.Run("Error", func(t *testing.T) { - exp := newTestExporter(assert.AnError) - t.Cleanup(exp.Stop) - - e := newBufferExporter(exp, 1) - assert.ErrorIs(t, e.Shutdown(context.Background()), assert.AnError) - }) - }) - - t.Run("ForceFlush", func(t *testing.T) { - t.Run("Multiple", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - e := newBufferExporter(exp, 2) - - ctx := context.Background() - records := make([]Record, 1) - require.NoError(t, e.enqueue(ctx, records, nil), "enqueue") - - assert.NoError(t, e.ForceFlush(ctx), "ForceFlush records") - assert.Equal(t, 1, exp.ExportN(), "Export number incremented") - assert.Len(t, exp.Records(), 1, "exported Record batches") - - // Nothing to flush. 
- assert.NoError(t, e.ForceFlush(ctx), "ForceFlush empty") - assert.Equal(t, 1, exp.ExportN(), "Export number changed") - assert.Empty(t, exp.Records(), "exported non-zero Records") - }) - - t.Run("ContextCancelled", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - - trigger := make(chan struct{}) - exp.ExportTrigger = trigger - t.Cleanup(func() { close(trigger) }) - e := newBufferExporter(exp, 1) - - ctx, cancel := context.WithCancel(context.Background()) - require.True(t, e.EnqueueExport(make([]Record, 1))) - - got := make(chan error, 1) - go func() { got <- e.ForceFlush(ctx) }() - require.Eventually(t, func() bool { - return exp.ExportN() > 0 - }, 2*time.Second, time.Microsecond) - cancel() // Canceled before export response. - err := <-got - assert.ErrorIs(t, err, context.Canceled, "enqueued") - _ = e.Shutdown(ctx) - - // Zero length buffer - e = newBufferExporter(exp, 0) - assert.ErrorIs(t, e.ForceFlush(ctx), context.Canceled, "not enqueued") - }) - - t.Run("Error", func(t *testing.T) { - exp := newTestExporter(assert.AnError) - t.Cleanup(exp.Stop) - - e := newBufferExporter(exp, 1) - assert.ErrorIs(t, e.ForceFlush(context.Background()), assert.AnError) - }) - - t.Run("Stopped", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - - e := newBufferExporter(exp, 1) - - ctx := context.Background() - _ = e.Shutdown(ctx) - assert.NoError(t, e.ForceFlush(ctx)) - }) - }) - - t.Run("Export", func(t *testing.T) { - t.Run("ZeroRecords", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - e := newBufferExporter(exp, 1) - - assert.NoError(t, e.Export(context.Background(), nil)) - assert.Equal(t, 0, exp.ExportN()) - }) - - t.Run("Multiple", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - e := newBufferExporter(exp, 1) - - ctx := context.Background() - records := make([]Record, 1) - records[0].SetBody(log.BoolValue(true)) - - assert.NoError(t, e.Export(ctx, records)) - - n := exp.ExportN() - assert.Equal(t, 1, n, "first Export number") - assert.Equal(t, [][]Record{records}, exp.Records()) - - assert.NoError(t, e.Export(ctx, records)) - assert.Equal(t, n+1, exp.ExportN(), "second Export number") - assert.Equal(t, [][]Record{records}, exp.Records()) - }) - - t.Run("ContextCancelled", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - - trigger := make(chan struct{}) - exp.ExportTrigger = trigger - t.Cleanup(func() { close(trigger) }) - e := newBufferExporter(exp, 1) - - records := make([]Record, 1) - ctx, cancel := context.WithCancel(context.Background()) - - got := make(chan error, 1) - go func() { got <- e.Export(ctx, records) }() - require.Eventually(t, func() bool { - return exp.ExportN() > 0 - }, 2*time.Second, time.Microsecond) - cancel() // Canceled before export response. 
- err := <-got - assert.ErrorIs(t, err, context.Canceled, "enqueued") - _ = e.Shutdown(ctx) - - // Zero length buffer - e = newBufferExporter(exp, 0) - assert.ErrorIs(t, e.Export(ctx, records), context.Canceled, "not enqueued") - }) - - t.Run("Error", func(t *testing.T) { - exp := newTestExporter(assert.AnError) - t.Cleanup(exp.Stop) - - e := newBufferExporter(exp, 1) - ctx, records := context.Background(), make([]Record, 1) - assert.ErrorIs(t, e.Export(ctx, records), assert.AnError) - }) - - t.Run("Stopped", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - - e := newBufferExporter(exp, 1) - - ctx := context.Background() - _ = e.Shutdown(ctx) - assert.NoError(t, e.Export(ctx, make([]Record, 1))) - assert.Equal(t, 0, exp.ExportN(), "Export called") - }) - }) - - t.Run("EnqueueExport", func(t *testing.T) { - t.Run("ZeroRecords", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - e := newBufferExporter(exp, 1) - - assert.True(t, e.EnqueueExport(nil)) - e.ForceFlush(context.Background()) - assert.Equal(t, 0, exp.ExportN(), "empty batch enqueued") - }) - - t.Run("Multiple", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - e := newBufferExporter(exp, 2) - - records := make([]Record, 1) - records[0].SetBody(log.BoolValue(true)) - - assert.True(t, e.EnqueueExport(records)) - assert.True(t, e.EnqueueExport(records)) - e.ForceFlush(context.Background()) - - n := exp.ExportN() - assert.Equal(t, 2, n, "Export number") - assert.Equal(t, [][]Record{records, records}, exp.Records()) - }) - - t.Run("Stopped", func(t *testing.T) { - exp := newTestExporter(nil) - t.Cleanup(exp.Stop) - e := newBufferExporter(exp, 1) - - _ = e.Shutdown(context.Background()) - assert.True(t, e.EnqueueExport(make([]Record, 1))) - }) - }) -}
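
For reviewers who want to compare the two code paths, the benchmarks in sdk/log/bench_test.go (BenchmarkProcessor, including the BatchDelayExporter case renamed earlier in this series) can be run with the standard tooling, e.g. go test -run='^$' -bench=BenchmarkProcessor -benchmem ./sdk/log/. The sketch below is illustrative only and not part of the change: it shows how the batching processor is wired into a LoggerProvider using the public options exercised by these tests. The stdoutlog exporter import and the concrete option values are assumptions chosen just to keep the example runnable; any Exporter implementation works.

package main

import (
	"context"
	"time"

	"go.opentelemetry.io/otel/exporters/stdout/stdoutlog" // assumed exporter, not part of this series
	"go.opentelemetry.io/otel/log"
	sdklog "go.opentelemetry.io/otel/sdk/log"
)

func main() {
	ctx := context.Background()

	// Any log Exporter works here; stdoutlog keeps the sketch self-contained.
	exp, err := stdoutlog.New()
	if err != nil {
		panic(err)
	}

	// Option values are illustrative; the SDK applies its defaults when they
	// are omitted.
	processor := sdklog.NewBatchProcessor(
		exp,
		sdklog.WithMaxQueueSize(2048),
		sdklog.WithExportMaxBatchSize(512),
		sdklog.WithExportInterval(time.Second),
		sdklog.WithExportTimeout(30*time.Second),
	)

	provider := sdklog.NewLoggerProvider(sdklog.WithProcessor(processor))
	// Shutdown flushes any queued records through the exporter.
	defer func() { _ = provider.Shutdown(ctx) }()

	logger := provider.Logger("example")

	var r log.Record
	r.SetBody(log.StringValue("message"))
	r.SetSeverity(log.SeverityInfo)
	r.AddAttributes(log.String("foo", "bar"))
	logger.Emit(ctx, r)
}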