Skip to content

Commit

Permalink
batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Dec 1, 2023
1 parent 6b26a1d commit 46e6316
Show file tree
Hide file tree
Showing 7 changed files with 684 additions and 22 deletions.
25 changes: 25 additions & 0 deletions .chloggen/batch-exporter-helper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add experimental batching capabilities to the exporter helper

# One or more tracking issues or pull requests related to the change
issues: [8122]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
319 changes: 319 additions & 0 deletions exporter/exporterhelper/batch_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
)

// MergeBatcherConfig defines a basic configuration for batching requests based on a timeout and a minimum number of
// items.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type MergeBatcherConfig struct {
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
Enabled bool `mapstructure:"enabled"`

// Timeout sets the time after which a batch will be sent regardless of its size.
// When this is set to zero, data will be sent immediately.
// This is a recommended option, as it will ensure that the data is sent in a timely manner.
Timeout time.Duration `mapstructure:"timeout"` // Is there a better name to avoid confusion with the consumerSender timeout?

// MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch should be
// sent regardless of the timeout. There is no guarantee that the batch size always greater than this value.
// This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored.
MinSizeItems int `mapstructure:"min_size_items"`
}

func (c MergeBatcherConfig) Validate() error {
if c.MinSizeItems < 0 {
return errors.New("min_size_items must be greater than or equal to zero")
}
if c.Timeout <= 0 {
return errors.New("timeout must be greater than zero")
}
return nil
}

func NewDefaultMergeBatcherConfig() MergeBatcherConfig {
return MergeBatcherConfig{
Enabled: true,
Timeout: 200 * time.Millisecond,
MinSizeItems: 8192,
}
}

// SplitBatcherConfig defines batching configuration for merging or splitting requests based on a timeout and
// minimum and maximum number of items.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type SplitBatcherConfig struct {
// MaxSizeItems is the maximum number of the batch items, i.e. spans, data points or log records for OTLP,
// but can be anything else for other formats. If the batch size exceeds this value,
// it will be broken up into smaller batches if possible.
// Setting this value to zero disables the maximum size limit.
MaxSizeItems int `mapstructure:"max_size_items"`
}

func (c SplitBatcherConfig) Validate() error {
if c.MaxSizeItems < 0 {
return errors.New("max_size_items must be greater than or equal to zero")
}
return nil
}

// MergeSplitBatcherConfig defines batching configuration for merging or splitting requests based on a timeout and
// minimum and maximum number of items.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type MergeSplitBatcherConfig struct {
MergeBatcherConfig `mapstructure:",squash"`
SplitBatcherConfig `mapstructure:",squash"`
}

func (c MergeSplitBatcherConfig) Validate() error {
if c.MaxSizeItems < c.MinSizeItems {
return errors.New("max_size_items must be greater than or equal to min_size_items")
}
return nil
}

// BatchConfigBatchersLimit defines batching configuration part for setting a maximum number of batchers.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatchConfigBatchersLimit struct {
// BatchersLimit is the maximum number of batchers that can be used for batching.
// Requests producing batch identifiers that exceed this limit will be dropped.
// If this value is zero, then there is no limit on the number of batchers.
BatchersLimit int `mapstructure:"batchers_limit"`
}

// BatchMergeFunc is a function that merges two requests into a single request.
// Context will be propagated from the first request. If you want to separate batches based on the context,
// use WithRequestBatchIdentifier option. Context will be propagated from the first request.
// Do not mutate the requests passed to the function if error can be returned after mutation.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatchMergeFunc func(context.Context, Request, Request) (Request, error)

// BatchMergeSplitFunc is a function that merge and/or splits a request into multiple requests based on the provided
// limit of maximum number of items. All the returned requests MUST have a number of items that does not exceed the
// maximum number of items. Size of the last returned request MUST be less or equal than the size of any other returned
// request. The original request MUST not be mutated if error is returned. The length of the returned slice MUST not
// be 0. The optionalReq argument can be nil, make sure to check it before using. maxItems argument is guaranteed to be
// greater than 0. Context will be propagated from the original request.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatchMergeSplitFunc func(ctx context.Context, optionalReq Request, req Request, maxItems int) ([]Request, error)

type BatcherOption func(*batchSender)

func WithSplitBatcher(cfg SplitBatcherConfig, msf BatchMergeSplitFunc) BatcherOption {
return func(b *batchSender) {
if cfg.MaxSizeItems != 0 {
b.splitCfg = cfg
b.mergeSplitFunc = msf
}
}
}

// batchSender is a component that accepts places requests into batches before passing them to the downstream senders.
//
// batch_processor implements consumer.Traces and consumer.Metrics
//
// Batches are sent out with any of the following conditions:
// - batch size reaches cfg.SendBatchSize
// - cfg.Timeout is elapsed since the timestamp when the previous batch was sent out.
type batchSender struct {
baseRequestSender
mergeCfg MergeBatcherConfig
splitCfg SplitBatcherConfig
mergeFunc BatchMergeFunc
mergeSplitFunc BatchMergeSplitFunc

// concurrencyLimit is the maximum number of goroutines that can be created by the batcher.
// If this number is reached and all the goroutines are busy, the batch will be sent right away.
// Populated from the number of queue consumers if queue is enabled.
concurrencyLimit int
activeRequestsWG sync.WaitGroup

resetTimerCh chan struct{}

mu sync.Mutex
activeBatch *batch

logger *zap.Logger

shutdownCh chan struct{}
stopped *atomic.Bool
}

