Skip to content

Commit 66d83f0

Browse files
[exporter][batching] Serialized bytes based batching for logs (open-telemetry#12299)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR implements serialized bytes based batching. <!-- Issue number if applicable --> #### Link to tracking issue open-telemetry#3262 <!--Describe what testing was performed and which tests were added.--> #### Testing ``` BenchmarkSplittingBasedOnItemCountManySmallLogs-10 409 2822478 ns/op 3148718 B/op 71082 allocs/op BenchmarkSplittingBasedOnByteSizeManySmallLogs-10 412 2890594 ns/op 3156103 B/op 71242 allocs/op BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit-10 38 27579702 ns/op 43531582 B/op 700471 allocs/op BenchmarkSplittingBasedOnByteSizeManyLogsSlightlyAboveLimit-10 24 43067219 ns/op 43639889 B/op 702927 allocs/op BenchmarkSplittingBasedOnItemCountHugeLogs-10 40 29722710 ns/op 41922925 B/op 690300 allocs/op BenchmarkSplittingBasedOnByteSizeHugeLogs-10 16 64321198 ns/op 42107144 B/op 694144 allocs/op ``` #### Caveat We book keep both item count and byte size regardless of batching option, which adds an overhead in both cases. Co-authored-by: Dmitrii Anoshin <[email protected]>
1 parent b3b28ed commit 66d83f0

File tree

7 files changed

+366
-51
lines changed

7 files changed

+366
-51
lines changed

exporter/exporterbatcher/sizer_type.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,20 @@ type SizerType struct {
1313

1414
const (
1515
sizerTypeItems = "items"
16+
sizerTypeBytes = "bytes"
1617
)
1718

18-
var SizerTypeItems = SizerType{val: sizerTypeItems}
19+
var (
20+
SizerTypeItems = SizerType{val: sizerTypeItems}
21+
SizerTypeBytes = SizerType{val: sizerTypeBytes}
22+
)
1923

2024
// UnmarshalText implements TextUnmarshaler interface.
2125
func (s *SizerType) UnmarshalText(text []byte) error {
2226
switch str := string(text); str {
2327
case sizerTypeItems:
2428
*s = SizerTypeItems
29+
// TODO support setting sizer to SizerTypeBytes when all logs, traces, and metrics batching support it
2530
default:
2631
return fmt.Errorf("invalid sizer: %q", str)
2732
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
5+
6+
import (
7+
math_bits "math/bits"
8+
9+
"go.opentelemetry.io/collector/pdata/plog"
10+
)
11+
12+
type LogsSizer interface {
13+
LogsSize(ld plog.Logs) int
14+
ResourceLogsSize(rl plog.ResourceLogs) int
15+
ScopeLogsSize(sl plog.ScopeLogs) int
16+
LogRecordSize(lr plog.LogRecord) int
17+
18+
// DeltaSize() returns the delta size when a ResourceLog, ScopeLog or LogRecord is added.
19+
DeltaSize(newItemSize int) int
20+
}
21+
22+
// LogsBytesSizer returns the byte size of serialized protos.
23+
type LogsBytesSizer struct {
24+
plog.ProtoMarshaler
25+
}
26+
27+
// DeltaSize() returns the delta size of a proto slice when a new item is added.
28+
// Example:
29+
//
30+
// prevSize := proto1.Size()
31+
// proto1.RepeatedField().AppendEmpty() = proto2
32+
//
33+
// Then currSize of proto1 can be calculated as
34+
//
35+
// currSize := (prevSize + sizer.DeltaSize(proto2.Size()))
36+
//
37+
// This is derived from opentelemetry-collector/pdata/internal/data/protogen/logs/v1/logs.pb.go
38+
// which is generated with gogo/protobuf.
39+
func (s *LogsBytesSizer) DeltaSize(newItemSize int) int {
40+
return 1 + newItemSize + math_bits.Len64(uint64(newItemSize|1)+6)/7 //nolint:gosec // disable G115
41+
}
42+
43+
// LogsCountSizer returns the number of log entries.
44+
type LogsCountSizer struct{}
45+
46+
func (s *LogsCountSizer) LogsSize(ld plog.Logs) int {
47+
return ld.LogRecordCount()
48+
}
49+
50+
func (s *LogsCountSizer) ResourceLogsSize(rl plog.ResourceLogs) int {
51+
count := 0
52+
for k := 0; k < rl.ScopeLogs().Len(); k++ {
53+
count += rl.ScopeLogs().At(k).LogRecords().Len()
54+
}
55+
return count
56+
}
57+
58+
func (s *LogsCountSizer) ScopeLogsSize(sl plog.ScopeLogs) int {
59+
return sl.LogRecords().Len()
60+
}
61+
62+
func (s *LogsCountSizer) LogRecordSize(_ plog.LogRecord) int {
63+
return 1
64+
}
65+
66+
func (s *LogsCountSizer) DeltaSize(newItemSize int) int {
67+
return newItemSize
68+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
4+
5+
import (
6+
"testing"
7+
8+
"github.com/stretchr/testify/require"
9+
10+
"go.opentelemetry.io/collector/pdata/testdata"
11+
)
12+
13+
func TestLogsCountSizer(t *testing.T) {
14+
ld := testdata.GenerateLogs(5)
15+
sizer := LogsCountSizer{}
16+
require.Equal(t, 5, sizer.LogsSize(ld))
17+
18+
rl := ld.ResourceLogs().At(0)
19+
require.Equal(t, 5, sizer.ResourceLogsSize(rl))
20+
21+
sl := rl.ScopeLogs().At(0)
22+
require.Equal(t, 5, sizer.ScopeLogsSize(sl))
23+
24+
require.Equal(t, 1, sizer.LogRecordSize(sl.LogRecords().At(0)))
25+
require.Equal(t, 1, sizer.LogRecordSize(sl.LogRecords().At(1)))
26+
require.Equal(t, 1, sizer.LogRecordSize(sl.LogRecords().At(2)))
27+
require.Equal(t, 1, sizer.LogRecordSize(sl.LogRecords().At(3)))
28+
require.Equal(t, 1, sizer.LogRecordSize(sl.LogRecords().At(4)))
29+
30+
prevSize := sizer.ScopeLogsSize(sl)
31+
lr := sl.LogRecords().At(2)
32+
lr.CopyTo(sl.LogRecords().AppendEmpty())
33+
require.Equal(t, sizer.ScopeLogsSize(sl), prevSize+sizer.DeltaSize(sizer.LogRecordSize(lr)))
34+
}
35+
36+
func TestLogsBytesSizer(t *testing.T) {
37+
ld := testdata.GenerateLogs(5)
38+
sizer := LogsBytesSizer{}
39+
require.Equal(t, 545, sizer.LogsSize(ld))
40+
41+
rl := ld.ResourceLogs().At(0)
42+
require.Equal(t, 542, sizer.ResourceLogsSize(rl))
43+
44+
sl := rl.ScopeLogs().At(0)
45+
require.Equal(t, 497, sizer.ScopeLogsSize(sl))
46+
47+
require.Equal(t, 109, sizer.LogRecordSize(sl.LogRecords().At(0)))
48+
require.Equal(t, 79, sizer.LogRecordSize(sl.LogRecords().At(1)))
49+
require.Equal(t, 109, sizer.LogRecordSize(sl.LogRecords().At(2)))
50+
require.Equal(t, 79, sizer.LogRecordSize(sl.LogRecords().At(3)))
51+
require.Equal(t, 109, sizer.LogRecordSize(sl.LogRecords().At(4)))
52+
53+
prevSize := sizer.ScopeLogsSize(sl)
54+
lr := sl.LogRecords().At(2)
55+
lr.CopyTo(sl.LogRecords().AppendEmpty())
56+
require.Equal(t, sizer.ScopeLogsSize(sl), prevSize+sizer.DeltaSize(sizer.LogRecordSize(lr)))
57+
}

exporter/exporterhelper/logs.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.opentelemetry.io/collector/consumer/consumererror"
1515
"go.opentelemetry.io/collector/exporter"
1616
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
17+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
1718
"go.opentelemetry.io/collector/exporter/exporterqueue"
1819
"go.opentelemetry.io/collector/pdata/plog"
1920
"go.opentelemetry.io/collector/pipeline"
@@ -25,16 +26,16 @@ var (
2526
)
2627

2728
type logsRequest struct {
28-
ld plog.Logs
29-
pusher consumer.ConsumeLogsFunc
30-
cachedItemsCount int
29+
ld plog.Logs
30+
pusher consumer.ConsumeLogsFunc
31+
cachedSize int
3132
}
3233

3334
func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request {
3435
return &logsRequest{
35-
ld: ld,
36-
pusher: pusher,
37-
cachedItemsCount: ld.LogRecordCount(),
36+
ld: ld,
37+
pusher: pusher,
38+
cachedSize: -1,
3839
}
3940
}
4041

@@ -65,11 +66,18 @@ func (req *logsRequest) Export(ctx context.Context) error {
6566
}
6667

6768
func (req *logsRequest) ItemsCount() int {
68-
return req.cachedItemsCount
69+
return req.ld.LogRecordCount()
6970
}
7071

71-
func (req *logsRequest) setCachedItemsCount(count int) {
72-
req.cachedItemsCount = count
72+
func (req *logsRequest) Size(sizer sizer.LogsSizer) int {
73+
if req.cachedSize == -1 {
74+
req.cachedSize = sizer.LogsSize(req.ld)
75+
}
76+
return req.cachedSize
77+
}
78+
79+
func (req *logsRequest) setCachedSize(size int) {
80+
req.cachedSize = size
7381
}
7482

7583
type logsExporter struct {

exporter/exporterhelper/logs_batch.go

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -8,104 +8,124 @@ import (
88
"errors"
99

1010
"go.opentelemetry.io/collector/exporter/exporterbatcher"
11+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
1112
"go.opentelemetry.io/collector/pdata/plog"
1213
)
1314

15+
func sizerFromConfig(cfg exporterbatcher.SizeConfig) sizer.LogsSizer {
16+
switch cfg.Sizer {
17+
case exporterbatcher.SizerTypeItems:
18+
return &sizer.LogsCountSizer{}
19+
case exporterbatcher.SizerTypeBytes:
20+
return &sizer.LogsBytesSizer{}
21+
default:
22+
return &sizer.LogsCountSizer{}
23+
}
24+
}
25+
1426
// MergeSplit splits and/or merges the provided logs request and the current request into one or more requests
1527
// conforming with the MaxSizeConfig.
1628
func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.SizeConfig, r2 Request) ([]Request, error) {
29+
sizer := sizerFromConfig(cfg)
1730
if r2 != nil {
1831
req2, ok := r2.(*logsRequest)
1932
if !ok {
2033
return nil, errors.New("invalid input type")
2134
}
22-
req2.mergeTo(req)
35+
req2.mergeTo(req, sizer)
2336
}
2437

2538
// If no limit we can simply merge the new request into the current and return.
2639
if cfg.MaxSize == 0 {
2740
return []Request{req}, nil
2841
}
29-
return req.split(cfg)
42+
43+
return req.split(cfg.MaxSize, sizer), nil
3044
}
3145

32-
func (req *logsRequest) mergeTo(dst *logsRequest) {
33-
dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount())
34-
req.setCachedItemsCount(0)
46+
func (req *logsRequest) mergeTo(dst *logsRequest, sizer sizer.LogsSizer) {
47+
if sizer != nil {
48+
dst.setCachedSize(dst.Size(sizer) + req.Size(sizer))
49+
req.setCachedSize(0)
50+
}
3551
req.ld.ResourceLogs().MoveAndAppendTo(dst.ld.ResourceLogs())
3652
}
3753

38-
func (req *logsRequest) split(cfg exporterbatcher.SizeConfig) ([]Request, error) {
54+
func (req *logsRequest) split(maxSize int, sizer sizer.LogsSizer) []Request {
3955
var res []Request
40-
for req.ItemsCount() > cfg.MaxSize {
41-
ld := extractLogs(req.ld, cfg.MaxSize)
42-
size := ld.LogRecordCount()
43-
req.setCachedItemsCount(req.ItemsCount() - size)
44-
res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedItemsCount: size})
56+
for req.Size(sizer) > maxSize {
57+
ld := extractLogs(req.ld, maxSize, sizer)
58+
size := sizer.LogsSize(ld)
59+
req.setCachedSize(req.Size(sizer) - size)
60+
res = append(res, &logsRequest{ld: ld, pusher: req.pusher, cachedSize: size})
4561
}
4662
res = append(res, req)
47-
return res, nil
63+
return res
4864
}
4965

5066
// extractLogs extracts logs from the input logs and returns a new logs with the specified number of log records.
51-
func extractLogs(srcLogs plog.Logs, count int) plog.Logs {
67+
func extractLogs(srcLogs plog.Logs, capacity int, sizer sizer.LogsSizer) plog.Logs {
68+
capacityReached := false
5269
destLogs := plog.NewLogs()
70+
capacityLeft := capacity - sizer.LogsSize(destLogs)
5371
srcLogs.ResourceLogs().RemoveIf(func(srcRL plog.ResourceLogs) bool {
54-
if count == 0 {
72+
if capacityReached {
5573
return false
5674
}
57-
needToExtract := resourceLogsCount(srcRL) > count
75+
needToExtract := sizer.ResourceLogsSize(srcRL) > capacityLeft
5876
if needToExtract {
59-
srcRL = extractResourceLogs(srcRL, count)
77+
srcRL, capacityReached = extractResourceLogs(srcRL, capacityLeft, sizer)
78+
if srcRL.ScopeLogs().Len() == 0 {
79+
return false
80+
}
6081
}
61-
count -= resourceLogsCount(srcRL)
82+
capacityLeft -= sizer.DeltaSize(sizer.ResourceLogsSize(srcRL))
6283
srcRL.MoveTo(destLogs.ResourceLogs().AppendEmpty())
6384
return !needToExtract
6485
})
6586
return destLogs
6687
}
6788

6889
// extractResourceLogs extracts resource logs and returns a new resource logs with the specified number of log records.
69-
func extractResourceLogs(srcRL plog.ResourceLogs, count int) plog.ResourceLogs {
90+
func extractResourceLogs(srcRL plog.ResourceLogs, capacity int, sizer sizer.LogsSizer) (plog.ResourceLogs, bool) {
91+
capacityReached := false
7092
destRL := plog.NewResourceLogs()
7193
destRL.SetSchemaUrl(srcRL.SchemaUrl())
7294
srcRL.Resource().CopyTo(destRL.Resource())
95+
capacityLeft := capacity - sizer.ResourceLogsSize(destRL)
7396
srcRL.ScopeLogs().RemoveIf(func(srcSL plog.ScopeLogs) bool {
74-
if count == 0 {
97+
if capacityReached {
7598
return false
7699
}
77-
needToExtract := srcSL.LogRecords().Len() > count
100+
needToExtract := sizer.ScopeLogsSize(srcSL) > capacityLeft
78101
if needToExtract {
79-
srcSL = extractScopeLogs(srcSL, count)
102+
srcSL, capacityReached = extractScopeLogs(srcSL, capacityLeft, sizer)
103+
if srcSL.LogRecords().Len() == 0 {
104+
return false
105+
}
80106
}
81-
count -= srcSL.LogRecords().Len()
107+
capacityLeft -= sizer.DeltaSize(sizer.ScopeLogsSize(srcSL))
82108
srcSL.MoveTo(destRL.ScopeLogs().AppendEmpty())
83109
return !needToExtract
84110
})
85-
return destRL
111+
return destRL, capacityReached
86112
}
87113

88114
// extractScopeLogs extracts scope logs and returns a new scope logs with the specified number of log records.
89-
func extractScopeLogs(srcSL plog.ScopeLogs, count int) plog.ScopeLogs {
115+
func extractScopeLogs(srcSL plog.ScopeLogs, capacity int, sizer sizer.LogsSizer) (plog.ScopeLogs, bool) {
116+
capacityReached := false
90117
destSL := plog.NewScopeLogs()
91118
destSL.SetSchemaUrl(srcSL.SchemaUrl())
92119
srcSL.Scope().CopyTo(destSL.Scope())
120+
capacityLeft := capacity - sizer.ScopeLogsSize(destSL)
93121
srcSL.LogRecords().RemoveIf(func(srcLR plog.LogRecord) bool {
94-
if count == 0 {
122+
if capacityReached || sizer.LogRecordSize(srcLR) > capacityLeft {
123+
capacityReached = true
95124
return false
96125
}
126+
capacityLeft -= sizer.DeltaSize(sizer.LogRecordSize(srcLR))
97127
srcLR.MoveTo(destSL.LogRecords().AppendEmpty())
98-
count--
99128
return true
100129
})
101-
return destSL
102-
}
103-
104-
// resourceLogsCount calculates the total number of log records in the plog.ResourceLogs.
105-
func resourceLogsCount(rl plog.ResourceLogs) int {
106-
count := 0
107-
for k := 0; k < rl.ScopeLogs().Len(); k++ {
108-
count += rl.ScopeLogs().At(k).LogRecords().Len()
109-
}
110-
return count
130+
return destSL, capacityReached
111131
}

0 commit comments

Comments
 (0)