Commit 51d907b

bytes based batching logs

sfc-gh-sili committed Feb 5, 2025
1 parent 2f6babd commit 51d907b

Showing 5 changed files with 271 additions and 7 deletions.
3 changes: 3 additions & 0 deletions exporter/exporterbatcher/config.go
@@ -41,6 +41,9 @@ type MaxSizeConfig struct {
// If the batch size exceeds this value, it will be broken up into smaller batches if possible.
// Setting this value to zero disables the maximum size limit.
MaxSizeItems int `mapstructure:"max_size_items"`

// MaxSizeBytesTestingOnly is a temporary field we use for testing. DO NOT USE.
MaxSizeBytesTestingOnly int `mapstructure:"_max_size_bytes_testing_only"`
}

func (c Config) Validate() error {
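For orientation, a minimal sketch of how a caller might set the temporary byte limit (an illustration, not part of the commit). The split logic below treats the two limits as mutually exclusive, with the item-count limit taking precedence:

	cfg := exporterbatcher.MaxSizeConfig{
		MaxSizeItems:            0,       // item-based limit disabled
		MaxSizeBytesTestingOnly: 1 << 20, // split batches above ~1 MiB of encoded size
	}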
10 changes: 10 additions & 0 deletions exporter/exporterhelper/logs.go
@@ -28,13 +28,15 @@ type logsRequest struct {
ld plog.Logs
pusher consumer.ConsumeLogsFunc
cachedItemsCount int
cachedByteSize int
}

func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request {
return &logsRequest{
ld: ld,
pusher: pusher,
cachedItemsCount: ld.LogRecordCount(),
cachedByteSize: logsMarshaler.LogsSize(ld),
}
}

@@ -72,6 +74,14 @@ func (req *logsRequest) setCachedItemsCount(count int) {
req.cachedItemsCount = count
}

func (req *logsRequest) ByteSize() int {
return req.cachedByteSize
}

func (req *logsRequest) setCachedByteSize(size int) {
req.cachedByteSize = size
}

type logsExporter struct {
*internal.BaseExporter
consumer.Logs
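Computing the protobuf size of a payload walks the whole plog.Logs tree, so the request caches the value at construction and adjusts it incrementally on merge and split. A rough sketch of the invariant, assuming logsMarshaler is the package-level plog.ProtoMarshaler used elsewhere in this package:

	ld := testdata.GenerateLogs(10)
	req := newLogsRequest(ld, nil).(*logsRequest)
	// Until the request is merged or split, the cached value matches a
	// fresh measurement: req.ByteSize() == logsMarshaler.LogsSize(ld).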
110 changes: 104 additions & 6 deletions exporter/exporterhelper/logs_batch.go
@@ -6,6 +6,7 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
import (
"context"
"errors"
math_bits "math/bits"

"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/pdata/plog"
@@ -23,7 +24,7 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) {
}

// If no limit we can simply merge the new request into the current and return.
-	if cfg.MaxSizeItems == 0 {
if cfg.MaxSizeItems == 0 && cfg.MaxSizeBytesTestingOnly == 0 {
return []Request{req}, nil
}
return req.split(cfg)
Expand All @@ -32,16 +33,27 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz
func (req *logsRequest) mergeTo(dst *logsRequest) {
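	// Move the cached sizes along with the data: dst absorbs req's item and
	// byte counts, and req's caches drop to zero because MoveAndAppendTo
	// below leaves req.ld empty.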
dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount())
req.setCachedItemsCount(0)
dst.setCachedByteSize(dst.ByteSize() + req.ByteSize())
req.setCachedByteSize(0)
req.ld.ResourceLogs().MoveAndAppendTo(dst.ld.ResourceLogs())
}

func (req *logsRequest) split(cfg exporterbatcher.MaxSizeConfig) ([]Request, error) {
var res []Request
-	for req.ItemsCount() > cfg.MaxSizeItems {
-		ld := extractLogs(req.ld, cfg.MaxSizeItems)
-		size := ld.LogRecordCount()
-		req.setCachedItemsCount(req.ItemsCount() - size)
-		res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedItemsCount: size})
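	// The two limits are mutually exclusive: the item-count limit takes
	// precedence, and the byte limit is consulted only when no item limit is set.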
if cfg.MaxSizeItems != 0 {
for req.ItemsCount() > cfg.MaxSizeItems {
ld := extractLogs(req.ld, cfg.MaxSizeItems)
size := ld.LogRecordCount()
req.setCachedItemsCount(req.ItemsCount() - size)
res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedItemsCount: size})
}
} else if cfg.MaxSizeBytesTestingOnly != 0 {
for req.ByteSize() > cfg.MaxSizeBytesTestingOnly {
ld := extractLogsBasedOnByteSize(req.ld, cfg.MaxSizeBytesTestingOnly)
size := logsMarshaler.LogsSize(ld)
req.setCachedByteSize(req.ByteSize() - size)
res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedByteSize: size})
}
}
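	// Whatever remains in req is now within the limit and becomes the final,
	// possibly partially filled, batch.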
res = append(res, req)
return res, nil
@@ -109,3 +121,89 @@ func resourceLogsCount(rl plog.ResourceLogs) int {
}
return count
}

// splitBasedOnByteSize splits the request into requests whose marshaled size
// does not exceed maxByteSize. Both caches are decremented as records are
// extracted, so the loop terminates and the counts stay consistent.
func (req *logsRequest) splitBasedOnByteSize(maxByteSize int) ([]Request, error) {
	var res []Request
	for req.ByteSize() > maxByteSize {
		ld := extractLogsBasedOnByteSize(req.ld, maxByteSize)
		size := logsMarshaler.LogsSize(ld)
		count := ld.LogRecordCount()
		req.setCachedByteSize(req.ByteSize() - size)
		req.setCachedItemsCount(req.ItemsCount() - count)
		res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedByteSize: size, cachedItemsCount: count})
	}
	res = append(res, req)
	return res, nil
}

// extractLogsBasedOnByteSize returns a new plog.Logs with resource logs moved
// out of srcLogs until the marshaled size of the result reaches capacity bytes.
func extractLogsBasedOnByteSize(srcLogs plog.Logs, capacity int) plog.Logs {
capacityReached := false
destLogs := plog.NewLogs()
capacityLeft := capacity - logsMarshaler.LogsSize(destLogs)
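	// Walk resource logs in order, moving whole entries while they fit and
	// splitting the first one that does not. RemoveIf drops entries for which
	// the callback returns true, so fully moved entries leave srcLogs while
	// partially split and unvisited entries stay behind.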
srcLogs.ResourceLogs().RemoveIf(func(srcRL plog.ResourceLogs) bool {
if capacityReached {
return false
}
needToExtract := logsMarshaler.ResourceLogsSize(srcRL) > capacityLeft
if needToExtract {
srcRL, capacityReached = extractResourceLogsBasedOnByteSize(srcRL, capacityLeft)
if srcRL.ScopeLogs().Len() == 0 {
return false
}
}
capacityLeft -= deltaCapacity(logsMarshaler.ResourceLogsSize(srcRL))
srcRL.MoveTo(destLogs.ResourceLogs().AppendEmpty())
return !needToExtract
})
return destLogs
}

// extractResourceLogsBasedOnByteSize moves up to capacity bytes of scope logs
// from srcRL into a new plog.ResourceLogs; the returned bool reports whether
// the capacity was reached.
func extractResourceLogsBasedOnByteSize(srcRL plog.ResourceLogs, capacity int) (plog.ResourceLogs, bool) {
capacityReached := false
destRL := plog.NewResourceLogs()
destRL.SetSchemaUrl(srcRL.SchemaUrl())
srcRL.Resource().CopyTo(destRL.Resource())
capacityLeft := capacity - logsMarshaler.ResourceLogsSize(destRL)
srcRL.ScopeLogs().RemoveIf(func(srcSL plog.ScopeLogs) bool {
if capacityReached {
return false
}
needToExtract := logsMarshaler.ScopeLogsSize(srcSL) > capacityLeft
if needToExtract {
srcSL, capacityReached = extractScopeLogsBasedOnByteSize(srcSL, capacityLeft)
if srcSL.LogRecords().Len() == 0 {
return false
}
}

capacityLeft -= deltaCapacity(logsMarshaler.ScopeLogsSize(srcSL))
srcSL.MoveTo(destRL.ScopeLogs().AppendEmpty())
return !needToExtract
})
return destRL, capacityReached
}

// extractScopeLogsBasedOnByteSize moves up to capacity bytes of log records
// from srcSL into a new plog.ScopeLogs; the returned bool reports whether the
// capacity was reached.
func extractScopeLogsBasedOnByteSize(srcSL plog.ScopeLogs, capacity int) (plog.ScopeLogs, bool) {
capacityReached := false
destSL := plog.NewScopeLogs()
destSL.SetSchemaUrl(srcSL.SchemaUrl())
srcSL.Scope().CopyTo(destSL.Scope())
capacityLeft := capacity - logsMarshaler.ScopeLogsSize(destSL)

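	// Move whole log records until the next one would overflow the remaining
	// budget; unlike the resource and scope levels, an individual log record
	// is never split further.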
srcSL.LogRecords().RemoveIf(func(srcLR plog.LogRecord) bool {
if capacityReached || logsMarshaler.LogRecordSize(srcLR) > capacityLeft {
capacityReached = true
return false
}
capacityLeft -= deltaCapacity(logsMarshaler.LogRecordSize(srcLR))
srcLR.MoveTo(destSL.LogRecords().AppendEmpty())
return true
})
return destSL, capacityReached
}

// deltaCapacity returns the number of bytes by which a proto message grows
// when an item of newItemSize bytes is appended to a repeated field: one byte
// for the field tag, a varint length prefix, and the item itself.
func deltaCapacity(newItemSize int) int {
	return 1 + newItemSize + int((math_bits.Len64(uint64(newItemSize)|1)+6)/7)
}
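The varint term is the length-prefix cost of proto encoding: a nested message is laid out on the wire as tag byte + varint-encoded length + payload. A standalone sketch of the arithmetic (hedged: sov mirrors the size helper that gogoproto generates, and the single tag byte assumes field numbers below 16, which holds for resource_logs, scope_logs, and log_records):

	package main

	import (
		"fmt"
		math_bits "math/bits"
	)

	// sov reports how many bytes the varint encoding of x occupies.
	func sov(x uint64) int { return (math_bits.Len64(x|1) + 6) / 7 }

	func main() {
		for _, size := range []int{0, 1, 127, 128, 300, 16384} {
			delta := 1 + size + sov(uint64(size)) // tag + payload + length prefix
			fmt.Printf("appending a %5d-byte item grows the message by %5d bytes\n", size, delta)
		}
	}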
143 changes: 142 additions & 1 deletion exporter/exporterhelper/logs_batch_test.go
@@ -123,7 +123,106 @@ func TestMergeSplitLogs(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, len(tt.expected), len(res))
for i := range res {
-			assert.Equal(t, tt.expected[i], res[i])
assert.Equal(t, tt.expected[i].(*logsRequest).ld, res[i].(*logsRequest).ld)
}
})
}
}

func TestMergeSplitLogsBasedOnByteSize(t *testing.T) {
tests := []struct {
name string
cfg exporterbatcher.MaxSizeConfig
lr1 internal.Request
lr2 internal.Request
expected []Request
}{
{
name: "both_requests_empty",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytesTestingOnly: logsMarshaler.LogsSize(testdata.GenerateLogs(10))},
lr1: newLogsRequest(plog.NewLogs(), nil),
lr2: newLogsRequest(plog.NewLogs(), nil),
expected: []Request{newLogsRequest(plog.NewLogs(), nil)},
},
{
name: "first_request_empty",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytesTestingOnly: logsMarshaler.LogsSize(testdata.GenerateLogs(10))},
lr1: newLogsRequest(plog.NewLogs(), nil),
lr2: newLogsRequest(testdata.GenerateLogs(5), nil),
expected: []Request{newLogsRequest(testdata.GenerateLogs(5), nil)},
},
{
name: "first_empty_second_nil",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytesTestingOnly: logsMarshaler.LogsSize(testdata.GenerateLogs(10))},
lr1: newLogsRequest(plog.NewLogs(), nil),
lr2: nil,
expected: []Request{newLogsRequest(plog.NewLogs(), nil)},
},
{
name: "merge_only",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytesTestingOnly: logsMarshaler.LogsSize(testdata.GenerateLogs(11))},
lr1: newLogsRequest(testdata.GenerateLogs(4), nil),
lr2: newLogsRequest(testdata.GenerateLogs(6), nil),
expected: []Request{newLogsRequest(func() plog.Logs {
logs := testdata.GenerateLogs(4)
testdata.GenerateLogs(6).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
return logs
}(), nil)},
},
{
name: "split_only",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytesTestingOnly: logsMarshaler.LogsSize(testdata.GenerateLogs(4))},
lr1: newLogsRequest(plog.NewLogs(), nil),
lr2: newLogsRequest(testdata.GenerateLogs(10), nil),
expected: []Request{
newLogsRequest(testdata.GenerateLogs(4), nil),
newLogsRequest(testdata.GenerateLogs(4), nil),
newLogsRequest(testdata.GenerateLogs(2), nil),
},
},
{
name: "merge_and_split",
cfg: exporterbatcher.MaxSizeConfig{
MaxSizeBytesTestingOnly: logsMarshaler.LogsSize(testdata.GenerateLogs(10))/2 + logsMarshaler.LogsSize(testdata.GenerateLogs(11))/2},
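			// The limit sits between the encoded sizes of 10- and 11-record
			// payloads, so each full output batch carries exactly 10 records.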
lr1: newLogsRequest(testdata.GenerateLogs(8), nil),
lr2: newLogsRequest(testdata.GenerateLogs(20), nil),
expected: []Request{
newLogsRequest(func() plog.Logs {
logs := testdata.GenerateLogs(8)
testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
return logs
}(), nil),
newLogsRequest(testdata.GenerateLogs(10), nil),
newLogsRequest(testdata.GenerateLogs(8), nil),
},
},
{
name: "scope_logs_split",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytesTestingOnly: logsMarshaler.LogsSize(testdata.GenerateLogs(4))},
lr1: newLogsRequest(func() plog.Logs {
ld := testdata.GenerateLogs(4)
ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("extra log")
return ld
}(), nil),
lr2: newLogsRequest(testdata.GenerateLogs(2), nil),
expected: []Request{
newLogsRequest(testdata.GenerateLogs(4), nil),
newLogsRequest(func() plog.Logs {
ld := testdata.GenerateLogs(0)
ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty().Body().SetStr("extra log")
testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(ld.ResourceLogs())
return ld
}(), nil),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res, err := tt.lr1.MergeSplit(context.Background(), tt.cfg, tt.lr2)
require.NoError(t, err)
assert.Equal(t, len(tt.expected), len(res))
for i := range res {
assert.Equal(t, tt.expected[i].(*logsRequest).ld, res[i].(*logsRequest).ld)
}
})
}
@@ -179,6 +278,20 @@ func BenchmarkSplittingBasedOnItemCountManySmallLogs(b *testing.B) {
assert.Len(b, merged, 1)
}
}

func BenchmarkSplittingBasedOnByteSizeManySmallLogs(b *testing.B) {
// All requests merge into a single batch.
cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytesTestingOnly: logsMarshaler.LogsSize(testdata.GenerateLogs(11000))}
b.ReportAllocs()
for i := 0; i < b.N; i++ {
merged := []Request{newLogsRequest(testdata.GenerateLogs(10), nil)}
for j := 0; j < 1000; j++ {
lr2 := newLogsRequest(testdata.GenerateLogs(10), nil)
res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
merged = append(merged[0:len(merged)-1], res...)
}
assert.Len(b, merged, 1)
}
}

func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit(b *testing.B) {
// Every incoming request results in a split.
@@ -194,6 +307,21 @@ func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit(b *testing.B)
assert.Len(b, merged, 11)
}
}

func BenchmarkSplittingBasedOnByteSizeManyLogsSlightlyAboveLimit(b *testing.B) {
// Every incoming request results in a split.
cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytesTestingOnly: logsMarshaler.LogsSize(testdata.GenerateLogs(10000))}
b.ReportAllocs()
for i := 0; i < b.N; i++ {
merged := []Request{newLogsRequest(testdata.GenerateLogs(0), nil)}
for j := 0; j < 10; j++ {
lr2 := newLogsRequest(testdata.GenerateLogs(10001), nil)
res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
assert.Len(b, res, 2)
merged = append(merged[0:len(merged)-1], res...)
}
assert.Len(b, merged, 11)
}
}

func BenchmarkSplittingBasedOnItemCountHugeLogs(b *testing.B) {
// One request splits into many batches.
@@ -207,3 +335,16 @@ func BenchmarkSplittingBasedOnItemCountHugeLogs(b *testing.B) {
assert.Len(b, merged, 10)
}
}

func BenchmarkSplittingBasedOnByteSizeHugeLogs(b *testing.B) {
// One request splits into many batches.
cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytesTestingOnly: logsMarshaler.LogsSize(testdata.GenerateLogs(10010))}
b.ReportAllocs()
for i := 0; i < b.N; i++ {
merged := []Request{newLogsRequest(testdata.GenerateLogs(0), nil)}
lr2 := newLogsRequest(testdata.GenerateLogs(100000), nil)
res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
merged = append(merged[0:len(merged)-1], res...)
assert.Len(b, merged, 10)
}
}
12 changes: 12 additions & 0 deletions pdata/plog/pb.go
@@ -22,6 +22,18 @@ func (e *ProtoMarshaler) LogsSize(ld Logs) int {
return pb.Size()
}

func (e *ProtoMarshaler) ResourceLogsSize(rl ResourceLogs) int {
return rl.orig.Size()
}

func (e *ProtoMarshaler) ScopeLogsSize(sl ScopeLogs) int {
return sl.orig.Size()
}

func (e *ProtoMarshaler) LogRecordSize(lr LogRecord) int {
return lr.orig.Size()
}

var _ Unmarshaler = (*ProtoUnmarshaler)(nil)

type ProtoUnmarshaler struct{}
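These wrappers surface the gogoproto Size() of each pdata level so the exporter can budget batches without re-encoding payloads. A rough usage sketch (an illustration, assuming the collector's internal testdata generator):

	m := &plog.ProtoMarshaler{}
	ld := testdata.GenerateLogs(3)
	rl := ld.ResourceLogs().At(0)
	fmt.Println(m.LogsSize(ld), m.ResourceLogsSize(rl), m.ScopeLogsSize(rl.ScopeLogs().At(0)))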