
Commit 3fc55a6

[exporter][batcher] Multi-batch support - Version 2 (#12760)
#### Description

This PR introduces two new components:

* `Partitioner` - an interface for fetching the batching key. A partitioner type should implement the function `GetKey()`, which returns the batching key. The `Partitioner` should be provided to the `queue_batcher` along with the `sizer` in `queue_batch::Settings`.
* `multi_batcher` - supports key-based batching by routing requests to the corresponding `shard_batcher`. Each `shard_batcher` corresponds to a shard described in #12473.

#### Link to tracking issue

#12795

---------

Co-authored-by: Dmitry Anoshin <[email protected]>
1 parent 4074b72 commit 3fc55a6
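
For a sense of how the new pieces fit together, below is a minimal sketch of wiring a partitioner into the queue-batch settings. It relies only on the `NewPartitioner` helper and the `Settings.Partitioner` field visible in this diff; the `tenantKey` context key and the `withTenantPartitioner` helper are hypothetical, and the snippet assumes it lives inside the collector's `exporterhelper` internals where these packages are importable.

```go
package exporterhelper // hypothetical location inside the exporterhelper internals

import (
	"context"

	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
)

// tenantKey is a made-up context key used only for this sketch.
type tenantKey struct{}

// withTenantPartitioner fills the new Settings.Partitioner field so that
// requests carrying the same tenant ID in their context are batched together.
func withTenantPartitioner(set queuebatch.Settings[request.Request]) queuebatch.Settings[request.Request] {
	set.Partitioner = queuebatch.NewPartitioner(func(ctx context.Context, _ request.Request) string {
		if tenant, ok := ctx.Value(tenantKey{}).(string); ok {
			return tenant
		}
		return "" // requests without a tenant share one default shard
	})
	return set
}
```

When `Partitioner` is left nil, `newMultiBatcher` falls back to a single `shardBatcher`, so the existing single-batch behavior is preserved (see `multi_batcher.go` below).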

File tree

28 files changed: +383 -92 lines changed


cmd/otelcorecol/go.mod

Lines changed: 1 addition & 0 deletions
@@ -74,6 +74,7 @@ require (
 	github.com/prometheus/client_model v0.6.2 // indirect
 	github.com/prometheus/common v0.64.0 // indirect
 	github.com/prometheus/procfs v0.16.1 // indirect
+	github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect
 	github.com/rs/cors v1.11.1 // indirect
 	github.com/shirou/gopsutil/v4 v4.25.5 // indirect
 	github.com/spf13/cobra v1.9.1 // indirect

cmd/otelcorecol/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default.

exporter/debugexporter/go.mod

Lines changed: 1 addition & 0 deletions
@@ -40,6 +40,7 @@ require (
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
+	github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect
 	go.opentelemetry.io/auto/sdk v1.1.0 // indirect
 	go.opentelemetry.io/collector/config/configretry v1.33.0 // indirect
 	go.opentelemetry.io/collector/consumer/consumererror v0.127.0 // indirect

exporter/debugexporter/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default.

exporter/exporterhelper/internal/base_exporter.go

Lines changed: 6 additions & 5 deletions
@@ -90,11 +90,12 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende

 	if be.queueCfg.Enabled || be.batcherCfg.Enabled {
 		qSet := queuebatch.Settings[request.Request]{
-			Signal:    signal,
-			ID:        set.ID,
-			Telemetry: set.TelemetrySettings,
-			Encoding:  be.queueBatchSettings.Encoding,
-			Sizers:    be.queueBatchSettings.Sizers,
+			Signal:      signal,
+			ID:          set.ID,
+			Telemetry:   set.TelemetrySettings,
+			Encoding:    be.queueBatchSettings.Encoding,
+			Sizers:      be.queueBatchSettings.Sizers,
+			Partitioner: be.queueBatchSettings.Partitioner,
 		}
 		be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
 		if err != nil {

exporter/exporterhelper/internal/queuebatch/multi_batcher.go

Lines changed: 114 additions & 0 deletions
@@ -0,0 +1,114 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
+import (
+	"context"
+	"sync"
+
+	"github.com/puzpuzpuz/xsync/v3"
+
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
+)
+
+type multiBatcher struct {
+	cfg         BatchConfig
+	workerPool  chan struct{}
+	sizerType   request.SizerType
+	sizer       request.Sizer[request.Request]
+	partitioner Partitioner[request.Request]
+	consumeFunc sender.SendFunc[request.Request]
+
+	singleShard *shardBatcher
+	shards      *xsync.MapOf[string, *shardBatcher]
+}
+
+var _ Batcher[request.Request] = (*multiBatcher)(nil)
+
+func newMultiBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request]) *multiBatcher {
+	var workerPool chan struct{}
+	if bSet.maxWorkers != 0 {
+		workerPool = make(chan struct{}, bSet.maxWorkers)
+		for i := 0; i < bSet.maxWorkers; i++ {
+			workerPool <- struct{}{}
+		}
+	}
+	mb := &multiBatcher{
+		cfg:         bCfg,
+		workerPool:  workerPool,
+		sizerType:   bSet.sizerType,
+		sizer:       bSet.sizer,
+		partitioner: bSet.partitioner,
+		consumeFunc: bSet.next,
+	}
+
+	if bSet.partitioner == nil {
+		mb.singleShard = &shardBatcher{
+			cfg:         bCfg,
+			workerPool:  mb.workerPool,
+			sizerType:   bSet.sizerType,
+			sizer:       bSet.sizer,
+			consumeFunc: bSet.next,
+			stopWG:      sync.WaitGroup{},
+			shutdownCh:  make(chan struct{}, 1),
+		}
+	} else {
+		mb.shards = xsync.NewMapOf[string, *shardBatcher]()
+	}
+	return mb
+}
+
+func (mb *multiBatcher) getShard(ctx context.Context, req request.Request) *shardBatcher {
+	if mb.singleShard != nil {
+		return mb.singleShard
+	}
+
+	key := mb.partitioner.GetKey(ctx, req)
+	result, _ := mb.shards.LoadOrCompute(key, func() *shardBatcher {
+		s := &shardBatcher{
+			cfg:         mb.cfg,
+			workerPool:  mb.workerPool,
+			sizerType:   mb.sizerType,
+			sizer:       mb.sizer,
+			consumeFunc: mb.consumeFunc,
+			stopWG:      sync.WaitGroup{},
+			shutdownCh:  make(chan struct{}, 1),
+		}
+		s.start(ctx, nil)
+		return s
+	})
+	return result
+}
+
+func (mb *multiBatcher) Start(ctx context.Context, host component.Host) error {
+	if mb.singleShard != nil {
+		mb.singleShard.start(ctx, host)
+	}
+	return nil
+}
+
+func (mb *multiBatcher) Consume(ctx context.Context, req request.Request, done Done) {
+	shard := mb.getShard(ctx, req)
+	shard.Consume(ctx, req, done)
+}
+
+func (mb *multiBatcher) Shutdown(ctx context.Context) error {
+	if mb.singleShard != nil {
+		mb.singleShard.shutdown(ctx)
+		return nil
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(mb.shards.Size())
+	mb.shards.Range(func(_ string, shard *shardBatcher) bool {
+		go func() {
+			shard.shutdown(ctx)
+			wg.Done()
+		}()
+		return true
+	})
+	wg.Wait()
+	return nil
+}
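
The routing above creates shards lazily: the first request for a new partition key constructs and starts a `shardBatcher` inside `xsync.MapOf.LoadOrCompute`, and `Shutdown` fans out to every shard concurrently. The following self-contained sketch illustrates the same lazy-shard pattern using only the third-party `xsync/v3` dependency added by this commit; the `counterShard` type is invented for illustration and is not part of the collector.

```go
package main

import (
	"fmt"

	"github.com/puzpuzpuz/xsync/v3"
)

// counterShard is a stand-in for shardBatcher: a per-key accumulator.
type counterShard struct{ items int }

func main() {
	shards := xsync.NewMapOf[string, *counterShard]()

	consume := func(key string, items int) {
		// LoadOrCompute returns the existing shard for key, or computes a
		// new one the first time the key is seen (mirroring getShard above).
		shard, _ := shards.LoadOrCompute(key, func() *counterShard {
			return &counterShard{}
		})
		shard.items += items
	}

	consume("p1", 8)
	consume("p2", 6)
	consume("p1", 8)

	// Walk all shards, as multiBatcher.Shutdown does when draining them.
	shards.Range(func(key string, s *counterShard) bool {
		fmt.Println(key, s.items) // p1 16, p2 6 (map order not guaranteed)
		return true
	})
}
```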

exporter/exporterhelper/internal/queuebatch/multi_batcher_test.go

Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queuebatch
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
+)
+
+func TestMultiBatcher_NoTimeout(t *testing.T) {
+	cfg := BatchConfig{
+		FlushTimeout: 0,
+		MinSize:      10,
+	}
+	sink := requesttest.NewSink()
+
+	type partitionKey struct{}
+
+	ba := newMultiBatcher(cfg, batcherSettings[request.Request]{
+		sizerType: request.SizerTypeItems,
+		sizer:     request.NewItemsSizer(),
+		partitioner: NewPartitioner(func(ctx context.Context, _ request.Request) string {
+			return ctx.Value(partitionKey{}).(string)
+		}),
+		next:       sink.Export,
+		maxWorkers: 1,
+	})
+
+	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
+	t.Cleanup(func() {
+		require.NoError(t, ba.Shutdown(context.Background()))
+	})
+
+	done := newFakeDone()
+	ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p1"), &requesttest.FakeRequest{Items: 8}, done)
+	ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p2"), &requesttest.FakeRequest{Items: 6}, done)
+
+	// Neither batch should be flushed since they haven't reached min threshold.
+	assert.Equal(t, 0, sink.RequestsCount())
+	assert.Equal(t, 0, sink.ItemsCount())
+
+	ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p1"), &requesttest.FakeRequest{Items: 8}, done)
+
+	assert.Eventually(t, func() bool {
+		return sink.RequestsCount() == 1 && sink.ItemsCount() == 16
+	}, 500*time.Millisecond, 10*time.Millisecond)
+
+	ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p2"), &requesttest.FakeRequest{Items: 6}, done)
+
+	assert.Eventually(t, func() bool {
+		return sink.RequestsCount() == 2 && sink.ItemsCount() == 28
+	}, 500*time.Millisecond, 10*time.Millisecond)
+
+	// Check that done callback is called for the right amount of times.
+	assert.EqualValues(t, 0, done.errors.Load())
+	assert.EqualValues(t, 4, done.success.Load())
+
+	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
+}
+
+func TestMultiBatcher_Timeout(t *testing.T) {
+	cfg := BatchConfig{
+		FlushTimeout: 100 * time.Millisecond,
+		MinSize:      100,
+	}
+	sink := requesttest.NewSink()
+
+	type partitionKey struct{}
+
+	ba := newMultiBatcher(cfg, batcherSettings[request.Request]{
+		sizerType: request.SizerTypeItems,
+		sizer:     request.NewItemsSizer(),
+		partitioner: NewPartitioner(func(ctx context.Context, _ request.Request) string {
+			return ctx.Value(partitionKey{}).(string)
+		}),
+		next:       sink.Export,
+		maxWorkers: 1,
+	})
+
+	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
+	t.Cleanup(func() {
+		require.NoError(t, ba.Shutdown(context.Background()))
+	})
+
+	done := newFakeDone()
+	ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p1"), &requesttest.FakeRequest{Items: 8}, done)
+	ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p2"), &requesttest.FakeRequest{Items: 6}, done)
+
+	// Neither batch should be flushed since they haven't reached min threshold.
+	assert.Equal(t, 0, sink.RequestsCount())
+	assert.Equal(t, 0, sink.ItemsCount())
+
+	ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p1"), &requesttest.FakeRequest{Items: 8}, done)
+	ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p2"), &requesttest.FakeRequest{Items: 6}, done)
+
+	assert.Eventually(t, func() bool {
+		return sink.RequestsCount() == 2 && sink.ItemsCount() == 28
+	}, 1*time.Second, 10*time.Millisecond)
+	// Check that done callback is called for the right amount of times.
+	assert.EqualValues(t, 0, done.errors.Load())
+	assert.EqualValues(t, 4, done.success.Load())
+
+	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
+}

exporter/exporterhelper/internal/queuebatch/queue_batch.go

Lines changed: 18 additions & 15 deletions
@@ -16,11 +16,12 @@ import (

 // Settings defines settings for creating a QueueBatch.
 type Settings[T any] struct {
-	Signal    pipeline.Signal
-	ID        component.ID
-	Telemetry component.TelemetrySettings
-	Encoding  Encoding[T]
-	Sizers    map[request.SizerType]request.Sizer[T]
+	Signal      pipeline.Signal
+	ID          component.ID
+	Telemetry   component.TelemetrySettings
+	Encoding    Encoding[T]
+	Sizers      map[request.SizerType]request.Sizer[T]
+	Partitioner Partitioner[T]
 }

 type QueueBatch struct {
@@ -65,18 +66,20 @@ func newQueueBatch(
 	if cfg.Batch != nil {
 		if oldBatcher {
 			// If user configures the old batcher we only can support "items" sizer.
-			b = newDefaultBatcher(*cfg.Batch, batcherSettings[request.Request]{
-				sizerType:  request.SizerTypeItems,
-				sizer:      request.NewItemsSizer(),
-				next:       next,
-				maxWorkers: cfg.NumConsumers,
+			b = newMultiBatcher(*cfg.Batch, batcherSettings[request.Request]{
+				sizerType:   request.SizerTypeItems,
+				sizer:       request.NewItemsSizer(),
+				partitioner: set.Partitioner,
+				next:        next,
+				maxWorkers:  cfg.NumConsumers,
 			})
 		} else {
-			b = newDefaultBatcher(*cfg.Batch, batcherSettings[request.Request]{
-				sizerType:  cfg.Sizer,
-				sizer:      sizer,
-				next:       next,
-				maxWorkers: cfg.NumConsumers,
+			b = newMultiBatcher(*cfg.Batch, batcherSettings[request.Request]{
+				sizerType:   cfg.Sizer,
+				sizer:       sizer,
+				partitioner: set.Partitioner,
+				next:        next,
+				maxWorkers:  cfg.NumConsumers,
 			})
 		}
 		// Keep the number of queue consumers to 1 if batching is enabled until we support sharding as described in

exporter/exporterhelper/internal/queuebatch/queue_batch_test.go

Lines changed: 42 additions & 0 deletions
@@ -404,6 +404,48 @@ func TestQueueBatch_MergeOrSplit(t *testing.T) {
 	require.NoError(t, qb.Shutdown(context.Background()))
 }

+func TestQueueBatch_MergeOrSplit_Multibatch(t *testing.T) {
+	sink := requesttest.NewSink()
+	cfg := newTestConfig()
+	cfg.Batch = &BatchConfig{
+		FlushTimeout: 100 * time.Millisecond,
+		MinSize:      10,
+	}
+
+	type partitionKey struct{}
+	set := newFakeRequestSettings()
+	set.Partitioner = NewPartitioner(func(ctx context.Context, _ request.Request) string {
+		key := ctx.Value(partitionKey{}).(string)
+		return key
+	})
+
+	qb, err := NewQueueBatch(set, cfg, sink.Export)
+	require.NoError(t, err)
+	require.NoError(t, qb.Start(context.Background(), componenttest.NewNopHost()))
+
+	// should be sent right away by reaching the minimum items size.
+	require.NoError(t, qb.Send(context.WithValue(context.Background(), partitionKey{}, "p1"), &requesttest.FakeRequest{Items: 8}))
+	require.NoError(t, qb.Send(context.WithValue(context.Background(), partitionKey{}, "p2"), &requesttest.FakeRequest{Items: 6}))
+
+	// Neither batch should be flushed since they haven't reached min threshold.
+	assert.Equal(t, 0, sink.RequestsCount())
+	assert.Equal(t, 0, sink.ItemsCount())
+
+	require.NoError(t, qb.Send(context.WithValue(context.Background(), partitionKey{}, "p1"), &requesttest.FakeRequest{Items: 8}))
+
+	assert.Eventually(t, func() bool {
+		return sink.RequestsCount() == 1 && sink.ItemsCount() == 16
+	}, 500*time.Millisecond, 10*time.Millisecond)
+
+	require.NoError(t, qb.Send(context.WithValue(context.Background(), partitionKey{}, "p2"), &requesttest.FakeRequest{Items: 6}))
+
+	assert.Eventually(t, func() bool {
+		return sink.RequestsCount() == 2 && sink.ItemsCount() == 28
+	}, 500*time.Millisecond, 10*time.Millisecond)
+
+	require.NoError(t, qb.Shutdown(context.Background()))
+}
+
 func TestQueueBatch_Shutdown(t *testing.T) {
 	sink := requesttest.NewSink()
 	qb, err := NewQueueBatch(newFakeRequestSettings(), newTestConfig(), sink.Export)
