Skip to content

Commit d156151

Browse files
committed
POC byte based batching
1 parent 306c939 commit d156151

File tree

62 files changed

+524
-6
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

62 files changed

+524
-6
lines changed

exporter/exporterbatcher/config.go

+15
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type MinSizeConfig struct {
3131
// sent regardless of the timeout. There is no guarantee that the batch size is always greater than this value.
3232
// This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored.
3333
MinSizeItems int `mapstructure:"min_size_items"`
34+
MinSizeBytes int `mapstructure:"min_size_bytes"`
3435
}
3536

3637
// MaxSizeConfig defines the configuration for the maximum number of items in a batch.
@@ -41,18 +42,32 @@ type MaxSizeConfig struct {
4142
// If the batch size exceeds this value, it will be broken up into smaller batches if possible.
4243
// Setting this value to zero disables the maximum size limit.
4344
MaxSizeItems int `mapstructure:"max_size_items"`
45+
MaxSizeBytes int `mapstructure:"max_size_bytes"`
4446
}
4547

4648
func (c Config) Validate() error {
49+
if c.MinSizeBytes != 0 && c.MinSizeItems != 0 || c.MinSizeBytes != 0 && c.MaxSizeItems != 0 || c.MinSizeItems != 0 && c.MaxSizeBytes != 0 {
50+
return errors.New("size limit and bytes limit cannot be specified at the same time")
51+
}
52+
4753
if c.MinSizeItems < 0 {
4854
return errors.New("min_size_items must be greater than or equal to zero")
4955
}
56+
if c.MinSizeBytes < 0 {
57+
return errors.New("min_size_bytes must be greater than or equal to zero")
58+
}
5059
if c.MaxSizeItems < 0 {
5160
return errors.New("max_size_items must be greater than or equal to zero")
5261
}
62+
if c.MaxSizeBytes < 0 {
63+
return errors.New("max_size_bytes must be greater than or equal to zero")
64+
}
5365
if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems {
5466
return errors.New("max_size_items must be greater than or equal to min_size_items")
5567
}
68+
if c.MaxSizeBytes != 0 && c.MaxSizeBytes < c.MinSizeBytes {
69+
return errors.New("max_size_bytes must be greater than or equal to min_size_bytes")
70+
}
5671
if c.FlushTimeout <= 0 {
5772
return errors.New("timeout must be greater than zero")
5873
}

exporter/exporterhelper/internal/request.go

+4
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ func (r *fakeRequest) ItemsCount() int {
5656
return r.items
5757
}
5858

59+
// ByteSize returns the simulated payload size of the fake request.
// NOTE(review): reuses the item count as a stand-in for a byte size in tests.
func (r *fakeRequest) ByteSize() int {
	return r.items
}
62+
5963
func (r *fakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 internal.Request) ([]internal.Request, error) {
6064
if r.mergeErr != nil {
6165
return nil, r.mergeErr

exporter/exporterhelper/internal/retry_sender_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,10 @@ func (mer *mockErrorRequest) ItemsCount() int {
418418
return 7
419419
}
420420

421+
// ByteSize returns a fixed payload size, mirroring ItemsCount for this mock.
func (mer *mockErrorRequest) ByteSize() int {
	return 7
}
424+
421425
// MergeSplit is a no-op stub; the mock never merges or splits requests.
func (mer *mockErrorRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) {
	return nil, nil
}
@@ -464,6 +468,10 @@ func (m *mockRequest) ItemsCount() int {
464468
return m.cnt
465469
}
466470

471+
// ByteSize returns the mock's counter as its payload size, matching ItemsCount.
func (m *mockRequest) ByteSize() int {
	return m.cnt
}
474+
467475
// MergeSplit is a no-op stub; the mock never merges or splits requests.
func (m *mockRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) {
	return nil, nil
}

exporter/exporterhelper/logs.go

+4
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ func (req *logsRequest) ItemsCount() int {
6767
return req.ld.LogRecordCount()
6868
}
6969

70+
// ByteSize returns the byte size of the log payload as reported by pdata
// (presumably the marshaled proto size — confirm against plog.Logs.ByteSize docs).
func (req *logsRequest) ByteSize() int {
	return req.ld.ByteSize()
}
73+
7074
type logsExporter struct {
7175
*internal.BaseExporter
7276
consumer.Logs

exporter/exporterhelper/logs_batch.go

+120-2
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,129 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz
2323
}
2424
}
2525

26-
if cfg.MaxSizeItems == 0 {
27-
req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs())
26+
if cfg.MaxSizeItems == 0 && cfg.MaxSizeBytes == 0 {
27+
if req2 != nil {
28+
req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs())
29+
}
2830
return []Request{req}, nil
2931
}
32+
if cfg.MaxSizeBytes > 0 {
33+
return req.mergeSplitBasedOnByteSize(cfg, req2)
34+
}
35+
return req.mergeSplitBasedOnItemCount(cfg, req2)
36+
}
37+
38+
func (req *logsRequest) mergeSplitBasedOnByteSize(cfg exporterbatcher.MaxSizeConfig, req2 *logsRequest) ([]Request, error) {
39+
var (
40+
res []Request
41+
destReq *logsRequest
42+
capacityLeft = cfg.MaxSizeBytes
43+
)
44+
for _, srcReq := range []*logsRequest{req, req2} {
45+
if srcReq == nil {
46+
continue
47+
}
48+
49+
ByteSize := srcReq.ld.ByteSize()
50+
if ByteSize <= capacityLeft {
51+
if destReq == nil {
52+
destReq = srcReq
53+
} else {
54+
srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
55+
}
56+
capacityLeft -= ByteSize
57+
continue
58+
}
59+
60+
for {
61+
extractedLogs, capacityReached := extractLogsBasedOnByteSize(srcReq.ld, capacityLeft)
62+
if extractedLogs.LogRecordCount() == 0 {
63+
break
64+
}
65+
if destReq == nil {
66+
destReq = &logsRequest{ld: extractedLogs, pusher: srcReq.pusher}
67+
} else {
68+
extractedLogs.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
69+
}
70+
// Create new batch once capacity is reached.
71+
if capacityReached {
72+
res = append(res, destReq)
73+
destReq = nil
74+
capacityLeft = cfg.MaxSizeBytes
75+
} else {
76+
capacityLeft = cfg.MaxSizeBytes - destReq.ByteSize()
77+
}
78+
}
79+
}
80+
81+
if destReq != nil {
82+
res = append(res, destReq)
83+
}
84+
return res, nil
85+
}
86+
87+
// extractLogsBasedOnByteSize removes up to capacity bytes worth of log records
// from srcLogs and returns them as a new plog.Logs. The boolean result reports
// whether the capacity was reached, i.e. srcLogs may still contain records.
// Assumes ResourceLogs.Size() and Logs.ByteSize() use the same encoding — TODO confirm.
func extractLogsBasedOnByteSize(srcLogs plog.Logs, capacity int) (plog.Logs, bool) {
	capacityReached := false
	destLogs := plog.NewLogs()
	srcLogs.ResourceLogs().RemoveIf(func(srcRL plog.ResourceLogs) bool {
		// Once full, keep every remaining entry in the source.
		if capacityReached {
			return false
		}
		needToExtract := srcRL.Size() > capacity-destLogs.ByteSize()
		if needToExtract {
			// Split this ResourceLogs; the extracted part replaces srcRL here,
			// the remainder stays behind in the source (mutated in place).
			srcRL, capacityReached = extractResourceLogsBasedOnByteSize(srcRL, capacity-destLogs.ByteSize())
			if srcRL.ScopeLogs().Len() == 0 {
				return false
			}
		}
		srcRL.MoveTo(destLogs.ResourceLogs().AppendEmpty())
		// Remove from the source only when moved wholesale; a split entry keeps
		// its remainder for the next round.
		return !needToExtract
	})
	return destLogs, capacityReached
}
107+
108+
// extractResourceLogsBasedOnByteSize removes up to capacity bytes worth of
// scope logs from srcRL and returns them in a new ResourceLogs that carries a
// copy of the resource and schema URL. The boolean result reports whether the
// capacity was reached.
func extractResourceLogsBasedOnByteSize(srcRL plog.ResourceLogs, capacity int) (plog.ResourceLogs, bool) {
	capacityReached := false
	destRL := plog.NewResourceLogs()
	destRL.SetSchemaUrl(srcRL.SchemaUrl())
	srcRL.Resource().CopyTo(destRL.Resource())
	srcRL.ScopeLogs().RemoveIf(func(srcSL plog.ScopeLogs) bool {
		if capacityReached {
			return false
		}
		needToExtract := srcSL.Size() > capacity-destRL.Size()
		if needToExtract {
			// Split this ScopeLogs; the remainder stays behind in srcRL.
			srcSL, capacityReached = extractScopeLogsBasedOnByteSize(srcSL, capacity-destRL.Size())
			if srcSL.LogRecords().Len() == 0 {
				return false
			}
		}
		srcSL.MoveTo(destRL.ScopeLogs().AppendEmpty())
		return !needToExtract
	})
	return destRL, capacityReached
}
130+
131+
// extractScopeLogsBasedOnByteSize removes up to capacity bytes worth of log
// records from srcSL and returns them in a new ScopeLogs that carries a copy
// of the scope and schema URL. The boolean result reports whether the capacity
// was reached. Extraction stops at the first record that does not fit, even if
// a later, smaller record would — this preserves record order.
func extractScopeLogsBasedOnByteSize(srcSL plog.ScopeLogs, capacity int) (plog.ScopeLogs, bool) {
	capacityReached := false
	destSL := plog.NewScopeLogs()
	destSL.SetSchemaUrl(srcSL.SchemaUrl())
	srcSL.Scope().CopyTo(destSL.Scope())
	srcSL.LogRecords().RemoveIf(func(srcLR plog.LogRecord) bool {
		if capacityReached || srcLR.Size()+destSL.Size() > capacity {
			capacityReached = true
			return false
		}
		srcLR.MoveTo(destSL.LogRecords().AppendEmpty())
		return true
	})
	return destSL, capacityReached
}
30147

