Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter][batching] Serialized bytes based batching for logs #12299

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
25 changes: 25 additions & 0 deletions .chloggen/exporter-batcher-bytes-based-batching.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support bytes-based batching for logs.

# One or more tracking issues or pull requests related to the change
issues: [3262]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
6 changes: 6 additions & 0 deletions exporter/exporterbatcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type MaxSizeConfig struct {
// If the batch size exceeds this value, it will be broken up into smaller batches if possible.
// Setting this value to zero disables the maximum size limit.
MaxSizeItems int `mapstructure:"max_size_items"`

// MaxSizeBytes is the maximum serialized byte size of the batch.
MaxSizeBytes int `mapstructure:"max_size_bytes"`
}

func (c Config) Validate() error {
Expand All @@ -56,6 +59,9 @@ func (c Config) Validate() error {
if c.FlushTimeout <= 0 {
return errors.New("timeout must be greater than zero")
}
if c.MaxSizeBytes != 0 {
return errors.New("max_size_bytes is not yet supported")
}
return nil
}

Expand Down
7 changes: 5 additions & 2 deletions exporter/exporterbatcher/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package exporterbatcher
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand All @@ -27,5 +26,9 @@ func TestConfig_Validate(t *testing.T) {
cfg = NewDefaultConfig()
cfg.MaxSizeItems = 20000
cfg.MinSizeItems = 20001
assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items")
require.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items")

cfg = NewDefaultConfig()
cfg.MaxSizeBytes = 1
require.EqualError(t, cfg.Validate(), "max_size_bytes is not yet supported")
}
10 changes: 10 additions & 0 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ type logsRequest struct {
ld plog.Logs
pusher consumer.ConsumeLogsFunc
cachedItemsCount int
cachedByteSize int
}

// newLogsRequest wraps ld and its pusher into a Request, pre-computing the
// item count and serialized byte size so later reads of either are cheap.
func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request {
	req := &logsRequest{
		ld:     ld,
		pusher: pusher,
	}
	req.cachedItemsCount = ld.LogRecordCount()
	req.cachedByteSize = logsMarshaler.LogsSize(ld)
	return req
}

Expand Down Expand Up @@ -72,6 +74,14 @@ func (req *logsRequest) setCachedItemsCount(count int) {
req.cachedItemsCount = count
}

// ByteSize returns the cached serialized byte size of the logs in this request.
func (req *logsRequest) ByteSize() int {
	return req.cachedByteSize
}

// setCachedByteSize overwrites the cached serialized byte size, e.g. after
// log records have been merged into or split out of this request.
func (req *logsRequest) setCachedByteSize(size int) {
	req.cachedByteSize = size
}

type logsExporter struct {
*internal.BaseExporter
consumer.Logs
Expand Down
108 changes: 102 additions & 6 deletions exporter/exporterhelper/logs_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte
import (
"context"
"errors"
math_bits "math/bits"

"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -23,7 +24,7 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz
}

// If no limit we can simply merge the new request into the current and return.
if cfg.MaxSizeItems == 0 {
if cfg.MaxSizeItems == 0 && cfg.MaxSizeBytes == 0 {
return []Request{req}, nil
}
return req.split(cfg)
Expand All @@ -32,16 +33,27 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz
// mergeTo moves all log data from req into dst, transferring the cached
// item count and byte size along with it and zeroing both caches on req.
func (req *logsRequest) mergeTo(dst *logsRequest) {
	mergedItems := dst.ItemsCount() + req.ItemsCount()
	mergedBytes := dst.ByteSize() + req.ByteSize()
	dst.setCachedItemsCount(mergedItems)
	dst.setCachedByteSize(mergedBytes)
	req.setCachedItemsCount(0)
	req.setCachedByteSize(0)
	req.ld.ResourceLogs().MoveAndAppendTo(dst.ld.ResourceLogs())
}

func (req *logsRequest) split(cfg exporterbatcher.MaxSizeConfig) ([]Request, error) {
var res []Request
for req.ItemsCount() > cfg.MaxSizeItems {
ld := extractLogs(req.ld, cfg.MaxSizeItems)
size := ld.LogRecordCount()
req.setCachedItemsCount(req.ItemsCount() - size)
res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedItemsCount: size})
if cfg.MaxSizeItems != 0 {
for req.ItemsCount() > cfg.MaxSizeItems {
ld := extractLogs(req.ld, cfg.MaxSizeItems)
size := ld.LogRecordCount()
req.setCachedItemsCount(req.ItemsCount() - size)
res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedItemsCount: size})
}
} else if cfg.MaxSizeBytes != 0 {
for req.ByteSize() > cfg.MaxSizeBytes {
ld := extractLogsBasedOnByteSize(req.ld, cfg.MaxSizeBytes)
size := logsMarshaler.LogsSize(ld)
req.setCachedByteSize(req.ByteSize() - size)
res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedByteSize: size})
}
}
res = append(res, req)
return res, nil
Expand Down Expand Up @@ -109,3 +121,87 @@ func resourceLogsCount(rl plog.ResourceLogs) int {
}
return count
}

