Skip to content
27 changes: 27 additions & 0 deletions .chloggen/elasticexporte-batcher.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Validate batcher config for exporter

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [38072]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
17 changes: 17 additions & 0 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ func (cfg *Config) Validate() error {
if cfg.Retry.MaxRetries < 0 {
return errors.New("retry::max_retries should be non-negative")
}
if batcherCfg, ok := cfg.exporterbatcherConfig(); ok {
if err := batcherCfg.Validate(); err != nil {
return fmt.Errorf("invalid batcher config: %w", err)
}
}

return nil
}
Expand Down Expand Up @@ -314,6 +319,18 @@ func (cfg *Config) endpoints() ([]string, error) {
return endpoints, nil
}

func (cfg *Config) exporterbatcherConfig() (exporterbatcher.Config, bool) {
if cfg.Batcher.Enabled == nil {
return exporterbatcher.Config{}, false
}
return exporterbatcher.Config{
Enabled: *cfg.Batcher.Enabled,
FlushTimeout: cfg.Batcher.FlushTimeout,
MinSizeConfig: cfg.Batcher.MinSizeConfig,
MaxSizeConfig: cfg.Batcher.MaxSizeConfig,
}, true
}

func validateEndpoint(endpoint string) error {
if endpoint == "" {
return errConfigEmptyEndpoint
Expand Down
10 changes: 10 additions & 0 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,16 @@ func TestConfig_Validate(t *testing.T) {
}),
err: `must not specify both retry::max_requests and retry::max_retries`,
},
"batcher max_size_items less than min_size_items": {
config: withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"http://test:9200"}
cfg.Batcher.MaxSizeItems = 1000
cfg.Batcher.MinSizeItems = 2000
enableBatcher := true
cfg.Batcher.Enabled = &enableBatcher
}),
err: `max_size_items must be greater than or equal to min_size_items`,
},
}

for name, tt := range tests {
Expand Down
10 changes: 2 additions & 8 deletions exporter/elasticsearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,8 @@ func exporterhelperOptions(
exporterhelper.WithShutdown(shutdown),
exporterhelper.WithQueue(cfg.QueueSettings),
}
if cfg.Batcher.Enabled != nil {
batcherConfig := exporterbatcher.Config{
Enabled: *cfg.Batcher.Enabled,
FlushTimeout: cfg.Batcher.FlushTimeout,
MinSizeConfig: cfg.Batcher.MinSizeConfig,
MaxSizeConfig: cfg.Batcher.MaxSizeConfig,
}
opts = append(opts, exporterhelper.WithBatcher(batcherConfig))
if batcherCfg, ok := cfg.exporterbatcherConfig(); ok {
opts = append(opts, exporterhelper.WithBatcher(batcherCfg))

// Effectively disable timeout_sender because timeout is enforced in bulk indexer.
//
Expand Down
Loading