Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter] Disable the API to pass in configurations using a callback that operates on batch sender #11448

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/disable-batch-option.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Disables setting batch option to batch sender directly.

# One or more tracking issues or pull requests related to the change
issues: [10368]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
20 changes: 10 additions & 10 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,19 @@
return internal.WithCapabilities(capabilities)
}

// BatcherOption apply changes to batcher sender.
type BatcherOption = internal.BatcherOption

// WithRequestBatchFuncs sets the functions for merging and splitting batches for an exporter built for custom request types.
func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) BatcherOption {
return internal.WithRequestBatchFuncs(mf, msf)
}

// WithBatcher enables batching for an exporter based on custom request types.
// For now, it can be used only with the New[Traces|Metrics|Logs]RequestExporter exporter helpers and
// WithRequestBatchFuncs provided.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithBatcher(cfg exporterbatcher.Config, opts ...BatcherOption) Option {
return internal.WithBatcher(cfg, opts...)
func WithBatcher(cfg exporterbatcher.Config) Option {
return internal.WithBatcher(cfg)

Check warning on line 70 in exporter/exporterhelper/common.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/common.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}

// WithBatchFuncs enables setting custom batch merge functions.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request],
msf exporterbatcher.BatchMergeSplitFunc[Request]) Option {
return internal.WithBatchFuncs(mf, msf)

Check warning on line 78 in exporter/exporterhelper/common.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/common.go#L77-L78

Added lines #L77 - L78 were not covered by tests
}
25 changes: 1 addition & 24 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ type ObsrepSenderFactory = func(obsrep *ObsReport) RequestSender
// Option apply changes to BaseExporter.
type Option func(*BaseExporter) error

// BatcherOption apply changes to batcher sender.
type BatcherOption func(*BatchSender) error

type BaseExporter struct {
component.StartFunc
component.ShutdownFunc
Expand Down Expand Up @@ -64,7 +61,6 @@ type BaseExporter struct {
queueCfg exporterqueue.Config
queueFactory exporterqueue.Factory[internal.Request]
BatcherCfg exporterbatcher.Config
BatcherOpts []BatcherOption
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) {
Expand Down Expand Up @@ -109,9 +105,6 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe

if be.BatcherCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set, be.BatchMergeFunc, be.BatchMergeSplitfunc)
for _, opt := range be.BatcherOpts {
err = multierr.Append(err, opt(bs))
}
if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters"))
}
Expand Down Expand Up @@ -275,30 +268,14 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
}
}

// WithRequestBatchFuncs sets the functions for merging and splitting batches for an exporter built for custom request types.
func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[internal.Request], msf exporterbatcher.BatchMergeSplitFunc[internal.Request]) BatcherOption {
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete also BatcherOption

return func(bs *BatchSender) error {
if mf == nil || msf == nil {
return fmt.Errorf("WithRequestBatchFuncs must be provided with non-nil functions")
}
if bs.mergeFunc != nil || bs.mergeSplitFunc != nil {
return fmt.Errorf("WithRequestBatchFuncs can only be used once with request-based exporters")
}
bs.mergeFunc = mf
bs.mergeSplitFunc = msf
return nil
}
}

// WithBatcher enables batching for an exporter based on custom request types.
// For now, it can be used only with the New[Traces|Metrics|Logs]RequestExporter exporter helpers and
// WithRequestBatchFuncs provided.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithBatcher(cfg exporterbatcher.Config, opts ...BatcherOption) Option {
func WithBatcher(cfg exporterbatcher.Config) Option {
return func(o *BaseExporter) error {
o.BatcherCfg = cfg
o.BatcherOpts = opts
return nil
}
}
Expand Down
71 changes: 36 additions & 35 deletions exporter/exporterhelper/internal/batch_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,20 @@ func TestBatchSender_Merge(t *testing.T) {
}{
{
name: "split_disabled",
batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)),
batcherOption: WithBatcher(cfg),
},
{
name: "split_high_limit",
batcherOption: func() Option {
c := cfg
c.MaxSizeItems = 1000
return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))
return WithBatcher(c)
}(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
be := queueBatchExporter(t, tt.batcherOption)
be := queueBatchExporter(t, tt.batcherOption, WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))

require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
Expand Down Expand Up @@ -94,30 +94,30 @@ func TestBatchSender_BatchExportError(t *testing.T) {
}{
{
name: "merge_only",
batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)),
batcherOption: WithBatcher(cfg),
},
{
name: "merge_without_split_triggered",
batcherOption: func() Option {
c := cfg
c.MaxSizeItems = 200
return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))
return WithBatcher(c)
}(),
},
{
name: "merge_with_split_triggered",
batcherOption: func() Option {
c := cfg
c.MaxSizeItems = 20
return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))
return WithBatcher(c)
}(),
expectedRequests: 1,
expectedItems: 20,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
be := queueBatchExporter(t, tt.batcherOption)
be := queueBatchExporter(t, tt.batcherOption, WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))

