Skip to content

Commit 812f050

Browse files
committed
POC byte based batching
1 parent 306c939 commit 812f050

File tree

62 files changed

+503
-6
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

62 files changed

+503
-6
lines changed

exporter/exporterbatcher/config.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type MinSizeConfig struct {
3131
// sent regardless of the timeout. There is no guarantee that the batch size always greater than this value.
3232
// This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored.
3333
MinSizeItems int `mapstructure:"min_size_items"`
34+
MinSizeBytes int `mapstructure:"min_size_bytes"`
3435
}
3536

3637
// MaxSizeConfig defines the configuration for the maximum number of items in a batch.
@@ -41,18 +42,32 @@ type MaxSizeConfig struct {
4142
// If the batch size exceeds this value, it will be broken up into smaller batches if possible.
4243
// Setting this value to zero disables the maximum size limit.
4344
MaxSizeItems int `mapstructure:"max_size_items"`
45+
MaxSizeBytes int `mapstructure:"max_size_bytes"`
4446
}
4547

4648
func (c Config) Validate() error {
49+
if c.MinSizeBytes != 0 && c.MinSizeItems != 0 || c.MinSizeBytes != 0 && c.MaxSizeItems != 0 || c.MinSizeItems != 0 && c.MaxSizeBytes != 0 {
50+
return errors.New("size limit and bytes limit cannot be specified at the same time")
51+
}
52+
4753
if c.MinSizeItems < 0 {
4854
return errors.New("min_size_items must be greater than or equal to zero")
4955
}
56+
if c.MinSizeBytes < 0 {
57+
return errors.New("min_size_bytes must be greater than or equal to zero")
58+
}
5059
if c.MaxSizeItems < 0 {
5160
return errors.New("max_size_items must be greater than or equal to zero")
5261
}
62+
if c.MaxSizeBytes < 0 {
63+
return errors.New("max_size_bytes must be greater than or equal to zero")
64+
}
5365
if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems {
5466
return errors.New("max_size_items must be greater than or equal to min_size_items")
5567
}
68+
if c.MaxSizeBytes != 0 && c.MaxSizeBytes < c.MinSizeBytes {
69+
return errors.New("max_size_bytes must be greater than or equal to min_size_bytes")
70+
}
5671
if c.FlushTimeout <= 0 {
5772
return errors.New("timeout must be greater than zero")
5873
}

exporter/exporterhelper/internal/request.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ func (r *fakeRequest) ItemsCount() int {
5656
return r.items
5757
}
5858

59+
// ByteSize reports the fake request's size for byte-based batching.
// The fake reuses the item count as a stand-in byte size so tests of
// byte-based code paths stay simple and deterministic.
func (r *fakeRequest) ByteSize() int {
	return r.items
}
62+
5963
func (r *fakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 internal.Request) ([]internal.Request, error) {
6064
if r.mergeErr != nil {
6165
return nil, r.mergeErr

exporter/exporterhelper/internal/retry_sender_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,10 @@ func (mer *mockErrorRequest) ItemsCount() int {
418418
return 7
419419
}
420420

421+
// ByteSize reports the mock's byte size: a fixed 7, mirroring ItemsCount.
func (mer *mockErrorRequest) ByteSize() int {
	return 7
}
424+
421425
func (mer *mockErrorRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) {
422426
return nil, nil
423427
}
@@ -464,6 +468,10 @@ func (m *mockRequest) ItemsCount() int {
464468
return m.cnt
465469
}
466470

471+
// ByteSize reports the mock's byte size, reusing the item counter as a
// stand-in value (same as ItemsCount) for byte-based batching tests.
func (m *mockRequest) ByteSize() int {
	return m.cnt
}
474+
467475
func (m *mockRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) {
468476
return nil, nil
469477
}

exporter/exporterhelper/logs.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ func (req *logsRequest) ItemsCount() int {
6767
return req.ld.LogRecordCount()
6868
}
6969

70+
// ByteSize returns the size in bytes of the request's log payload, as
// reported by the pdata ByteSize accessor. Used by byte-based batch limits.
func (req *logsRequest) ByteSize() int {
	return req.ld.ByteSize()
}
73+
7074
type logsExporter struct {
7175
*internal.BaseExporter
7276
consumer.Logs

exporter/exporterhelper/logs_batch.go

Lines changed: 119 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,128 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz
2323
}
2424
}
2525

26-
if cfg.MaxSizeItems == 0 {
27-
req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs())
26+
if cfg.MaxSizeItems == 0 && cfg.MaxSizeBytes == 0 {
27+
if req2 != nil {
28+
req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs())
29+
}
2830
return []Request{req}, nil
2931
}
32+
if cfg.MaxSizeBytes > 0 {
33+
return req.mergeSplitBasedOnByteSize(cfg, req2)
34+
}
35+
return req.mergeSplitBasedOnItemCount(cfg, req2)
36+
}
37+
38+
func (req *logsRequest) mergeSplitBasedOnByteSize(cfg exporterbatcher.MaxSizeConfig, req2 *logsRequest) ([]Request, error) {
39+
var (
40+
res []Request
41+
destReq *logsRequest
42+
capacityLeft = cfg.MaxSizeBytes
43+
)
44+
for _, srcReq := range []*logsRequest{req, req2} {
45+
if srcReq == nil {
46+
continue
47+
}
48+
49+
ByteSize := srcReq.ld.ByteSize()
50+
if ByteSize <= capacityLeft {
51+
if destReq == nil {
52+
destReq = srcReq
53+
} else {
54+
srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
55+
}
56+
capacityLeft -= ByteSize
57+
continue
58+
}
59+
60+
for {
61+
extractedLogs, capacityReached := extractLogsBasedOnByteSize(srcReq.ld, capacityLeft)
62+
if extractedLogs.LogRecordCount() == 0 {
63+
break
64+
}
65+
capacityLeft -= extractedLogs.ByteSize()
66+
if destReq == nil {
67+
destReq = &logsRequest{ld: extractedLogs, pusher: srcReq.pusher}
68+
} else {
69+
extractedLogs.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
70+
}
71+
// Create new batch once capacity is reached.
72+
if capacityReached {
73+
res = append(res, destReq)
74+
destReq = nil
75+
capacityLeft = cfg.MaxSizeBytes
76+
}
77+
}
78+
}
79+
80+
if destReq != nil {
81+
res = append(res, destReq)
82+
}
83+
return res, nil
84+
}
85+
86+
// extractLogsBasedOnByteSize moves log records out of srcLogs into a freshly
// allocated plog.Logs until adding more would exceed capacity (in bytes). It
// returns the extracted logs and whether extraction stopped because the byte
// capacity was reached (true) rather than because srcLogs was exhausted.
func extractLogsBasedOnByteSize(srcLogs plog.Logs, capacity int) (plog.Logs, bool) {
	capacityReached := false
	destLogs := plog.NewLogs()
	srcLogs.ResourceLogs().RemoveIf(func(srcRL plog.ResourceLogs) bool {
		// Once the capacity is hit, keep all remaining resources in srcLogs.
		if capacityReached {
			return false
		}
		// If the whole resource does not fit, split it at scope/record level.
		needToExtract := srcRL.Size()+destLogs.ByteSize() > capacity
		if needToExtract {
			srcRL, capacityReached = extractResourceLogsBasedOnByteSize(srcRL, capacity-destLogs.ByteSize())
			// Nothing from this resource fit; leave it in srcLogs untouched.
			if srcRL.ScopeLogs().Len() == 0 {
				return false
			}
		}
		srcRL.MoveTo(destLogs.ResourceLogs().AppendEmpty())
		// RemoveIf drops entries returning true: a fully moved resource is
		// removed from srcLogs, a partially extracted one stays behind.
		return !needToExtract
	})
	return destLogs, capacityReached
}
106+
107+
// extractResourceLogsBasedOnByteSize copies the resource and schema URL of
// srcRL and moves scope logs (splitting them if needed) into the copy until
// adding more would exceed capacity (in bytes). The bool result reports
// whether the byte capacity was reached.
func extractResourceLogsBasedOnByteSize(srcRL plog.ResourceLogs, capacity int) (plog.ResourceLogs, bool) {
	capacityReached := false
	destRL := plog.NewResourceLogs()
	destRL.SetSchemaUrl(srcRL.SchemaUrl())
	srcRL.Resource().CopyTo(destRL.Resource())
	srcRL.ScopeLogs().RemoveIf(func(srcSL plog.ScopeLogs) bool {
		// Once the capacity is hit, keep all remaining scopes in srcRL.
		if capacityReached {
			return false
		}
		// If the whole scope does not fit, split it at record level.
		needToExtract := srcSL.Size()+destRL.Size() > capacity
		if needToExtract {
			srcSL, capacityReached = extractScopeLogsBasedOnByteSize(srcSL, capacity-destRL.Size())
			// Nothing from this scope fit; leave it in srcRL untouched.
			if srcSL.LogRecords().Len() == 0 {
				return false
			}
		}
		srcSL.MoveTo(destRL.ScopeLogs().AppendEmpty())
		// Fully moved scopes are removed from srcRL; split ones stay behind.
		return !needToExtract
	})
	return destRL, capacityReached
}
129+
130+
// extractScopeLogsBasedOnByteSize copies the scope and schema URL of srcSL
// and moves whole log records into the copy until the next record would
// exceed capacity (in bytes). Individual records are never split. The bool
// result reports whether the byte capacity was reached.
func extractScopeLogsBasedOnByteSize(srcSL plog.ScopeLogs, capacity int) (plog.ScopeLogs, bool) {
	capacityReached := false
	destSL := plog.NewScopeLogs()
	destSL.SetSchemaUrl(srcSL.SchemaUrl())
	srcSL.Scope().CopyTo(destSL.Scope())
	srcSL.LogRecords().RemoveIf(func(srcLR plog.LogRecord) bool {
		// Stop at the first record that does not fit; it and every record
		// after it remain in srcSL (order is preserved, no reordering).
		if capacityReached || srcLR.Size()+destSL.Size() > capacity {
			capacityReached = true
			return false
		}
		srcLR.MoveTo(destSL.LogRecords().AppendEmpty())
		return true
	})
	return destSL, capacityReached
}
30146

