
[POC] Refactor Logs SDK Batch Processor #6673

Draft
wants to merge 4 commits into main

Conversation

@wmdanor (Contributor) commented Apr 17, 2025

Implements idea from #6569 (comment).

Refactors the Batch Processor to unify the logic from bufferExporter and chunkExporter inside itself.

Benchmarks

go test -run=^$ '-bench=BenchmarkProcessor/.*Batch' -benchmem -count=10 > xxx.txt
benchstat base.txt new.txt
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/sdk/log
cpu: AMD Ryzen 9 7900X 12-Core Processor            
                                │   base.txt   │               new.txt               │
                                │    sec/op    │   sec/op     vs base                │
Processor/Batch-24                623.3n ±  7%   487.6n ± 5%  -21.77% (p=0.000 n=10)
Processor/BatchDelayExporter-24   492.1n ± 10%   586.3n ± 4%  +19.15% (p=0.000 n=10)
Processor/SetTimestampBatch-24    653.0n ±  4%   463.8n ± 5%  -28.97% (p=0.000 n=10)
Processor/AddAttributesBatch-24   636.4n ±  5%   514.6n ± 8%  -19.13% (p=0.000 n=10)
Processor/SetAttributesBatch-24   584.0n ± 12%   434.4n ± 3%  -25.62% (p=0.000 n=10)
geomean                           594.8n         494.7n       -16.82%

                                │  base.txt   │              new.txt               │
                                │    B/op     │    B/op     vs base                │
Processor/Batch-24                1154.0 ± 3%   617.5 ± 2%  -46.49% (p=0.000 n=10)
Processor/BatchDelayExporter-24    471.5 ± 1%   492.0 ± 0%   +4.35% (p=0.000 n=10)
Processor/SetTimestampBatch-24    1157.0 ± 1%   606.5 ± 1%  -47.58% (p=0.000 n=10)
Processor/AddAttributesBatch-24   1169.0 ± 1%   612.0 ± 1%  -47.65% (p=0.000 n=10)
Processor/SetAttributesBatch-24   1154.5 ± 2%   611.5 ± 1%  -47.03% (p=0.000 n=10)
geomean                            967.9        585.8       -39.48%

                                │  base.txt  │               new.txt               │
                                │ allocs/op  │ allocs/op   vs base                 │
Processor/Batch-24                1.000 ± 0%   1.000 ± 0%       ~ (p=1.000 n=10) ¹
Processor/BatchDelayExporter-24   1.000 ± 0%   1.000 ± 0%       ~ (p=1.000 n=10) ¹
Processor/SetTimestampBatch-24    1.000 ± 0%   1.000 ± 0%       ~ (p=1.000 n=10) ¹
Processor/AddAttributesBatch-24   1.000 ± 0%   1.000 ± 0%       ~ (p=1.000 n=10) ¹
Processor/SetAttributesBatch-24   1.000 ± 0%   1.000 ± 0%       ~ (p=1.000 n=10) ¹
geomean                           1.000        1.000       +0.00%
¹ all samples are equal

@pellared (Member)

Can you resolve the conflicts?

@pellared (Member)

Sorry for the delay. I hope to review it next week.

@wmdanor (Contributor Author) commented May 19, 2025

Hey @pellared , do you think you will be able to take a look soon?

@pellared (Member)

Hey @pellared , do you think you will be able to take a look soon?

I hope so, it is still on my radar. Sorry for the delay. We are working on a new release which is also delayed 😬

return done
}

func (b *BatchProcessor) processRecordsBatches() (done chan struct{}) {
Member:

Are you able to update this description:

// The BatchProcessor is designed to provide the highest throughput of
// log records possible while being compatible with OpenTelemetry. The
// entry point of log records is the OnEmit method. This method is designed
// to receive records as fast as possible while still honoring shutdown
// commands. All records received are enqueued to queue.
//
// In order to block OnEmit as little as possible, a separate "poll"
// goroutine is spawned at the creation of a BatchProcessor. This
// goroutine is responsible for batching the queue at regular polled
// intervals, or when it is directly signaled to.
//
// To keep the polling goroutine from backing up, all batches it makes are
// exported with a bufferedExporter. This exporter allows the poll
// goroutine to enqueue an export payload that will be handled in a
// separate goroutine dedicated to the export. This asynchronous behavior
// allows the poll goroutine to maintain accurate interval polling.
//
//   ___BatchProcessor____     __Poll Goroutine__     __Export Goroutine__
// ||                     || ||                  || ||                    ||
// ||          ********** || ||                  || ||     **********     ||
// || Records=>* OnEmit * || ||   | - ticker     || ||     * export *     ||
// ||          ********** || ||   | - trigger    || ||     **********     ||
// ||             ||      || ||   |              || ||         ||         ||
// ||             ||      || ||   |              || ||         ||         ||
// ||   __________\/___   || ||   |***********   || ||   ______/\_______  ||
// ||  (____queue______)>=||=||===|*  batch  *===||=||=>[_export_buffer_] ||
// ||                     || ||   |***********   || ||                    ||
// ||_____________________|| ||__________________|| ||____________________||
//
//
// The "release valve" in this processing is the record queue. This queue
// is a ring buffer. It will overwrite the oldest records first when writes
// to OnEmit are made faster than the queue can be flushed. If batches
// cannot be flushed to the export buffer, the records will remain in the
// queue.
?

I think it would be helpful in understanding the changes in this PR.

Also, according to the current description, I think it would be better to rename this to export (as we have an Export Goroutine).

return done
}

// Tries to write records batch from the queue to records batches channel.
Member:

Suggested change
// Tries to write records batch from the queue to records batches channel.
// tryDequeue tries to write records batch from the queue to records batches channel.

// Tries to write records batch from the queue to records batches channel.
// If success, ok is true and queueLen is number of records remaining in the records queue.
// If failure, ok is false and queueLen value does not have any meaning.
func (b *BatchProcessor) tryDequeue(respCh chan<- error) (ok bool, queueLen int) {
Member:

ok bool should be the second result parameter

Suggested change
func (b *BatchProcessor) tryDequeue(respCh chan<- error) (ok bool, queueLen int) {
func (b *BatchProcessor) tryDequeue(respCh chan<- error) (queueLen int, ok bool) {

Comment on lines +134 to +136
recordsBatches chan recordsBatch
recordsBatchesClosed bool
recordsBatchesMu sync.Mutex
Member:

Something is wrong here with how chan recordsBatch is used.
We should not have an additional sync.Mutex.
We should never close the channel from a different goroutine than the one that is writing to the channel.
This is known as "The Channel Closing Principle" (source: https://go101.org/article/channel-closing.html):

One general principle of using Go channels is don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders. In other words, we should only close a channel in a sender goroutine if the sender is the only sender of the channel.
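
A minimal sketch of that principle (editorial illustration, not this PR's code; the recordsBatch name is only borrowed from the diff above): the single sending goroutine is the only one that closes the channel, and the receiver just ranges until it is closed.

package main

import "fmt"

type recordsBatch struct{ n int }

// sender is the only goroutine that writes to ch, so it is the only one
// allowed to close it ("The Channel Closing Principle").
func sender(ch chan<- recordsBatch) {
	defer close(ch)
	for i := 0; i < 3; i++ {
		ch <- recordsBatch{n: i}
	}
}

func main() {
	ch := make(chan recordsBatch)
	go sender(ch)
	// The receiver never closes ch; it simply observes the close via range.
	for b := range ch {
		fmt.Println("received batch", b.n)
	}
}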

//
// When write is called the lock of q is held. The write function must not call
// other methods of this q that acquire the lock.
func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int {
Member:

Unused

return q.len
}

func (q *queue) Dequeue(buf []Record) (queueLen, written int) {
Member:

Missing comment. E.g.

// Dequeue removes up to len(buf) records from the queue and writes them into
// buf. The number of records remaining in the queue is returned.
// The number of records written to buf is returned.
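
For context, a hypothetical call site showing that contract (q and Record stand for this file's queue and the sdk/log record type; illustrative only, not code from this PR):

buf := make([]Record, 512)
queueLen, written := q.Dequeue(buf)
// buf[:written] now holds the dequeued records; queueLen records remain in the queue.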


// Flush returns all the Records held in the queue and resets it to be
// empty.
func (q *queue) Flush() []Record {
Member:

Unused

func (q *queue) Dropped() uint64 {
return q.dropped.Swap(0)
}
err := b.flush(ctx)
Member:

I think in a high-load scenario this may almost never exit. Previously it tried to flush one export. Now it tries to drain everything (until the queue is empty).
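
A rough sketch of the concern (hypothetical shape, not the PR's exact code): a flush that loops until the queue is observed empty can be starved by concurrent OnEmit calls, whereas flushing a single snapshot is bounded.

// Hypothetical illustration of the concern, not code from this PR.
// dequeue returns the next batch (empty once the queue is observed empty);
// export hands a batch to the exporter.
func drainUntilEmpty[T any](dequeue func() []T, export func([]T) error) error {
	for {
		batch := dequeue()
		if len(batch) == 0 {
			return nil // reached only if the queue is ever observed empty
		}
		if err := export(batch); err != nil {
			return err
		}
		// Under sustained load, concurrent OnEmit calls can refill the queue
		// faster than it drains, so this loop may effectively never exit.
	}
}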

Comment on lines +262 to +271
buf := b.recordsBufPool.Get()

queueLen, n := b.queue.Dequeue(*buf)
if n == 0 {
b.recordsBufPool.Put(buf)
if respCh != nil {
respCh <- nil
}
return true, 0
}
@pellared (Member) commented May 20, 2025

This is still not addressing my idea behind

Maybe the goroutine responsible for actually exporting the log records should be also responsible for getting/dequeuing the log records from the queue instead of doing it here?

My idea was that here we only signal that we want to have the exporter goroutine (processRecordsBatches) to try to export. The exporter goroutine could dequeue by itself. I do not think we need a sync.Pool for batches as we have only one exporting goroutine.

The idea is also to get rid of something like len(b.recordsBatches) == cap(b.recordsBatches) which looks like a code smell.
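
A rough sketch of that idea (editorial illustration; field names such as exportTrigger, batchSize, queue, and exporter are assumptions, not this PR's code): callers only signal, and the single export goroutine dequeues into one reusable buffer, so no sync.Pool and no len(ch) == cap(ch) checks are needed.

// Hypothetical sketch of the suggested shape, not code from this PR.
// Callers only signal; the single export goroutine dequeues by itself.
func (b *BatchProcessor) signalExport() {
	select {
	case b.exportTrigger <- struct{}{}: // non-blocking: concurrent signals coalesce
	default:
	}
}

func (b *BatchProcessor) exportLoop(ctx context.Context) {
	buf := make([]Record, b.batchSize) // single reusable batch buffer, no sync.Pool
	for range b.exportTrigger {        // exits once exportTrigger is closed by its sole sender
		if _, n := b.queue.Dequeue(buf); n > 0 {
			_ = b.exporter.Export(ctx, buf[:n])
		}
	}
}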

Contributor Author:

Thanks for the feedback, I'm going to work on addressing it and will let you know once I have some updates.

Contributor Author:

My idea was that here we only signal that we want to have the exporter goroutine (processRecordsBatches) to try to export. The exporter goroutine could dequeue by itself. I do not think we need a sync.Pool for batches as we have only one exporting goroutine.

If I understand correctly, using a pool to preserve the functionality of the WithExportBufferSize option is useful to prevent unneeded memory allocations. The current implementation has a buffer for export batches (1 by default, but it can be set higher), which lets export batches sit in memory and wait until they are exported. This provides an additional dampening mechanism in high-load scenarios to prevent losing log records (this functionality comes from the buffer exporter, whose logic is merged into this PR's batch processor). The current implementation creates a new buffer every time data is sent for export, but it has a comment suggesting to explore using a pool.

On another point, fully moving the dequeue into processRecordsBatches while preserving the buffering of records batches would require creating another goroutine to handle those buffered batches. In that scenario the current processRecordsBatches would only be responsible for dequeuing and sending a records batch to the buffered records batches channel. I am not sure I see a big advantage in doing that.

Please correct me if I misunderstood something.
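
Regarding the export-buffer pool discussed above, a minimal sketch of its shape (editorial illustration; it assumes the pooled value is a *[]Record as in this PR's recordsBufPool usage, and maxBatchSize is a stand-in for the configured batch size):

// Hypothetical sketch of a pool for export-batch buffers, not code from this PR.
var recordsBufPool = sync.Pool{
	New: func() any {
		buf := make([]Record, maxBatchSize) // maxBatchSize: assumed configured batch size
		return &buf
	},
}

// A buffer is taken from the pool, filled by Dequeue, handed to the export
// goroutine, and put back once the export of that batch completes.
func getBuf() *[]Record  { return recordsBufPool.Get().(*[]Record) }
func putBuf(b *[]Record) { recordsBufPool.Put(b) }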

Member:

I was thinking about storing a single batch variable inside processRecordsBatches.
PS. I am not sure if my idea is viable.