require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
cfg.MinSizeItems = 5
cfg.MaxSizeItems = 10
cfg.FlushTimeout = 100 * time.Millisecond
be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
be := queueBatchExporter(t, WithBatcher(cfg), WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))

require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
func TestBatchSender_Shutdown(t *testing.T) {
batchCfg := exporterbatcher.NewDefaultConfig()
batchCfg.MinSizeItems = 10
be := queueBatchExporter(t, WithBatcher(batchCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
be := queueBatchExporter(t, WithBatcher(batchCfg), WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))

require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

Expand All @@ -212,7 +212,8 @@ func TestBatchSender_Disabled(t *testing.T) {
cfg.Enabled = false
cfg.MaxSizeItems = 5
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(cfg))
require.NotNil(t, be)
require.NoError(t, err)

Expand Down Expand Up @@ -241,7 +242,7 @@ func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) {
cfg := exporterbatcher.NewDefaultConfig()
cfg.FlushTimeout = 50 * time.Millisecond
cfg.MaxSizeItems = 20
be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc)))
be := queueBatchExporter(t, WithBatcher(cfg), WithBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc))

require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
Expand All @@ -260,8 +261,8 @@ func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) {

func TestBatchSender_PostShutdown(t *testing.T) {
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc,
fakeBatchMergeSplitFunc)))
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(exporterbatcher.NewDefaultConfig()))
require.NotNil(t, be)
require.NoError(t, err)
assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -322,7 +323,8 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
qCfg := exporterqueue.NewDefaultConfig()
qCfg.NumConsumers = 2
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(tt.batcherCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)),
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(tt.batcherCfg),
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]()))
require.NotNil(t, be)
require.NoError(t, err)
Expand Down Expand Up @@ -377,7 +379,8 @@ func TestBatchSender_BatchBlocking(t *testing.T) {
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.MinSizeItems = 3
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(bCfg))
require.NotNil(t, be)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -407,7 +410,8 @@ func TestBatchSender_BatchCancelled(t *testing.T) {
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.MinSizeItems = 2
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(bCfg))
require.NotNil(t, be)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -442,7 +446,8 @@ func TestBatchSender_DrainActiveRequests(t *testing.T) {
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.MinSizeItems = 2
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(bCfg))
require.NotNil(t, be)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -487,18 +492,9 @@ func TestBatchSender_WithBatcherOption(t *testing.T) {
opts: []Option{WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())},
expectedErr: false,
},
{
name: "funcs_set_twice",
opts: []Option{
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc,
fakeBatchMergeSplitFunc)),
},
expectedErr: true,
},
{
name: "nil_funcs",
opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(nil, nil))},
opts: []Option{WithBatchFuncs(nil, nil), WithBatcher(exporterbatcher.NewDefaultConfig())},
expectedErr: true,
},
}
Expand All @@ -518,7 +514,8 @@ func TestBatchSender_WithBatcherOption(t *testing.T) {

func TestBatchSender_UnstartedShutdown(t *testing.T) {
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(exporterbatcher.NewDefaultConfig()))
require.NoError(t, err)

err = be.Shutdown(context.Background())
Expand All @@ -542,7 +539,8 @@ func TestBatchSender_ShutdownDeadlock(t *testing.T) {
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc)))
WithBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(bCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -578,7 +576,8 @@ func TestBatchSenderWithTimeout(t *testing.T) {
tCfg := NewDefaultTimeoutConfig()
tCfg.Timeout = 50 * time.Millisecond
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)),
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(bCfg),
WithTimeout(tCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -637,7 +636,8 @@ func TestBatchSenderTimerResetNoConflict(t *testing.T) {
bCfg.MinSizeItems = 8
bCfg.FlushTimeout = 50 * time.Millisecond
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc)))
WithBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(bCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
sink := newFakeRequestSink()
Expand Down Expand Up @@ -668,7 +668,8 @@ func TestBatchSenderTimerFlush(t *testing.T) {
bCfg.MinSizeItems = 8
bCfg.FlushTimeout = 100 * time.Millisecond
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(bCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
sink := newFakeRequestSink()
Expand Down Expand Up @@ -703,9 +704,9 @@ func TestBatchSenderTimerFlush(t *testing.T) {
require.NoError(t, be.Shutdown(context.Background()))
}

func queueBatchExporter(t *testing.T, batchOption Option) *BaseExporter {
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, batchOption,
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]()))
func queueBatchExporter(t *testing.T, opts ...Option) *BaseExporter {
opts = append(opts, WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]()))
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, opts...)
require.NotNil(t, be)
require.NoError(t, err)
return be
Expand Down
Loading