[exporter][batching] Serialized bytes based batching for logs #12299

Open · wants to merge 13 commits into main
6 changes: 6 additions & 0 deletions exporter/exporterbatcher/config.go
@@ -41,6 +41,9 @@ type MaxSizeConfig struct {
// If the batch size exceeds this value, it will be broken up into smaller batches if possible.
// Setting this value to zero disables the maximum size limit.
MaxSizeItems int `mapstructure:"max_size_items"`

// MaxSizeBytes is the maximum serialized byte size of the batch.
MaxSizeBytes int `mapstructure:"max_size_bytes"`
}

func (c Config) Validate() error {
@@ -56,6 +59,9 @@ func (c Config) Validate() error {
if c.FlushTimeout <= 0 {
return errors.New("timeout must be greater than zero")
}
if c.MaxSizeBytes != 0 {
return errors.New("max_size_bytes is not yet supported")
}
return nil
}
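For illustration, a minimal sketch (assuming the exporterbatcher package as modified in this diff) of how the new validation rejects a non-zero max_size_bytes:

package main

import (
	"fmt"

	"go.opentelemetry.io/collector/exporter/exporterbatcher"
)

func main() {
	cfg := exporterbatcher.NewDefaultConfig()
	cfg.MaxSizeBytes = 1024 // any non-zero value is rejected for now
	fmt.Println(cfg.Validate()) // max_size_bytes is not yet supported
}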

7 changes: 5 additions & 2 deletions exporter/exporterbatcher/config_test.go
@@ -6,7 +6,6 @@ package exporterbatcher
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

@@ -27,5 +26,9 @@ func TestConfig_Validate(t *testing.T) {
cfg = NewDefaultConfig()
cfg.MaxSizeItems = 20000
cfg.MinSizeItems = 20001
assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items")
require.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items")

cfg = NewDefaultConfig()
cfg.MaxSizeBytes = 1
require.EqualError(t, cfg.Validate(), "max_size_bytes is not yet supported")
}
67 changes: 67 additions & 0 deletions exporter/exporterhelper/internal/sizer/logs_sizer.go
@@ -0,0 +1,67 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"

import (
math_bits "math/bits"

"go.opentelemetry.io/collector/pdata/plog"
)

type LogsSizer interface {
LogsSize(ld plog.Logs) int
ResourceLogsSize(rl plog.ResourceLogs) int
ScopeLogsSize(sl plog.ScopeLogs) int
LogRecordSize(lr plog.LogRecord) int

// DeltaCapacity() returns the delta size when a ResourceLog, ScopeLog or LogRecord is added.
Review comment (Contributor), suggested change:
// DeltaSize() returns the delta size when a ResourceLog, ScopeLog or LogRecord is added.

DeltaSize(newItemSize int) int
}

// LogsBytesSizer returns the byte size of serialized protos.
type LogsBytesSizer struct {
plog.ProtoMarshaler
}

// DeltaCapacity() returns the delta size of a proto slice when a new item is added.
Review comment (Member), suggested change:
// DeltaSize() returns the delta size of a proto slice when a new item is added.

// Example:
//
// prevSize := proto1.Size()
// proto2 := proto1.RepeatedField().AppendEmpty()
//
// Then currSize of proto1 can be calculated as
//
// currSize := (prevSize + deltaCapacity(proto2.Size()))
Review comment (Contributor), suggested change:
// currSize := (prevSize + sizer.DeltaSize(proto2.Size()))

//
// This is derived from gogo/protobuf's Size() function.
Review comment (Member): Can you please add a link to the Size() function here?

Review comment (Contributor): Yes, and you might add that the constant 1 below is specialized for OTLP; it relies on protobuf tags < 16 (which are expressed with a single byte of overhead).

func (s *LogsBytesSizer) DeltaSize(newItemSize int) int {
return 1 + newItemSize + (math_bits.Len64(uint64(newItemSize)|1)+6)/7 //nolint:gosec // disable G115
}
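To see why the formula works, here is a standalone sketch (not part of the PR) of the varint length helper it mirrors, assuming the parenthesization above; the leading 1 is the single tag byte, valid because the OTLP fields involved use tag numbers below 16:

package main

import (
	"fmt"
	math_bits "math/bits"
)

// sov mirrors gogo/protobuf's varint size helper: the number of bytes
// needed to encode x as a protobuf varint.
func sov(x uint64) int {
	return (math_bits.Len64(x|1) + 6) / 7
}

func main() {
	for _, size := range []int{1, 109, 127, 128, 300} {
		// delta = 1 tag byte + payload + length varint
		delta := 1 + size + sov(uint64(size))
		fmt.Printf("item size %3d -> delta %d\n", size, delta)
	}
}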

// LogsCountSizer returns the number of log records.
type LogsCountSizer struct{}

func (s *LogsCountSizer) LogsSize(ld plog.Logs) int {
return ld.LogRecordCount()
}

func (s *LogsCountSizer) ResourceLogsSize(rl plog.ResourceLogs) int {
count := 0
for k := 0; k < rl.ScopeLogs().Len(); k++ {
count += rl.ScopeLogs().At(k).LogRecords().Len()
}
return count
}

func (s *LogsCountSizer) ScopeLogsSize(sl plog.ScopeLogs) int {
return sl.LogRecords().Len()
}

func (s *LogsCountSizer) LogRecordSize(_ plog.LogRecord) int {
return 1
}

func (s *LogsCountSizer) DeltaSize(newItemSize int) int {
return newItemSize
}
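For contrast, a quick sketch of the two sizers side by side (the sizer package is internal, so this only compiles inside the collector module; the values reference the tests below):

package main

import (
	"fmt"

	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
	"go.opentelemetry.io/collector/pdata/testdata"
)

func main() {
	ld := testdata.GenerateLogs(5)

	countSizer := sizer.LogsCountSizer{}
	bytesSizer := sizer.LogsBytesSizer{}

	fmt.Println(countSizer.LogsSize(ld)) // 5: one unit per log record
	fmt.Println(bytesSizer.LogsSize(ld)) // 545: serialized proto bytes, per the tests below
}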
57 changes: 57 additions & 0 deletions exporter/exporterhelper/internal/sizer/logs_sizer_test.go
@@ -0,0 +1,57 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"

import (
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/pdata/testdata"
)

func TestLogsCountSizer(t *testing.T) {
ld := testdata.GenerateLogs(5)
sizer := LogsCountSizer{}
require.Equal(t, 5, sizer.LogsSize(ld))

rl := ld.ResourceLogs().At(0)
require.Equal(t, 5, sizer.ResourceLogsSize(rl))

sl := rl.ScopeLogs().At(0)
require.Equal(t, 5, sizer.ScopeLogsSize(sl))

require.Equal(t, 1, sizer.LogRecordSize(sl.LogRecords().At(0)))
require.Equal(t, 1, sizer.LogRecordSize(sl.LogRecords().At(1)))
require.Equal(t, 1, sizer.LogRecordSize(sl.LogRecords().At(2)))
require.Equal(t, 1, sizer.LogRecordSize(sl.LogRecords().At(3)))
require.Equal(t, 1, sizer.LogRecordSize(sl.LogRecords().At(4)))

prevSize := sizer.ScopeLogsSize(sl)
lr := sl.LogRecords().At(2)
lr.CopyTo(sl.LogRecords().AppendEmpty())
require.Equal(t, sizer.ScopeLogsSize(sl), prevSize+sizer.DeltaSize(sizer.LogRecordSize(lr)))
}

func TestLogsBytesSizer(t *testing.T) {
ld := testdata.GenerateLogs(5)
sizer := LogsBytesSizer{}
require.Equal(t, 545, sizer.LogsSize(ld))

rl := ld.ResourceLogs().At(0)
require.Equal(t, 542, sizer.ResourceLogsSize(rl))

sl := rl.ScopeLogs().At(0)
require.Equal(t, 497, sizer.ScopeLogsSize(sl))

require.Equal(t, 109, sizer.LogRecordSize(sl.LogRecords().At(0)))
require.Equal(t, 79, sizer.LogRecordSize(sl.LogRecords().At(1)))
require.Equal(t, 109, sizer.LogRecordSize(sl.LogRecords().At(2)))
require.Equal(t, 79, sizer.LogRecordSize(sl.LogRecords().At(3)))
require.Equal(t, 109, sizer.LogRecordSize(sl.LogRecords().At(4)))

prevSize := sizer.ScopeLogsSize(sl)
lr := sl.LogRecords().At(2)
lr.CopyTo(sl.LogRecords().AppendEmpty())
require.Equal(t, sizer.ScopeLogsSize(sl), prevSize+sizer.DeltaSize(sizer.LogRecordSize(lr)))
}
26 changes: 17 additions & 9 deletions exporter/exporterhelper/logs.go
@@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pipeline"
@@ -25,16 +26,16 @@ var (
)

type logsRequest struct {
ld plog.Logs
pusher consumer.ConsumeLogsFunc
cachedItemsCount int
ld plog.Logs
pusher consumer.ConsumeLogsFunc
cachedSize int
}

func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request {
return &logsRequest{
ld: ld,
pusher: pusher,
cachedItemsCount: ld.LogRecordCount(),
ld: ld,
pusher: pusher,
cachedSize: -1,
}
}

@@ -65,11 +66,18 @@ func (req *logsRequest) Export(ctx context.Context) error {
}

func (req *logsRequest) ItemsCount() int {
return req.cachedItemsCount
return req.ld.LogRecordCount()
}

func (req *logsRequest) setCachedItemsCount(count int) {
req.cachedItemsCount = count
func (req *logsRequest) Size(sizer sizer.LogsSizer) int {
if req.cachedSize == -1 {
req.cachedSize = sizer.LogsSize(req.ld)
}
return req.cachedSize
}

func (req *logsRequest) setCachedSize(size int) {
req.cachedSize = size
}
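The -1 sentinel distinguishes "not yet computed" from a legitimate size of zero. A hypothetical illustration of the lifecycle, inside the exporterhelper package:

// Hypothetical driver, not part of the PR:
req := newLogsRequest(ld, pusher).(*logsRequest)
first := req.Size(&logsBytesSizer)  // first call computes via sizer.LogsSize and caches
second := req.Size(&logsBytesSizer) // later calls return the cached value
// first == second; merges and splits call setCachedSize to keep the cache current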

type logsExporter struct {
95 changes: 59 additions & 36 deletions exporter/exporterhelper/logs_batch.go
@@ -8,104 +8,127 @@ import (
"errors"

"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
"go.opentelemetry.io/collector/pdata/plog"
)

var (
logsCountSizer = sizer.LogsCountSizer{}
logsBytesSizer = sizer.LogsBytesSizer{}
)

// MergeSplit splits and/or merges the provided logs request and the current request into one or more requests
// conforming with the MaxSizeConfig.
func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) {
var maxSize int
var sizer sizer.LogsSizer
if cfg.MaxSizeItems != 0 {
maxSize = cfg.MaxSizeItems
sizer = &logsCountSizer
} else if cfg.MaxSizeBytes != 0 {
maxSize = cfg.MaxSizeBytes
sizer = &logsBytesSizer
}

if r2 != nil {
req2, ok := r2.(*logsRequest)
if !ok {
return nil, errors.New("invalid input type")
}
req2.mergeTo(req)
req2.mergeTo(req, sizer)
}

// If there is no limit, we can simply merge the new request into the current one and return.
if cfg.MaxSizeItems == 0 {
if cfg.MaxSizeItems == 0 && cfg.MaxSizeBytes == 0 {
return []Request{req}, nil
}
return req.split(cfg)

return req.split(maxSize, sizer)
}
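How a caller drives this, as a hypothetical sketch inside the exporterhelper package (the limit value is made up):

// Hypothetical driver, not part of the PR:
func exampleMergeSplit(req *logsRequest) ([]Request, error) {
	cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 10000}
	// Passing nil as the second request only splits req if it exceeds the limit.
	return req.MergeSplit(context.Background(), cfg, nil)
}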

func (req *logsRequest) mergeTo(dst *logsRequest) {
dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount())
req.setCachedItemsCount(0)
func (req *logsRequest) mergeTo(dst *logsRequest, sizer sizer.LogsSizer) {
if sizer != nil {
dst.setCachedSize(dst.Size(sizer) + req.Size(sizer))
req.setCachedSize(0)
}
req.ld.ResourceLogs().MoveAndAppendTo(dst.ld.ResourceLogs())
}

func (req *logsRequest) split(cfg exporterbatcher.MaxSizeConfig) ([]Request, error) {
func (req *logsRequest) split(maxSize int, sizer sizer.LogsSizer) ([]Request, error) {
var res []Request
for req.ItemsCount() > cfg.MaxSizeItems {
ld := extractLogs(req.ld, cfg.MaxSizeItems)
size := ld.LogRecordCount()
req.setCachedItemsCount(req.ItemsCount() - size)
res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedItemsCount: size})
for req.Size(sizer) > maxSize {
ld := extractLogs(req.ld, maxSize, sizer)
size := sizer.LogsSize(ld)
req.setCachedSize(req.Size(sizer) - size)
res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedSize: size})
}
res = append(res, req)
return res, nil
}

// extractLogs extracts a new plog.Logs from srcLogs, taking up to the given capacity as measured by the sizer.
func extractLogs(srcLogs plog.Logs, count int) plog.Logs {
func extractLogs(srcLogs plog.Logs, capacity int, sizer sizer.LogsSizer) plog.Logs {
capacityReached := false
destLogs := plog.NewLogs()
capacityLeft := capacity - sizer.LogsSize(destLogs)
srcLogs.ResourceLogs().RemoveIf(func(srcRL plog.ResourceLogs) bool {
if count == 0 {
if capacityReached {
return false
}
needToExtract := resourceLogsCount(srcRL) > count
needToExtract := sizer.ResourceLogsSize(srcRL) > capacityLeft
if needToExtract {
srcRL = extractResourceLogs(srcRL, count)
srcRL, capacityReached = extractResourceLogs(srcRL, capacityLeft, sizer)
if srcRL.ScopeLogs().Len() == 0 {
return false
}
}
count -= resourceLogsCount(srcRL)
capacityLeft -= sizer.DeltaSize(sizer.ResourceLogsSize(srcRL))
srcRL.MoveTo(destLogs.ResourceLogs().AppendEmpty())
return !needToExtract
})
return destLogs
}

// extractResourceLogs extracts a new plog.ResourceLogs from srcRL, up to the given capacity; the returned bool reports whether capacity was reached.
func extractResourceLogs(srcRL plog.ResourceLogs, count int) plog.ResourceLogs {
func extractResourceLogs(srcRL plog.ResourceLogs, capacity int, sizer sizer.LogsSizer) (plog.ResourceLogs, bool) {
capacityReached := false
destRL := plog.NewResourceLogs()
destRL.SetSchemaUrl(srcRL.SchemaUrl())
srcRL.Resource().CopyTo(destRL.Resource())
capacityLeft := capacity - sizer.ResourceLogsSize(destRL)
srcRL.ScopeLogs().RemoveIf(func(srcSL plog.ScopeLogs) bool {
if count == 0 {
if capacityReached {
return false
}
needToExtract := srcSL.LogRecords().Len() > count
needToExtract := sizer.ScopeLogsSize(srcSL) > capacityLeft
if needToExtract {
srcSL = extractScopeLogs(srcSL, count)
srcSL, capacityReached = extractScopeLogs(srcSL, capacityLeft, sizer)
if srcSL.LogRecords().Len() == 0 {
return false
}
}
count -= srcSL.LogRecords().Len()
capacityLeft -= sizer.DeltaSize(sizer.ScopeLogsSize(srcSL))
srcSL.MoveTo(destRL.ScopeLogs().AppendEmpty())
return !needToExtract
})
return destRL
return destRL, capacityReached
}

// extractScopeLogs extracts a new plog.ScopeLogs from srcSL, up to the given capacity; the returned bool reports whether capacity was reached.
func extractScopeLogs(srcSL plog.ScopeLogs, count int) plog.ScopeLogs {
func extractScopeLogs(srcSL plog.ScopeLogs, capacity int, sizer sizer.LogsSizer) (plog.ScopeLogs, bool) {
capacityReached := false
destSL := plog.NewScopeLogs()
destSL.SetSchemaUrl(srcSL.SchemaUrl())
srcSL.Scope().CopyTo(destSL.Scope())
capacityLeft := capacity - sizer.ScopeLogsSize(destSL)
srcSL.LogRecords().RemoveIf(func(srcLR plog.LogRecord) bool {
if count == 0 {
if capacityReached || sizer.LogRecordSize(srcLR) > capacityLeft {
capacityReached = true
return false
}
capacityLeft -= sizer.DeltaSize(sizer.LogRecordSize(srcLR))
srcLR.MoveTo(destSL.LogRecords().AppendEmpty())
count--
return true
})
return destSL
return destSL, capacityReached
}

// resourceLogsCount calculates the total number of log records in the plog.ResourceLogs.
func resourceLogsCount(rl plog.ResourceLogs) int {
count := 0
for k := 0; k < rl.ScopeLogs().Len(); k++ {
count += rl.ScopeLogs().At(k).LogRecords().Len()
}
return count
}
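To make the capacity bookkeeping concrete, a worked example (record sizes taken from the byte-sizer tests above; the 2-byte empty-scope overhead is inferred from those tests, so treat it as an assumption):

// Worked example, not part of the PR: extractScopeLogs with a capacity of
// 300 bytes and records serializing to 109, 79, 109, 79, 109 bytes.
//
//   capacityLeft = 300 - 2           // ~2 bytes of empty-scope overhead (assumed)
//   record 0: 109 <= 298             // fits; DeltaSize(109) = 1 + 109 + 1 = 111
//             capacityLeft = 298 - 111 = 187, record moved
//   record 1: 79 <= 187              // fits; DeltaSize(79) = 1 + 79 + 1 = 81
//             capacityLeft = 187 - 81 = 106, record moved
//   record 2: 109 > 106              // does not fit; capacityReached = true
//
// Result: two records extracted; the remaining three stay in the source.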