147+
func (req *logsRequest) mergeSplitBasedOnItemCount(cfg exporterbatcher.MaxSizeConfig, req2 *logsRequest) ([]Request, error) {
31148
var (
32149
res []Request
33150
destReq *logsRequest

exporter/exporterhelper/logs_batch_test.go

Lines changed: 120 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestMergeLogsInvalidInput(t *testing.T) {
3131
require.Error(t, err)
3232
}
3333

34-
func TestMergeSplitLogs(t *testing.T) {
34+
func TestMergeSplitLogsBasedOnItemCount(t *testing.T) {
3535
tests := []struct {
3636
name string
3737
cfg exporterbatcher.MaxSizeConfig
@@ -152,3 +152,122 @@ func TestExtractLogs(t *testing.T) {
152152
assert.Equal(t, 10-i, ld.LogRecordCount())
153153
}
154154
}
155+
156+
// TestMergeSplitLogsBasedOnByteSize exercises MergeSplit with a byte-based
// cap (MaxSizeBytes). Capacities are expressed via
// testdata.GenerateLogs(n).ByteSize() so the cases remain valid if the
// generated payload size ever changes.
func TestMergeSplitLogsBasedOnByteSize(t *testing.T) {
	tests := []struct {
		name     string
		cfg      exporterbatcher.MaxSizeConfig
		lr1      internal.Request
		lr2      internal.Request
		expected []*logsRequest
	}{
		{
			// Two empty payloads collapse into a single empty request.
			name:     "both_requests_empty",
			cfg:      exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10).ByteSize()},
			lr1:      &logsRequest{ld: plog.NewLogs()},
			lr2:      &logsRequest{ld: plog.NewLogs()},
			expected: []*logsRequest{{ld: plog.NewLogs()}},
		},
		{
			// Empty first request: result carries only the second payload.
			name:     "first_request_empty",
			cfg:      exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10).ByteSize()},
			lr1:      &logsRequest{ld: plog.NewLogs()},
			lr2:      &logsRequest{ld: testdata.GenerateLogs(5)},
			expected: []*logsRequest{{ld: testdata.GenerateLogs(5)}},
		},
		{
			// nil second request must be tolerated.
			name:     "first_empty_second_nil",
			cfg:      exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10).ByteSize()},
			lr1:      &logsRequest{ld: plog.NewLogs()},
			lr2:      nil,
			expected: []*logsRequest{{ld: plog.NewLogs()}},
		},
		{
			// 4+6 records fit under the 11-record byte budget: single merged request.
			name: "merge_only",
			cfg:  exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(11).ByteSize()},
			lr1:  &logsRequest{ld: testdata.GenerateLogs(4)},
			lr2:  &logsRequest{ld: testdata.GenerateLogs(6)},
			expected: []*logsRequest{{ld: func() plog.Logs {
				logs := testdata.GenerateLogs(4)
				testdata.GenerateLogs(6).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
				return logs
			}()}},
		},
		{
			// 10 records under a 4-record byte budget split into 4/4/2.
			name: "split_only",
			cfg:  exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(4).ByteSize()},
			lr1:  &logsRequest{ld: plog.NewLogs()},
			lr2:  &logsRequest{ld: testdata.GenerateLogs(10)},
			expected: []*logsRequest{
				{ld: testdata.GenerateLogs(4)},
				{ld: testdata.GenerateLogs(4)},
				{ld: testdata.GenerateLogs(2)},
			},
		},
		{
			// Budget of roughly 10.5 records: 8+20 records repartition as 10/10/8.
			name: "merge_and_split",
			cfg:  exporterbatcher.MaxSizeConfig{MaxSizeBytes: (testdata.GenerateLogs(10).ByteSize() + testdata.GenerateLogs(11).ByteSize()) / 2},
			lr1:  &logsRequest{ld: testdata.GenerateLogs(8)},
			lr2:  &logsRequest{ld: testdata.GenerateLogs(20)},
			expected: []*logsRequest{
				{ld: func() plog.Logs {
					logs := testdata.GenerateLogs(8)
					testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
					return logs
				}()},
				{ld: testdata.GenerateLogs(10)},
				{ld: testdata.GenerateLogs(8)},
			},
		},
		{
			// An extra record in a second scope forces a split inside a
			// resource's scope logs, not just at resource boundaries.
			name: "scope_logs_split",
			cfg:  exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(4).ByteSize()},
			lr1: &logsRequest{ld: func() plog.Logs {
				ld := testdata.GenerateLogs(4)
				ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("extra log")
				return ld
			}()},
			lr2: &logsRequest{ld: testdata.GenerateLogs(2)},
			expected: []*logsRequest{
				{ld: testdata.GenerateLogs(4)},
				{ld: func() plog.Logs {
					ld := testdata.GenerateLogs(0)
					ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty().Body().SetStr("extra log")
					testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(ld.ResourceLogs())
					return ld
				}()},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			res, err := tt.lr1.MergeSplit(context.Background(), tt.cfg, tt.lr2)
			require.NoError(t, err)
			assert.Equal(t, len(tt.expected), len(res))
			for i, r := range res {
				assert.Equal(t, tt.expected[i], r.(*logsRequest))
			}
		})
	}
}
254+
255+
func BenchmarkSplittingBasedOnItemCount(b *testing.B) {
256+
cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}
257+
for i := 0; i < b.N; i++ {
258+
lr1 := &logsRequest{ld: testdata.GenerateLogs(1000)}
259+
lr2 := &logsRequest{ld: testdata.GenerateLogs(2000)}
260+
_, err := lr1.MergeSplit(context.Background(), cfg, lr2)
261+
require.NoError(b, err)
262+
}
263+
}
264+
265+
func BenchmarkSplittingBasedOnByteSize(b *testing.B) {
266+
cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10).ByteSize()}
267+
for i := 0; i < b.N; i++ {
268+
lr1 := &logsRequest{ld: testdata.GenerateLogs(1000)}
269+
lr2 := &logsRequest{ld: testdata.GenerateLogs(2000)}
270+
_, err := lr1.MergeSplit(context.Background(), cfg, lr2)
271+
require.NoError(b, err)
272+
}
273+
}