// newBatchSender returns a new batch consumer component.
func newBatchSender(cfg MergeBatcherConfig, set exporter.CreateSettings, mf BatchMergeFunc, opts ...BatcherOption) requestSender {
bs := &batchSender{
activeBatch: newEmptyBatch(),
mergeCfg: cfg,
mergeFunc: mf,
logger: set.Logger,
shutdownCh: make(chan struct{}),
stopped: &atomic.Bool{},
resetTimerCh: make(chan struct{}),
}

for _, op := range opts {
op(bs)
}
return bs
}

func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
timer := time.NewTimer(bs.mergeCfg.Timeout)
go func() {
for {
select {
case <-bs.shutdownCh:
bs.mu.Lock()
if bs.activeBatch.request != nil {
bs.exportActiveBatch()
}

Check warning on line 188 in exporter/exporterhelper/batch_sender.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/batch_sender.go#L187-L188

Added lines #L187 - L188 were not covered by tests
bs.mu.Unlock()
return
case <-timer.C:
bs.mu.Lock()
if bs.activeBatch.request != nil {
bs.exportActiveBatch()
}
bs.mu.Unlock()
timer.Reset(bs.mergeCfg.Timeout)
case <-bs.resetTimerCh:
if !timer.Stop() {
<-timer.C
}

Check warning on line 201 in exporter/exporterhelper/batch_sender.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/batch_sender.go#L200-L201

Added lines #L200 - L201 were not covered by tests
timer.Reset(bs.mergeCfg.Timeout)
}
}
}()

return nil
}

type batch struct {
ctx context.Context
request Request
done chan struct{}
err error
}

func newEmptyBatch() *batch {
return &batch{
ctx: context.Background(),
done: make(chan struct{}),
}
}

// Caller must hold the lock.
func (bs *batchSender) exportActiveBatch() {
go func(b *batch) {
b.err = b.request.Export(b.ctx)
close(b.done)
}(bs.activeBatch)
bs.activeBatch = newEmptyBatch()
}

func (bs *batchSender) send(ctx context.Context, req Request) error {
if bs.stopped.Load() {
return errors.New("batchSender is stopped")
}

Check warning on line 236 in exporter/exporterhelper/batch_sender.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/batch_sender.go#L235-L236

Added lines #L235 - L236 were not covered by tests

bs.activeRequestsWG.Add(1)
defer bs.activeRequestsWG.Done()

if bs.mergeSplitFunc != nil {
return bs.sendMergeSplitBatch(ctx, req)
}
return bs.sendMergeBatch(ctx, req)
}

func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) error {
bs.mu.Lock()

reqs, err := bs.mergeSplitFunc(ctx, bs.activeBatch.request, req, bs.splitCfg.MaxSizeItems)
if err != nil || len(reqs) == 0 {
bs.mu.Unlock()
return err
}
if len(reqs) == 1 || bs.activeBatch.request != nil {
bs.activeBatch.request = reqs[0]
bs.activeBatch.ctx = ctx
batch := bs.activeBatch
var sent bool
if batch.request.ItemsCount() >= bs.mergeCfg.MinSizeItems || len(reqs) > 1 {
bs.exportActiveBatch()
sent = true
}
bs.mu.Unlock()
if sent {
bs.resetTimerCh <- struct{}{}
}
<-batch.done
if batch.err != nil {
return batch.err
}

Check warning on line 271 in exporter/exporterhelper/batch_sender.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/batch_sender.go#L270-L271

Added lines #L270 - L271 were not covered by tests
reqs = reqs[1:]
} else {
bs.mu.Unlock()
}

// Intentionally do not put the last request in the active batch to not block it.
// TODO: Consider including the partial request in the error to avoid double publishing.
for _, r := range reqs {
if err := r.Export(ctx); err != nil {
return err
}

Check warning on line 282 in exporter/exporterhelper/batch_sender.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/batch_sender.go#L281-L282

Added lines #L281 - L282 were not covered by tests
}
return nil
}

func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
bs.mu.Lock()

if bs.activeBatch.request != nil {
var err error
req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req)
if err != nil {
bs.mu.Unlock()
return err
}
}
bs.activeBatch.request = req
bs.activeBatch.ctx = ctx
batch := bs.activeBatch
var sent bool
if bs.activeBatch.request.ItemsCount() >= bs.mergeCfg.MinSizeItems {
bs.exportActiveBatch()
sent = true
}
bs.mu.Unlock()
if sent {
bs.resetTimerCh <- struct{}{}
}
<-batch.done
return batch.err
}

func (bs *batchSender) Shutdown(context.Context) error {
bs.stopped.Store(true)
close(bs.shutdownCh)
bs.activeRequestsWG.Wait()
return nil
}
Loading

0 comments on commit 46e6316

Please sign in to comment.