Commit 91dce71

[exporter/elasticsearch] add support for batcher config (#34238)

Authored by axw and carsonip.

**Description:** Add opt-in support for the experimental batch sender (open-telemetry/opentelemetry-collector#8122). When opting into this functionality, the exporter's `Consume*` methods make synchronous bulk requests to Elasticsearch, without additional batching/buffering in the exporter. By default, the exporter continues to use its own buffering, which supports thresholds for time, number of documents, and size of encoded documents in bytes. The batch sender does not currently support a bytes-based threshold and is experimental, which is why it is not yet the default for the Elasticsearch exporter. This PR is based on #32632, but made to be non-breaking.

**Link to tracking issue:** #32377

**Testing:** Added unit and integration tests. Manually tested with Elasticsearch, using the following config to highlight that client metadata can now flow through all the way:

```yaml
receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318
        include_metadata: true

exporters:
  elasticsearch:
    endpoint: "http://localhost:9200"
    auth:
      authenticator: headers_setter
    batcher:
      enabled: false

extensions:
  headers_setter:
    headers:
      - action: insert
        key: Authorization
        from_context: authorization

service:
  extensions: [headers_setter]
  pipelines:
    traces:
      receivers: [otlp]
      processors: []
      exporters: [elasticsearch]
```

I have Elasticsearch running locally, with an "admin" user with the password "changeme". Sending OTLP/HTTP to the collector with `telemetrygen traces --otlp-http --otlp-insecure http://localhost:4318 --otlp-header "Authorization=\"Basic YWRtaW46Y2hhbmdlbWU=\""`, I observe the following:

- Without the `batcher` config, the exporter fails to index data into Elasticsearch due to an auth error. That's because the exporter is buffering and dropping the context with client metadata, so there's no Authorization header attached to the outgoing requests.
- With `batcher: {enabled: true}`, same behaviour as above. Unlike the [`batch` processor](https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/batchprocessor), the batch sender does not maintain client metadata.
- With `batcher: {enabled: false}`, the exporter successfully indexes data into Elasticsearch.

**Documentation:** Updated the README.

---------

Co-authored-by: Carson Ip <[email protected]>

1 parent 6b7057d commit 91dce71

File tree

12 files changed: +415 −36 lines changed
Changelog entry (new file)

Lines changed: 29 additions & 0 deletions

```yaml
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add opt-in support for the experimental `batcher` config

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32377]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  By enabling (or explicitly disabling) the batcher, the Elasticsearch exporter's
  existing batching/buffering logic will be disabled, and the batch sender will be used.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
```
exporter/elasticsearchexporter/README.md

Lines changed: 30 additions & 0 deletions

```diff
@@ -81,6 +81,33 @@ All other defaults are as defined by [confighttp].
 
 The Elasticsearch exporter supports the common [`sending_queue` settings][exporterhelper]. However, the sending queue is currently disabled by default.
 
+### Batching
+
+> [!WARNING]
+> The `batcher` config is experimental and may change without notice.
+
+The Elasticsearch exporter supports the [common `batcher` settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterbatcher/config.go).
+
+- `batcher`:
+  - `enabled` (default=unset): Enable batching of requests into a single bulk request.
+  - `min_size_items` (default=5000): Minimum number of log records / spans in the buffer to trigger a flush immediately.
+  - `max_size_items` (default=10000): Maximum number of log records / spans in a request.
+  - `flush_timeout` (default=30s): Maximum time the oldest item may spend in the buffer, aka "max age of buffer". A flush will happen regardless of the amount of content in the buffer.
+
+By default, the exporter performs its own buffering and batching, as configured through the
+`flush` config, and `batcher` is unused. By setting `batcher::enabled` to either `true` or
+`false`, the exporter will not perform any of its own buffering or batching, and the `flush`
+config will be ignored. In a future release, when the `batcher` config is stable and has feature
+parity with the exporter's existing `flush` config, it will be enabled by default.
+
+Using the common `batcher` functionality provides several benefits over the default behavior:
+- Combined with a persistent queue, or no queue at all, `batcher` enables at-least-once delivery.
+  With the default behavior, the exporter accepts data and processes it asynchronously,
+  which interacts poorly with queuing.
+- By ensuring the exporter makes requests to Elasticsearch synchronously,
+  client metadata can be passed through to Elasticsearch requests,
+  e.g. by using the [`headers_setter` extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/extension/headerssetterextension/README.md).
+
 ### Elasticsearch document routing
 
 Telemetry data will be written to signal specific data streams by default:
@@ -173,6 +200,9 @@ The behaviour of this bulk indexing can be configured with the following setting
 - `max_interval` (default=1m): Max waiting time if an HTTP request failed.
 - `retry_on_status` (default=[429, 500, 502, 503, 504]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it is recommended to set it to `[429]`. WARNING: The default will be changed to `[429]` in the future.
 
+> [!NOTE]
+> The `flush` config will be ignored when the `batcher::enabled` config is explicitly set to `true` or `false`.
+
 ### Elasticsearch node discovery
 
 The Elasticsearch Exporter will regularly check Elasticsearch for available nodes.
```
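To make the documented `batcher` settings concrete, a minimal collector exporter config opting into the batch sender might look like the following sketch (the endpoint is illustrative; the threshold values shown are simply the documented defaults made explicit):

```yaml
exporters:
  elasticsearch:
    endpoint: "http://localhost:9200"
    batcher:
      enabled: true          # explicit true/false disables the exporter's own buffering
      min_size_items: 5000   # flush once this many log records / spans are buffered
      max_size_items: 10000  # cap on log records / spans per bulk request
      flush_timeout: 30s     # flush regardless of size after this long
```

Note that setting `enabled: false` also switches off the exporter's internal buffering (`flush` is ignored); it is not equivalent to leaving `batcher` unset.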

exporter/elasticsearchexporter/bulkindexer.go

Lines changed: 116 additions & 9 deletions

```diff
@@ -5,7 +5,7 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry
 
 import (
 	"context"
-	"fmt"
+	"errors"
 	"io"
 	"runtime"
 	"sync"
@@ -51,10 +51,103 @@ type bulkIndexerSession interface {
 }
 
 func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (bulkIndexer, error) {
-	// TODO: add support for synchronous bulk indexing, to integrate with the exporterhelper batch sender.
+	if config.Batcher.Enabled != nil {
+		return newSyncBulkIndexer(logger, client, config), nil
+	}
 	return newAsyncBulkIndexer(logger, client, config)
 }
 
+func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) *syncBulkIndexer {
+	var maxDocRetry int
+	if config.Retry.Enabled {
+		// max_requests includes initial attempt
+		// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344
+		maxDocRetry = config.Retry.MaxRequests - 1
+	}
+	return &syncBulkIndexer{
+		config: docappender.BulkIndexerConfig{
+			Client:                client,
+			MaxDocumentRetries:    maxDocRetry,
+			Pipeline:              config.Pipeline,
+			RetryOnDocumentStatus: config.Retry.RetryOnStatus,
+		},
+		flushTimeout: config.Timeout,
+		retryConfig:  config.Retry,
+		logger:       logger,
+	}
+}
+
+type syncBulkIndexer struct {
+	config       docappender.BulkIndexerConfig
+	flushTimeout time.Duration
+	retryConfig  RetrySettings
+	logger       *zap.Logger
+}
+
+// StartSession creates a new docappender.BulkIndexer, and wraps
+// it with a syncBulkIndexerSession.
+func (s *syncBulkIndexer) StartSession(context.Context) (bulkIndexerSession, error) {
+	bi, err := docappender.NewBulkIndexer(s.config)
+	if err != nil {
+		return nil, err
+	}
+	return &syncBulkIndexerSession{
+		s:  s,
+		bi: bi,
+	}, nil
+}
+
+// Close is a no-op.
+func (s *syncBulkIndexer) Close(context.Context) error {
+	return nil
+}
+
+type syncBulkIndexerSession struct {
+	s  *syncBulkIndexer
+	bi *docappender.BulkIndexer
+}
+
+// Add adds an item to the sync bulk indexer session.
+func (s *syncBulkIndexerSession) Add(_ context.Context, index string, document io.WriterTo) error {
+	return s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document})
+}
+
+// End is a no-op.
+func (s *syncBulkIndexerSession) End() {
+	// TODO: acquire docappender.BulkIndexer from pool in StartSession, release here
+}
+
+// Flush flushes documents added to the bulk indexer session.
+func (s *syncBulkIndexerSession) Flush(ctx context.Context) error {
+	var retryBackoff func(int) time.Duration
+	for attempts := 0; ; attempts++ {
+		if _, err := flushBulkIndexer(ctx, s.bi, s.s.flushTimeout, s.s.logger); err != nil {
+			return err
+		}
+		if s.bi.Items() == 0 {
+			// No documents in buffer waiting for per-document retry, exit retry loop.
+			return nil
+		}
+		if retryBackoff == nil {
+			retryBackoff = createElasticsearchBackoffFunc(&s.s.retryConfig)
+			if retryBackoff == nil {
+				// BUG: This should never happen in practice.
+				// When retry is disabled / document level retry limit is reached,
+				// documents should go into FailedDocs instead of indexer buffer.
+				return errors.New("bulk indexer contains documents pending retry but retry is disabled")
+			}
+		}
+		backoff := retryBackoff(attempts + 1) // TODO: use exporterhelper retry_sender
+		timer := time.NewTimer(backoff)
+		select {
+		case <-ctx.Done():
+			timer.Stop()
+			return ctx.Err()
+		case <-timer.C:
+		}
+	}
+}
+
 func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (*asyncBulkIndexer, error) {
 	numWorkers := config.NumWorkers
 	if numWorkers == 0 {
```
```diff
@@ -215,18 +308,32 @@ func (w *asyncBulkIndexerWorker) run() {
 
 func (w *asyncBulkIndexerWorker) flush() {
 	ctx := context.Background()
-	if w.flushTimeout > 0 {
+	stat, _ := flushBulkIndexer(ctx, w.indexer, w.flushTimeout, w.logger)
+	w.stats.docsIndexed.Add(stat.Indexed)
+}
+
+func flushBulkIndexer(
+	ctx context.Context,
+	bi *docappender.BulkIndexer,
+	timeout time.Duration,
+	logger *zap.Logger,
+) (docappender.BulkIndexerResponseStat, error) {
+	if timeout > 0 {
 		var cancel context.CancelFunc
-		ctx, cancel = context.WithTimeout(context.Background(), w.flushTimeout)
+		ctx, cancel = context.WithTimeout(ctx, timeout)
 		defer cancel()
 	}
-	stat, err := w.indexer.Flush(ctx)
-	w.stats.docsIndexed.Add(stat.Indexed)
+	stat, err := bi.Flush(ctx)
 	if err != nil {
-		w.logger.Error("bulk indexer flush error", zap.Error(err))
+		logger.Error("bulk indexer flush error", zap.Error(err))
 	}
 	for _, resp := range stat.FailedDocs {
-		w.logger.Error(fmt.Sprintf("Drop docs: failed to index: %#v", resp.Error),
-			zap.Int("status", resp.Status))
+		logger.Error(
+			"failed to index document",
+			zap.String("index", resp.Index),
+			zap.String("error.type", resp.Error.Type),
+			zap.String("error.reason", resp.Error.Reason),
+		)
 	}
+	return stat, err
 }
```

exporter/elasticsearchexporter/config.go

Lines changed: 26 additions & 0 deletions

```diff
@@ -14,6 +14,7 @@ import (
 
 	"go.opentelemetry.io/collector/config/confighttp"
 	"go.opentelemetry.io/collector/config/configopaque"
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
 	"go.opentelemetry.io/collector/exporter/exporterhelper"
 	"go.uber.org/zap"
 )
@@ -76,6 +77,31 @@ type Config struct {
 	// TelemetrySettings contains settings useful for testing/debugging purposes
 	// This is experimental and may change at any time.
 	TelemetrySettings `mapstructure:"telemetry"`
+
+	// Batcher holds configuration for batching requests based on timeout
+	// and size-based thresholds.
+	//
+	// Batcher is unused by default, in which case Flush will be used.
+	// If Batcher.Enabled is non-nil (i.e. batcher::enabled is specified),
+	// then Flush will be ignored even if Batcher.Enabled is false.
+	Batcher BatcherConfig `mapstructure:"batcher"`
+}
+
+// BatcherConfig holds configuration for exporterbatcher.
+//
+// This is a slightly modified version of exporterbatcher.Config,
+// to enable tri-state Enabled: unset, false, true.
+type BatcherConfig struct {
+	// Enabled indicates whether to enqueue batches before sending
+	// to the exporter. If Enabled is specified (non-nil),
+	// then the exporter will not perform any buffering itself.
+	Enabled *bool `mapstructure:"enabled"`
+
+	// FlushTimeout sets the time after which a batch will be sent regardless of its size.
+	FlushTimeout time.Duration `mapstructure:"flush_timeout"`
+
+	exporterbatcher.MinSizeConfig `mapstructure:",squash"`
+	exporterbatcher.MaxSizeConfig `mapstructure:",squash"`
 }
 
 type TelemetrySettings struct {
```
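The tri-state `Enabled *bool` in `BatcherConfig` works because a pointer field distinguishes "key absent" (nil) from an explicit `true` or `false`. The following self-contained sketch demonstrates the pattern with a simplified stand-in struct decoded from JSON rather than the exporter's actual mapstructure path; the names `batcherConfig` and `mode` are illustrative only:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// batcherConfig mimics the tri-state Enabled field: a *bool distinguishes
// "unset" (nil) from an explicitly configured true or false.
type batcherConfig struct {
	Enabled *bool `json:"enabled"`
}

// mode reports which code path exporter-style logic would take:
// nil selects the legacy buffering; any non-nil value selects the
// synchronous batch-sender path.
func mode(cfg batcherConfig) string {
	if cfg.Enabled == nil {
		return "legacy buffering (flush config)"
	}
	if *cfg.Enabled {
		return "batch sender enabled"
	}
	return "batch sender path, batching disabled"
}

func main() {
	for _, raw := range []string{`{}`, `{"enabled": true}`, `{"enabled": false}`} {
		var cfg batcherConfig
		if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
			panic(err)
		}
		fmt.Printf("%-19s -> %s\n", raw, mode(cfg))
	}
}
```

A plain `bool` could not express this: after decoding, `false` would be indistinguishable from the key never having been set, which is exactly the distinction `newBulkIndexer` relies on (`config.Batcher.Enabled != nil`).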

exporter/elasticsearchexporter/config_test.go

Lines changed: 38 additions & 0 deletions

```diff
@@ -16,6 +16,7 @@ import (
 	"go.opentelemetry.io/collector/config/confighttp"
 	"go.opentelemetry.io/collector/config/configopaque"
 	"go.opentelemetry.io/collector/confmap/confmaptest"
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
 	"go.opentelemetry.io/collector/exporter/exporterhelper"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata"
@@ -107,6 +108,15 @@ func TestConfig(t *testing.T) {
 					PrefixSeparator: "-",
 					DateFormat:      "%Y.%m.%d",
 				},
+				Batcher: BatcherConfig{
+					FlushTimeout: 30 * time.Second,
+					MinSizeConfig: exporterbatcher.MinSizeConfig{
+						MinSizeItems: 5000,
+					},
+					MaxSizeConfig: exporterbatcher.MaxSizeConfig{
+						MaxSizeItems: 10000,
+					},
+				},
 			},
 		},
 		{
@@ -168,6 +178,15 @@ func TestConfig(t *testing.T) {
 					PrefixSeparator: "-",
 					DateFormat:      "%Y.%m.%d",
 				},
+				Batcher: BatcherConfig{
+					FlushTimeout: 30 * time.Second,
+					MinSizeConfig: exporterbatcher.MinSizeConfig{
+						MinSizeItems: 5000,
+					},
+					MaxSizeConfig: exporterbatcher.MaxSizeConfig{
+						MaxSizeItems: 10000,
+					},
+				},
 			},
 		},
 		{
@@ -229,6 +248,15 @@ func TestConfig(t *testing.T) {
 					PrefixSeparator: "-",
 					DateFormat:      "%Y.%m.%d",
 				},
+				Batcher: BatcherConfig{
+					FlushTimeout: 30 * time.Second,
+					MinSizeConfig: exporterbatcher.MinSizeConfig{
+						MinSizeItems: 5000,
+					},
+					MaxSizeConfig: exporterbatcher.MaxSizeConfig{
+						MaxSizeItems: 10000,
+					},
+				},
 			},
 		},
 		{
@@ -263,6 +291,16 @@ func TestConfig(t *testing.T) {
 				cfg.Endpoint = "https://elastic.example.com:9200"
 			}),
 		},
+		{
+			id:         component.NewIDWithName(metadata.Type, "batcher_disabled"),
+			configFile: "config.yaml",
+			expected: withDefaultConfig(func(cfg *Config) {
+				cfg.Endpoint = "https://elastic.example.com:9200"
+
+				enabled := false
+				cfg.Batcher.Enabled = &enabled
+			}),
+		},
 	}
 
 	for _, tt := range tests {
```