exporter/exporterhelper/metrics.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ func (req *metricsRequest) ItemsCount() int {
6767
return req.md.DataPointCount()
6868
}
6969

70+
// ByteSize returns the size in bytes of the request's metric payload, as
// reported by the pdata ByteSize accessor. Used by byte-based batch limits.
func (req *metricsRequest) ByteSize() int {
	return req.md.ByteSize()
}
73+
7074
type metricsExporter struct {
7175
*internal.BaseExporter
7276
consumer.Metrics

exporter/exporterhelper/traces.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ func (req *tracesRequest) ItemsCount() int {
6767
return req.td.SpanCount()
6868
}
6969

70+
// ByteSize returns the size in bytes of the request's trace payload, as
// reported by the pdata ByteSize accessor. Used by byte-based batch limits.
func (req *tracesRequest) ByteSize() int {
	return req.td.ByteSize()
}
73+
7074
type tracesExporter struct {
7175
*internal.BaseExporter
7276
consumer.Traces

exporter/exporterhelper/xexporterhelper/profiles.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ func (req *profilesRequest) ItemsCount() int {
6969
return req.pd.SampleCount()
7070
}
7171

72+
// ByteSize reports the request's size for byte-based batching.
// NOTE(review): this returns SampleCount() — an item count, not a byte
// size — unlike the logs/metrics/traces requests, which delegate to the
// payload's ByteSize(). Presumably a placeholder; confirm whether pprofile
// exposes a ByteSize() accessor and switch to it if so.
func (req *profilesRequest) ByteSize() int {
	return req.pd.SampleCount()
}
75+
7276
type profileExporter struct {
7377
*internal.BaseExporter
7478
xconsumer.Profiles

exporter/exporterhelper/xexporterhelper/profiles_batch_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,10 @@ func (req *dummyRequest) ItemsCount() int {
155155
return 1
156156
}
157157

158+
// ByteSize returns a fixed byte size of 1, mirroring ItemsCount for this
// test dummy.
func (req *dummyRequest) ByteSize() int {
	return 1
}
161+
158162
func (req *dummyRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ exporterhelper.Request) (
159163
[]exporterhelper.Request, error,
160164
) {

0 commit comments

Comments
 (0)