Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: pkg/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Use `configoptional.Optional` for the `exporterhelper.QueueBatchConfig`

# One or more tracking issues or pull requests related to the change
issues: [14155]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
It's recommended to change the field type in your component configuration to be `configoptional.Optional[exporterhelper.QueueBatchConfig]` to keep the `enabled` subfield.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
3 changes: 2 additions & 1 deletion exporter/debugexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)
Expand All @@ -33,7 +34,7 @@ type Config struct {
// UseInternalLogger defines whether the exporter sends the output to the collector's internal logger.
UseInternalLogger bool `mapstructure:"use_internal_logger"`

QueueConfig exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"`
QueueConfig configoptional.Optional[exporterhelper.QueueBatchConfig] `mapstructure:"sending_queue"`

// prevent unkeyed literal initialization
_ struct{}
Expand Down
4 changes: 2 additions & 2 deletions exporter/debugexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/confmaptest"
Expand All @@ -25,7 +26,6 @@ func TestUnmarshalDefaultConfig(t *testing.T) {

func TestUnmarshalConfig(t *testing.T) {
queueCfg := exporterhelper.NewDefaultQueueConfig()
queueCfg.Enabled = false
tests := []struct {
filename string
cfg *Config
Expand All @@ -37,7 +37,7 @@ func TestUnmarshalConfig(t *testing.T) {
Verbosity: configtelemetry.LevelDetailed,
SamplingInitial: 10,
SamplingThereafter: 50,
QueueConfig: queueCfg,
QueueConfig: configoptional.Default(*queueCfg.Get()),
},
},
{
Expand Down
7 changes: 5 additions & 2 deletions exporter/debugexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/exporter/debugexporter/internal/metadata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -111,15 +112,17 @@ func createTestCases() []testCase {
name: "default config",
config: func() *Config {
c := createDefaultConfig().(*Config)
c.QueueConfig.QueueSize = 10
c.QueueConfig = exporterhelper.NewDefaultQueueConfig()
c.QueueConfig.Get().QueueSize = 10
return c
}(),
},
{
name: "don't use internal logger",
config: func() *Config {
cfg := createDefaultConfig().(*Config)
cfg.QueueConfig.QueueSize = 10
cfg.QueueConfig = exporterhelper.NewDefaultQueueConfig()
cfg.QueueConfig.Get().QueueSize = 10
cfg.UseInternalLogger = false
return cfg
}(),
Expand Down
5 changes: 2 additions & 3 deletions exporter/debugexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
Expand Down Expand Up @@ -43,14 +44,12 @@ func NewFactory() exporter.Factory {

func createDefaultConfig() component.Config {
queueCfg := exporterhelper.NewDefaultQueueConfig()
queueCfg.Enabled = false

return &Config{
Verbosity: configtelemetry.LevelBasic,
SamplingInitial: defaultSamplingInitial,
SamplingThereafter: defaultSamplingThereafter,
UseInternalLogger: true,
QueueConfig: queueCfg,
QueueConfig: configoptional.Default(*queueCfg.Get()),
}
}

Expand Down
2 changes: 1 addition & 1 deletion exporter/debugexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/collector/component v1.46.0
go.opentelemetry.io/collector/component/componenttest v0.140.0
go.opentelemetry.io/collector/config/configoptional v1.46.0
go.opentelemetry.io/collector/config/configtelemetry v0.140.0
go.opentelemetry.io/collector/confmap v1.46.0
go.opentelemetry.io/collector/consumer v1.46.0
Expand Down Expand Up @@ -43,7 +44,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/client v1.46.0 // indirect
go.opentelemetry.io/collector/config/configoptional v1.46.0 // indirect
go.opentelemetry.io/collector/config/configretry v1.46.0 // indirect
go.opentelemetry.io/collector/confmap/xconfmap v0.140.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.140.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion exporter/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
Expand All @@ -22,7 +23,7 @@ var typeStr = component.MustNewType("example")

// exampleConfig holds configuration settings for the exporter.
type exampleConfig struct {
QueueSettings exporterhelper.QueueBatchConfig
QueueSettings configoptional.Optional[exporterhelper.QueueBatchConfig]
BackOffConfig configretry.BackOffConfig
}

Expand Down
17 changes: 9 additions & 8 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
Expand Down Expand Up @@ -47,7 +48,7 @@ type BaseExporter struct {
retryCfg configretry.BackOffConfig

queueBatchSettings queuebatch.Settings[request.Request]
queueCfg queuebatch.Config
queueCfg configoptional.Optional[queuebatch.Config]
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sender.SendFunc[request.Request], options ...Option) (*BaseExporter, error) {
Expand Down Expand Up @@ -82,19 +83,19 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende
return nil, err
}

if be.queueCfg.Batch.HasValue() {
if be.queueCfg.HasValue() && be.queueCfg.Get().Batch.HasValue() {
// Batcher mutates the data.
be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
}

if be.queueCfg.Enabled {
if be.queueCfg.HasValue() {
qSet := queuebatch.AllSettings[request.Request]{
Settings: be.queueBatchSettings,
Signal: signal,
ID: set.ID,
Telemetry: set.TelemetrySettings,
}
be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.ExportFailureMessage, be.firstSender)
be.QueueSender, err = NewQueueSender(qSet, *be.queueCfg.Get(), be.ExportFailureMessage, be.firstSender)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -191,7 +192,7 @@ func WithRetry(config configretry.BackOffConfig) Option {
// WithQueue overrides the default queuebatch.Config for an exporter.
// The default queuebatch.Config is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(cfg queuebatch.Config) Option {
func WithQueue(cfg configoptional.Optional[queuebatch.Config]) Option {
return func(o *BaseExporter) error {
if o.queueBatchSettings.Encoding == nil {
return errors.New("WithQueue option is not available for the new request exporters, use WithQueueBatch instead")
Expand All @@ -204,13 +205,13 @@ func WithQueue(cfg queuebatch.Config) Option {
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithQueueBatch(cfg queuebatch.Config, set queuebatch.Settings[request.Request]) Option {
func WithQueueBatch(cfg configoptional.Optional[queuebatch.Config], set queuebatch.Settings[request.Request]) Option {
return func(o *BaseExporter) error {
if !cfg.Enabled {
if !cfg.HasValue() {
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
if cfg.StorageID != nil && set.Encoding == nil {
if cfg.Get().StorageID != nil && set.Encoding == nil {
return errors.New("`Settings.Encoding` must not be nil when persistent queue is enabled")
}
o.queueBatchSettings = set
Expand Down
13 changes: 5 additions & 8 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
Expand Down Expand Up @@ -54,7 +55,7 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {

qCfg := NewDefaultQueueConfig()
storageID := component.NewID(component.MustNewType("test"))
qCfg.StorageID = &storageID
qCfg.Get().StorageID = &storageID
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
WithQueueBatchSettings(newFakeQueueBatch()),
WithRetry(configretry.NewDefaultBackOffConfig()),
Expand All @@ -69,7 +70,7 @@ func TestBaseExporterLogging(t *testing.T) {
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.Enabled = false
qCfg := NewDefaultQueueConfig()
qCfg.WaitForResult = true
qCfg.Get().WaitForResult = true
bs, err := NewBaseExporter(set, pipeline.SignalMetrics, errExport,
WithQueueBatchSettings(newFakeQueueBatch()),
WithQueue(qCfg),
Expand Down Expand Up @@ -98,19 +99,15 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
queueOptions: []Option{
WithQueueBatchSettings(newFakeQueueBatch()),
func() Option {
qs := NewDefaultQueueConfig()
qs.Enabled = false
return WithQueue(qs)
return WithQueue(configoptional.None[queuebatch.Config]())
}(),
},
},
{
name: "WithRequestQueue",
queueOptions: []Option{
func() Option {
qs := NewDefaultQueueConfig()
qs.Enabled = false
return WithQueueBatch(qs, newFakeQueueBatch())
return WithQueueBatch(configoptional.None[queuebatch.Config](), newFakeQueueBatch())
}(),
},
},
Expand Down
7 changes: 3 additions & 4 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ import (

// NewDefaultQueueConfig returns the default config for queuebatch.Config.
// By default, the queue stores 1000 requests of telemetry and is non-blocking when full.
func NewDefaultQueueConfig() queuebatch.Config {
return queuebatch.Config{
Enabled: true,
func NewDefaultQueueConfig() configoptional.Optional[queuebatch.Config] {
return configoptional.Some(queuebatch.Config{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following up from an offline discussion, would it be a major inconvenience if we just returned the config directly here and required consumers to wrap it themselves? I feel like it would make tests easier and would make the function slightly less opinionated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree!

I still feel something's not perfect. We're recommending that users call configoptional.Default(...) or configoptional.Some(...) in order to disable or enable a feature by default. It sort of suggests adding a function named configoptional.Enabled(...) (for enabled with defaults) and configoptional.Disabled(...) (for disabled with defaults).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your concern. I think this partially sources from the fact that we have the enabled field in configoptional now, as before it was just a way to think about the presence of a value at the time the function is called. The best I can think of off the top of my head is aliasing each function to EnabledByDefault and DisabledByDefault or similar, but I'd want to give it more thought before committing. If we go this route, would want the names to be very explicit since I think Enabled and Disabled are also a little ambiguous.

Sizer: request.SizerTypeRequests,
NumConsumers: 10,
// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
Expand All @@ -32,7 +31,7 @@ func NewDefaultQueueConfig() queuebatch.Config {
Sizer: request.SizerTypeItems,
MinSize: 8192,
}),
}
})
}

func NewQueueSender(
Expand Down
10 changes: 6 additions & 4 deletions exporter/exporterhelper/internal/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
Expand All @@ -31,8 +32,9 @@ func TestNewQueueSenderFailedRequestDropped(t *testing.T) {
}
logger, observed := observer.New(zap.ErrorLevel)
qSet.Telemetry.Logger = zap.New(logger)
qCfg := NewDefaultQueueConfig()
be, err := NewQueueSender(
qSet, NewDefaultQueueConfig(), "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") }))
qSet, *qCfg.Get(), "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") }))
require.NoError(t, err)

require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -46,14 +48,14 @@ func TestQueueConfig_Validate(t *testing.T) {
qCfg := NewDefaultQueueConfig()
require.NoError(t, qCfg.Validate())

qCfg.NumConsumers = 0
qCfg.Get().NumConsumers = 0
require.EqualError(t, qCfg.Validate(), "`num_consumers` must be positive")

qCfg = NewDefaultQueueConfig()
qCfg.QueueSize = 0
qCfg.Get().QueueSize = 0
require.EqualError(t, qCfg.Validate(), "`queue_size` must be positive")

// Confirm Validate doesn't return error with invalid config when feature is disabled
qCfg.Enabled = false
qCfg = configoptional.None[queuebatch.Config]()
assert.NoError(t, qCfg.Validate())
}
7 changes: 0 additions & 7 deletions exporter/exporterhelper/internal/queuebatch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ import (

// Config defines configuration for queueing and batching incoming requests.
type Config struct {
// Enabled indicates whether to not enqueue and batch before exporting.
Enabled bool `mapstructure:"enabled"`

// WaitForResult determines if incoming requests are blocked until the request is processed or not.
// Currently, this option is not available when persistent queue is configured using the storage configuration.
WaitForResult bool `mapstructure:"wait_for_result"`
Expand Down Expand Up @@ -66,10 +63,6 @@ func (cfg *Config) Unmarshal(conf *confmap.Conf) error {

// Validate checks if the Config is valid
func (cfg *Config) Validate() error {
if !cfg.Enabled {
return nil
}

if cfg.NumConsumers <= 0 {
return errors.New("`num_consumers` must be positive")
}
Expand Down
Loading
Loading