148+
func (req *logsRequest) mergeSplitBasedOnItemCount(cfg exporterbatcher.MaxSizeConfig, req2 *logsRequest) ([]Request, error) {
31149
var (
32150
res []Request
33151
destReq *logsRequest

exporter/exporterhelper/logs_batch_test.go

+140-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestMergeLogsInvalidInput(t *testing.T) {
3131
require.Error(t, err)
3232
}
3333

34-
func TestMergeSplitLogs(t *testing.T) {
34+
func TestMergeSplitLogsBasedOnItemCount(t *testing.T) {
3535
tests := []struct {
3636
name string
3737
cfg exporterbatcher.MaxSizeConfig
@@ -152,3 +152,142 @@ func TestExtractLogs(t *testing.T) {
152152
assert.Equal(t, 10-i, ld.LogRecordCount())
153153
}
154154
}
155+
156+
// TestMergeSplitLogsBasedOnByteSize exercises byte-size based merging and
// splitting of logsRequest payloads through the MergeSplit entry point.
func TestMergeSplitLogsBasedOnByteSize(t *testing.T) {
	// Magic number is the byte size testdata.GenerateLogs(10)
	tests := []struct {
		name     string
		cfg      exporterbatcher.MaxSizeConfig
		lr1      internal.Request
		lr2      internal.Request
		expected []*logsRequest
	}{
		{
			name:     "both_requests_empty",
			cfg:      exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10).ByteSize()},
			lr1:      &logsRequest{ld: plog.NewLogs()},
			lr2:      &logsRequest{ld: plog.NewLogs()},
			expected: []*logsRequest{{ld: plog.NewLogs()}},
		},
		{
			name:     "first_request_empty",
			cfg:      exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10).ByteSize()},
			lr1:      &logsRequest{ld: plog.NewLogs()},
			lr2:      &logsRequest{ld: testdata.GenerateLogs(5)},
			expected: []*logsRequest{{ld: testdata.GenerateLogs(5)}},
		},
		{
			name:     "first_empty_second_nil",
			cfg:      exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10).ByteSize()},
			lr1:      &logsRequest{ld: plog.NewLogs()},
			lr2:      nil,
			expected: []*logsRequest{{ld: plog.NewLogs()}},
		},
		{
			// Both payloads fit under the limit and are merged into one batch.
			name: "merge_only",
			cfg:  exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(11).ByteSize()},
			lr1:  &logsRequest{ld: testdata.GenerateLogs(4)},
			lr2:  &logsRequest{ld: testdata.GenerateLogs(6)},
			expected: []*logsRequest{{ld: func() plog.Logs {
				logs := testdata.GenerateLogs(4)
				testdata.GenerateLogs(6).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
				return logs
			}()}},
		},
		{
			// A single oversized payload is split into size-bounded batches.
			name: "split_only",
			cfg:  exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(4).ByteSize()},
			lr1:  &logsRequest{ld: plog.NewLogs()},
			lr2:  &logsRequest{ld: testdata.GenerateLogs(10)},
			expected: []*logsRequest{
				{ld: testdata.GenerateLogs(4)},
				{ld: testdata.GenerateLogs(4)},
				{ld: testdata.GenerateLogs(2)},
			},
		},
		{
			// Requests are first merged, then split across batch boundaries.
			name: "merge_and_split",
			cfg:  exporterbatcher.MaxSizeConfig{MaxSizeBytes: (testdata.GenerateLogs(10).ByteSize() + testdata.GenerateLogs(11).ByteSize()) / 2},
			lr1:  &logsRequest{ld: testdata.GenerateLogs(8)},
			lr2:  &logsRequest{ld: testdata.GenerateLogs(20)},
			expected: []*logsRequest{
				{ld: func() plog.Logs {
					logs := testdata.GenerateLogs(8)
					testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
					return logs
				}()},
				{ld: testdata.GenerateLogs(10)},
				{ld: testdata.GenerateLogs(8)},
			},
		},
		{
			// Splitting happens below the ResourceLogs level, inside ScopeLogs.
			name: "scope_logs_split",
			cfg:  exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(4).ByteSize()},
			lr1: &logsRequest{ld: func() plog.Logs {
				ld := testdata.GenerateLogs(4)
				ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("extra log")
				return ld
			}()},
			lr2: &logsRequest{ld: testdata.GenerateLogs(2)},
			expected: []*logsRequest{
				{ld: testdata.GenerateLogs(4)},
				{ld: func() plog.Logs {
					ld := testdata.GenerateLogs(0)
					ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty().Body().SetStr("extra log")
					testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(ld.ResourceLogs())
					return ld
				}()},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			res, err := tt.lr1.MergeSplit(context.Background(), tt.cfg, tt.lr2)
			require.NoError(t, err)
			assert.Equal(t, len(tt.expected), len(res))
			for i, r := range res {
				assert.Equal(t, tt.expected[i], r.(*logsRequest))
			}
		})
	}
}
254+
255+
func BenchmarkSplittingBasedOnItemCountManyLogs(b *testing.B) {
256+
cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}
257+
for i := 0; i < b.N; i++ {
258+
lr1 := &logsRequest{ld: testdata.GenerateLogs(9)}
259+
for j := 0; j < 1000; j++ {
260+
lr2 := &logsRequest{ld: testdata.GenerateLogs(9)}
261+
lr1.MergeSplit(context.Background(), cfg, lr2)
262+
}
263+
}
264+
}
265+
266+
func BenchmarkSplittingBasedOnByteSizeManyLogs(b *testing.B) {
267+
cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 1010}
268+
for i := 0; i < b.N; i++ {
269+
lr1 := &logsRequest{ld: testdata.GenerateLogs(9)}
270+
for j := 0; j < 1000; j++ {
271+
lr2 := &logsRequest{ld: testdata.GenerateLogs(9)}
272+
lr1.MergeSplit(context.Background(), cfg, lr2)
273+
}
274+
}
275+
}
276+
277+
func BenchmarkSplittingBasedOnItemCountHugeLog(b *testing.B) {
278+
cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}
279+
for i := 0; i < b.N; i++ {
280+
lr1 := &logsRequest{ld: testdata.GenerateLogs(1)}
281+
lr2 := &logsRequest{ld: testdata.GenerateLogs(1000)}
282+
lr1.MergeSplit(context.Background(), cfg, lr2)
283+
}
284+
}
285+
286+
func BenchmarkSplittingBasedOnByteSizeHugeLog(b *testing.B) {
287+
cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 1010}
288+
for i := 0; i < b.N; i++ {
289+
lr1 := &logsRequest{ld: testdata.GenerateLogs(1)}
290+
lr2 := &logsRequest{ld: testdata.GenerateLogs(1000)}
291+
lr1.MergeSplit(context.Background(), cfg, lr2)
292+
}
293+
}

exporter/exporterhelper/metrics.go

+4
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ func (req *metricsRequest) ItemsCount() int {
6767
return req.md.DataPointCount()
6868
}
6969

70+
// ByteSize returns the byte size of the metrics payload as reported by pdata.
func (req *metricsRequest) ByteSize() int {
	return req.md.ByteSize()
}
73+
7074
type metricsExporter struct {
7175
*internal.BaseExporter
7276
consumer.Metrics

exporter/exporterhelper/traces.go

+4
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ func (req *tracesRequest) ItemsCount() int {
6767
return req.td.SpanCount()
6868
}
6969

70+
// ByteSize returns the byte size of the traces payload as reported by pdata.
func (req *tracesRequest) ByteSize() int {
	return req.td.ByteSize()
}
73+
7074
type tracesExporter struct {
7175
*internal.BaseExporter
7276
consumer.Traces

0 commit comments

Comments
 (0)