// extractLogsBasedOnByteSize removes resource logs from srcLogs and returns a
// new plog.Logs whose serialized size is at most capacity bytes. A resource
// logs entry that does not fit whole is split via
// extractResourceLogsBasedOnByteSize; any remainder stays in srcLogs.
func extractLogsBasedOnByteSize(srcLogs plog.Logs, capacity int) plog.Logs {
	capacityReached := false
	destLogs := plog.NewLogs()
	capacityLeft := capacity - logsMarshaler.LogsSize(destLogs)
	srcLogs.ResourceLogs().RemoveIf(func(srcRL plog.ResourceLogs) bool {
		// Once the budget is exhausted, keep all remaining entries in srcLogs.
		if capacityReached {
			return false
		}
		needToExtract := logsMarshaler.ResourceLogsSize(srcRL) > capacityLeft
		if needToExtract {
			// Doesn't fit whole: split it. The callback variable is rebound to
			// the extracted portion; the original element keeps the remainder.
			srcRL, capacityReached = extractResourceLogsBasedOnByteSize(srcRL, capacityLeft)
			if srcRL.ScopeLogs().Len() == 0 {
				return false
			}
		}
		// Charge the wrapper overhead (field tag + length varint) of appending
		// this entry to the destination, not just the entry's own size.
		capacityLeft -= deltaCapacity(logsMarshaler.ResourceLogsSize(srcRL))
		srcRL.MoveTo(destLogs.ResourceLogs().AppendEmpty())
		// Remove from src only when the whole entry was moved.
		return !needToExtract
	})
	return destLogs
}

// extractResourceLogsBasedOnByteSize removes scope logs from srcRL and returns
// a new plog.ResourceLogs (carrying a copy of srcRL's resource and schema URL)
// whose serialized size is at most capacity bytes. The second return value
// reports whether the byte capacity was reached, i.e. whether anything had to
// be left behind in srcRL.
func extractResourceLogsBasedOnByteSize(srcRL plog.ResourceLogs, capacity int) (plog.ResourceLogs, bool) {
	capacityReached := false
	destRL := plog.NewResourceLogs()
	destRL.SetSchemaUrl(srcRL.SchemaUrl())
	srcRL.Resource().CopyTo(destRL.Resource())
	// The copied resource and schema URL already consume part of the budget.
	capacityLeft := capacity - logsMarshaler.ResourceLogsSize(destRL)
	srcRL.ScopeLogs().RemoveIf(func(srcSL plog.ScopeLogs) bool {
		// Once the budget is exhausted, keep all remaining entries in srcRL.
		if capacityReached {
			return false
		}
		needToExtract := logsMarshaler.ScopeLogsSize(srcSL) > capacityLeft
		if needToExtract {
			// Doesn't fit whole: split it. The callback variable is rebound to
			// the extracted portion; the original element keeps the remainder.
			srcSL, capacityReached = extractScopeLogsBasedOnByteSize(srcSL, capacityLeft)
			if srcSL.LogRecords().Len() == 0 {
				return false
			}
		}

		// Charge the wrapper overhead (field tag + length varint) of appending
		// this entry to the destination, not just the entry's own size.
		capacityLeft -= deltaCapacity(logsMarshaler.ScopeLogsSize(srcSL))
		srcSL.MoveTo(destRL.ScopeLogs().AppendEmpty())
		// Remove from src only when the whole entry was moved.
		return !needToExtract
	})
	return destRL, capacityReached
}

// extractScopeLogsBasedOnByteSize removes log records from srcSL and returns a
// new plog.ScopeLogs (carrying a copy of srcSL's scope and schema URL) whose
// serialized size is at most capacity bytes. The second return value reports
// whether the byte capacity was reached; remaining records stay in srcSL.
func extractScopeLogsBasedOnByteSize(srcSL plog.ScopeLogs, capacity int) (plog.ScopeLogs, bool) {
	capacityReached := false
	destSL := plog.NewScopeLogs()
	destSL.SetSchemaUrl(srcSL.SchemaUrl())
	srcSL.Scope().CopyTo(destSL.Scope())
	// The copied scope and schema URL already consume part of the budget.
	capacityLeft := capacity - logsMarshaler.ScopeLogsSize(destSL)

	srcSL.LogRecords().RemoveIf(func(srcLR plog.LogRecord) bool {
		// Log records are atomic: the first record that does not fit ends the
		// extraction, and everything from there on stays in srcSL.
		if capacityReached || logsMarshaler.LogRecordSize(srcLR) > capacityLeft {
			capacityReached = true
			return false
		}
		// Charge the wrapper overhead (field tag + length varint) of appending
		// this record to the destination, not just the record's own size.
		capacityLeft -= deltaCapacity(logsMarshaler.LogRecordSize(srcLR))
		srcLR.MoveTo(destSL.LogRecords().AppendEmpty())
		return true
	})
	return destSL, capacityReached
}

// deltaCapacity returns the number of bytes by which a proto message's
// serialized size grows when a sub-message of serialized size newItemSize is
// appended to one of its repeated fields: 1 byte for the field tag, the
// sub-message bytes themselves, plus the varint encoding of its length.
//
// Example:
//
//	prevSize := proto1.Size()
//	proto2 := proto1.RepeatedField().AppendEmpty()
//	currSize := prevSize + deltaCapacity(proto2.Size())
//
// This mirrors the Size()/sov() helpers generated by gogo/protobuf.
func deltaCapacity(newItemSize int) int {
	// (Len64(x|1)+6)/7 is the gogo/protobuf "sov" varint width: the |1 makes
	// x == 0 encode as one byte. The +6 must be applied to the bit length,
	// outside Len64 — adding it to the value instead miscounts the length
	// prefix at varint boundaries (e.g. size 1 or size 128).
	return 1 + newItemSize + (math_bits.Len64(uint64(newItemSize)|1)+6)/7 //nolint:gosec // disable G115
}
Loading
Loading