diff --git a/.chloggen/elasticsearchexporter_batchsender.yaml b/.chloggen/elasticsearchexporter_batchsender.yaml new file mode 100644 index 000000000000..909a909ae2de --- /dev/null +++ b/.chloggen/elasticsearchexporter_batchsender.yaml @@ -0,0 +1,33 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Improve reliability when used with persistent queue. Deprecate config options `flush.*`, use `batcher.*` instead. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32377] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Move buffering from bulk indexer to batch sender to improve reliability. + With this change, there should be no event loss when used with persistent queue in the event of a collector crash. + Introduce `batcher.*` to configure the batch sender which is now enabled by default. + Option `flush.bytes` is deprecated. Use the new `batcher.min_size_items` option to control the minimum number of items (log records, spans) to trigger a flush. `batcher.min_size_items` will be set to the value of `flush.bytes` / 1000 if `flush.bytes` is non-zero. + Option `flush.interval` is deprecated. Use the new `batcher.flush_timeout` option to control max age of buffer. `batcher.flush_timeout` will be set to the value of `flush.interval` if `flush.interval` is non-zero. + Queue sender `sending_queue.enabled` defaults to `true`. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 08abc5d7fa65..42c451e193c6 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -78,7 +78,20 @@ All other defaults are as defined by [confighttp]. ### Queuing -The Elasticsearch exporter supports the common [`sending_queue` settings][exporterhelper]. However, the sending queue is currently disabled by default. +The Elasticsearch exporter supports the common [`sending_queue` settings][exporterhelper]. The sending queue is enabled by default. + +When [persistent queue](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md#persistent-queue) is used, there should be no event loss even on collector crashes. + +`sending_queue::num_consumers` (default=100) controls the number of concurrent requests being fetched from the queue to the batcher, or directly to bulk indexer if batcher is disabled. However, the actual number of concurrent bulk requests is controlled by `num_workers`. + +### Batching + +The Elasticsearch exporter supports the [common `batcher` settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterbatcher/config.go). The `batcher` config is experimental and may change without notice. + +- `enabled` (default=true): Enable batching of requests into a single bulk request. +- `min_size_items` (default=5000): Minimum number of log records / spans in the buffer to trigger a flush immediately. +- `max_size_items` (default=10000): Maximum number of log records / spans in a request. +- `flush_timeout` (default=30s): Maximum time of the oldest item spent inside the buffer, aka "max age of buffer". A flush will happen regardless of the size of content in buffer. ### Elasticsearch document routing @@ -158,10 +171,10 @@ This can be configured through the following settings: The Elasticsearch exporter uses the [Elasticsearch Bulk API] for indexing documents. The behaviour of this bulk indexing can be configured with the following settings: -- `num_workers` (default=runtime.NumCPU()): Number of workers publishing bulk requests concurrently. +- `num_workers` (default=runtime.NumCPU()): Maximum number of concurrent bulk requests. - `flush`: Event bulk indexer buffer flush settings - - `bytes` (default=5000000): Write buffer flush size limit. - - `interval` (default=30s): Write buffer flush time limit. + - `bytes` (DEPRECATED, use `batcher.min_size_items` instead): Write buffer flush size limit. When specified, it is translated to `batcher.min_size_items` using an estimate of average item size of 1000 bytes. + - `interval` (DEPRECATED, use `batcher.flush_timeout` instead): Maximum time of the oldest item spent inside the buffer, aka "max age of buffer". A flush will happen regardless of the size of content in buffer. - `retry`: Elasticsearch bulk request retry settings - `enabled` (default=true): Enable/Disable request retry on error. Failed requests are retried with exponential backoff. - `max_requests` (default=3): Number of HTTP request retries. diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 53d5006da26d..7098410b38f7 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -14,13 +14,18 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterhelper" - "go.uber.org/zap" ) // Config defines configuration for Elastic exporter. type Config struct { exporterhelper.QueueSettings `mapstructure:"sending_queue"` + + // Experimental: This configuration is at the early stage of development and may change without backward compatibility + // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. + BatcherConfig exporterbatcher.Config `mapstructure:"batcher"` + // Endpoints holds the Elasticsearch URLs the exporter should send events to. // // This setting is required if CloudID is not set and if the @@ -69,7 +74,7 @@ type Config struct { Authentication AuthenticationSettings `mapstructure:",squash"` Discovery DiscoverySettings `mapstructure:"discover"` Retry RetrySettings `mapstructure:"retry"` - Flush FlushSettings `mapstructure:"flush"` + Flush FlushSettings `mapstructure:"flush"` // Deprecated: use `batcher` instead. Mapping MappingsSettings `mapstructure:"mapping"` LogstashFormat LogstashFormatSettings `mapstructure:"logstash_format"` @@ -131,9 +136,13 @@ type DiscoverySettings struct { // all events already serialized into the send-buffer. type FlushSettings struct { // Bytes sets the send buffer flushing limit. + // + // Deprecated: Use `batcher.min_size_items` instead. Bytes int `mapstructure:"bytes"` // Interval configures the max age of a document in the send buffer. + // + // Deprecated: Use `batcher.flush_timeout` instead. Interval time.Duration `mapstructure:"interval"` } @@ -320,12 +329,3 @@ func parseCloudID(input string) (*url.URL, error) { func (cfg *Config) MappingMode() MappingMode { return mappingModes[cfg.Mapping.Mode] } - -func logConfigDeprecationWarnings(cfg *Config, logger *zap.Logger) { - if !cfg.Mapping.Dedup { - logger.Warn("dedup has been deprecated, and will always be enabled in future") - } - if cfg.Mapping.Dedot && cfg.MappingMode() != MappingECS || !cfg.Mapping.Dedot && cfg.MappingMode() == MappingECS { - logger.Warn("dedot has been deprecated: in the future, dedotting will always be performed in ECS mode only") - } -} diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index c409f175497e..aff847dcc0b8 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -16,7 +16,9 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/exporter/exporterqueue" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata" ) @@ -53,9 +55,9 @@ func TestConfig(t *testing.T) { configFile: "config.yaml", expected: &Config{ QueueSettings: exporterhelper.QueueSettings{ - Enabled: false, - NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers, - QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, + Enabled: true, + NumConsumers: 100, + QueueSize: exporterqueue.NewDefaultConfig().QueueSize, }, Endpoints: []string{"https://elastic.example.com:9200"}, Index: "", @@ -88,8 +90,11 @@ func TestConfig(t *testing.T) { Discovery: DiscoverySettings{ OnStart: true, }, - Flush: FlushSettings{ - Bytes: 10485760, + BatcherConfig: exporterbatcher.Config{ + Enabled: true, + FlushTimeout: 5 * time.Second, + MinSizeConfig: exporterbatcher.MinSizeConfig{MinSizeItems: 100}, + MaxSizeConfig: exporterbatcher.MaxSizeConfig{MaxSizeItems: 200}, }, Retry: RetrySettings{ Enabled: true, @@ -108,6 +113,7 @@ func TestConfig(t *testing.T) { PrefixSeparator: "-", DateFormat: "%Y.%m.%d", }, + NumWorkers: 1, }, }, { @@ -115,9 +121,9 @@ func TestConfig(t *testing.T) { configFile: "config.yaml", expected: &Config{ QueueSettings: exporterhelper.QueueSettings{ - Enabled: true, - NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers, - QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, + Enabled: false, + NumConsumers: 100, + QueueSize: exporterqueue.NewDefaultConfig().QueueSize, }, Endpoints: []string{"http://localhost:9200"}, Index: "", @@ -150,8 +156,11 @@ func TestConfig(t *testing.T) { Discovery: DiscoverySettings{ OnStart: true, }, - Flush: FlushSettings{ - Bytes: 10485760, + BatcherConfig: exporterbatcher.Config{ + Enabled: true, + FlushTimeout: 5 * time.Second, + MinSizeConfig: exporterbatcher.MinSizeConfig{MinSizeItems: 100}, + MaxSizeConfig: exporterbatcher.MaxSizeConfig{MaxSizeItems: 200}, }, Retry: RetrySettings{ Enabled: true, @@ -170,6 +179,7 @@ func TestConfig(t *testing.T) { PrefixSeparator: "-", DateFormat: "%Y.%m.%d", }, + NumWorkers: 1, }, }, { @@ -178,7 +188,7 @@ func TestConfig(t *testing.T) { expected: &Config{ QueueSettings: exporterhelper.QueueSettings{ Enabled: true, - NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers, + NumConsumers: 100, QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, Endpoints: []string{"http://localhost:9200"}, @@ -212,8 +222,11 @@ func TestConfig(t *testing.T) { Discovery: DiscoverySettings{ OnStart: true, }, - Flush: FlushSettings{ - Bytes: 10485760, + BatcherConfig: exporterbatcher.Config{ + Enabled: true, + FlushTimeout: 5 * time.Second, + MinSizeConfig: exporterbatcher.MinSizeConfig{MinSizeItems: 100}, + MaxSizeConfig: exporterbatcher.MaxSizeConfig{MaxSizeItems: 200}, }, Retry: RetrySettings{ Enabled: true, @@ -232,6 +245,7 @@ func TestConfig(t *testing.T) { PrefixSeparator: "-", DateFormat: "%Y.%m.%d", }, + NumWorkers: 1, }, }, { diff --git a/exporter/elasticsearchexporter/elasticsearch_bulk.go b/exporter/elasticsearchexporter/elasticsearch_bulk.go index 5beb768bc582..405c7d2cc332 100644 --- a/exporter/elasticsearchexporter/elasticsearch_bulk.go +++ b/exporter/elasticsearchexporter/elasticsearch_bulk.go @@ -4,12 +4,11 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" import ( - "bytes" "context" + "errors" "fmt" "io" "net/http" - "runtime" "sync" "sync/atomic" "time" @@ -19,6 +18,7 @@ import ( elasticsearch7 "github.com/elastic/go-elasticsearch/v7" "go.opentelemetry.io/collector/component" "go.uber.org/zap" + "golang.org/x/sync/semaphore" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/sanitize" ) @@ -26,10 +26,6 @@ import ( type esClientCurrent = elasticsearch7.Client type esConfigCurrent = elasticsearch7.Config -type esBulkIndexerCurrent = bulkIndexerPool - -type esBulkIndexerItem = docappender.BulkIndexerItem - // clientLogger implements the estransport.Logger interface // that is required by the Elasticsearch client for logging. type clientLogger struct { @@ -138,7 +134,7 @@ func newElasticsearchClient( // configure retry behavior RetryOnStatus: config.Retry.RetryOnStatus, DisableRetry: retryDisabled, - EnableRetryOnTimeout: config.Retry.Enabled, + EnableRetryOnTimeout: config.Retry.Enabled, // for timeouts in underlying transport layers //RetryOnError: retryOnError, // should be used from esclient version 8 onwards MaxRetries: maxRetries, RetryBackoff: createElasticsearchBackoffFunc(&config.Retry), @@ -177,96 +173,72 @@ func createElasticsearchBackoffFunc(config *RetrySettings) func(int) time.Durati } } -func pushDocuments(ctx context.Context, index string, document []byte, bulkIndexer *esBulkIndexerCurrent) error { - return bulkIndexer.Add(ctx, index, bytes.NewReader(document)) +func newBulkIndexer(logger *zap.Logger, client *esClientCurrent, config *Config) (*bulkIndexerManager, error) { //nolint:unparam + return &bulkIndexerManager{ + closeCh: make(chan struct{}), + stats: bulkIndexerStats{}, + logger: logger, + config: config, + wg: &sync.WaitGroup{}, + sem: semaphore.NewWeighted(int64(config.NumWorkers)), + client: client, + }, nil } -func newBulkIndexer(logger *zap.Logger, client *elasticsearch7.Client, config *Config) (*esBulkIndexerCurrent, error) { - numWorkers := config.NumWorkers - if numWorkers == 0 { - numWorkers = runtime.NumCPU() - } +type bulkIndexerStats struct { + docsIndexed atomic.Int64 +} - flushInterval := config.Flush.Interval - if flushInterval == 0 { - flushInterval = 30 * time.Second - } +type bulkIndexerManager struct { + closeCh chan struct{} + stats bulkIndexerStats + logger *zap.Logger + config *Config + wg *sync.WaitGroup + sem *semaphore.Weighted + client *esClientCurrent +} + +func (p *bulkIndexerManager) AddBatchAndFlush(ctx context.Context, batch []docappender.BulkIndexerItem) error { + p.wg.Add(1) + defer p.wg.Done() - flushBytes := config.Flush.Bytes - if flushBytes == 0 { - flushBytes = 5e+6 + if err := p.sem.Acquire(ctx, 1); err != nil { + return err } + defer p.sem.Release(1) var maxDocRetry int - if config.Retry.Enabled { + if p.config.Retry.Enabled { // max_requests includes initial attempt // See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344 - maxDocRetry = config.Retry.MaxRequests - 1 + maxDocRetry = p.config.Retry.MaxRequests - 1 } - pool := &bulkIndexerPool{ - wg: sync.WaitGroup{}, - items: make(chan esBulkIndexerItem, config.NumWorkers), - stats: bulkIndexerStats{}, - } - pool.wg.Add(numWorkers) - - for i := 0; i < numWorkers; i++ { - bi, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{ - Client: client, - MaxDocumentRetries: maxDocRetry, - Pipeline: config.Pipeline, - RetryOnDocumentStatus: config.Retry.RetryOnStatus, - }) - if err != nil { - return nil, err - } - w := worker{ - indexer: bi, - items: pool.items, - flushInterval: flushInterval, - flushTimeout: config.Timeout, - flushBytes: flushBytes, - logger: logger, - stats: &pool.stats, - } - go func() { - defer pool.wg.Done() - w.run() - }() + bi, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{ + Client: p.client, + MaxDocumentRetries: maxDocRetry, + Pipeline: p.config.Pipeline, + RetryOnDocumentStatus: p.config.Retry.RetryOnStatus, + }) + if err != nil { + return fmt.Errorf("error creating docappender bulk indexer: %w", err) } - return pool, nil -} -type bulkIndexerStats struct { - docsIndexed atomic.Int64 -} - -type bulkIndexerPool struct { - items chan esBulkIndexerItem - wg sync.WaitGroup - stats bulkIndexerStats -} - -// Add adds an item to the bulk indexer pool. -// -// Adding an item after a call to Close() will panic. -func (p *bulkIndexerPool) Add(ctx context.Context, index string, document io.WriterTo) error { - item := esBulkIndexerItem{ - Index: index, - Body: document, - } - select { - case <-ctx.Done(): - return ctx.Err() - case p.items <- item: - return nil + w := worker{ + indexer: bi, + closeCh: p.closeCh, + flushTimeout: p.config.Timeout, + retryBackoff: createElasticsearchBackoffFunc(&p.config.Retry), + logger: p.logger, + stats: &p.stats, } + return w.addBatchAndFlush(ctx, batch) } -// Close closes the items channel and waits for the workers to drain it. -func (p *bulkIndexerPool) Close(ctx context.Context) error { - close(p.items) +// Close closes the closeCh channel and wait for all p.AddBatchAndFlush to finish. +func (p *bulkIndexerManager) Close(ctx context.Context) error { + close(p.closeCh) doneCh := make(chan struct{}) go func() { p.wg.Wait() @@ -281,51 +253,57 @@ func (p *bulkIndexerPool) Close(ctx context.Context) error { } type worker struct { - indexer *docappender.BulkIndexer - items <-chan esBulkIndexerItem - flushInterval time.Duration - flushTimeout time.Duration - flushBytes int + indexer *docappender.BulkIndexer + closeCh <-chan struct{} + + // timeout on a single bulk request, not to be confused with `batcher.flush_timeout` option + flushTimeout time.Duration + + retryBackoff func(int) time.Duration stats *bulkIndexerStats logger *zap.Logger } -func (w *worker) run() { - flushTick := time.NewTicker(w.flushInterval) - defer flushTick.Stop() - for { +func (w *worker) addBatchAndFlush(ctx context.Context, batch []docappender.BulkIndexerItem) error { + for _, item := range batch { + if err := w.indexer.Add(item); err != nil { + return fmt.Errorf("failed to add item to bulk indexer: %w", err) + } + } + for attempts := 0; ; attempts++ { + if err := w.flush(ctx); err != nil { + return err + } + if w.indexer.Items() == 0 { + // No documents in buffer waiting for per-document retry, exit retry loop. + return nil + } + if w.retryBackoff == nil { + // BUG: This should never happen in practice. + // When retry is disabled / document level retry limit is reached, + // documents should go into FailedDocs instead of indexer buffer. + return errors.New("bulk indexer contains documents pending retry but retry is disabled") + } + backoff := w.retryBackoff(attempts + 1) // TODO: use exporterhelper retry_sender + timer := time.NewTimer(backoff) select { - case item, ok := <-w.items: - // if channel is closed, flush and return - if !ok { - w.flush() - return - } - - if err := w.indexer.Add(item); err != nil { - w.logger.Error("error adding item to bulk indexer", zap.Error(err)) - } - - // w.indexer.Len() can be either compressed or uncompressed bytes - if w.indexer.Len() >= w.flushBytes { - w.flush() - flushTick.Reset(w.flushInterval) - } - case <-flushTick.C: - // bulk indexer needs to be flushed every flush interval because - // there may be pending bytes in bulk indexer buffer due to e.g. document level 429 - w.flush() + case <-ctx.Done(): + timer.Stop() + return ctx.Err() + case <-w.closeCh: + timer.Stop() + return errors.New("bulk indexer is closed") + case <-timer.C: } } } -func (w *worker) flush() { - ctx := context.Background() +func (w *worker) flush(ctx context.Context) error { if w.flushTimeout > 0 { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(context.Background(), w.flushTimeout) + ctx, cancel = context.WithTimeout(ctx, w.flushTimeout) defer cancel() } stat, err := w.indexer.Flush(ctx) @@ -337,4 +315,5 @@ func (w *worker) flush() { w.logger.Error(fmt.Sprintf("Drop docs: failed to index: %#v", resp.Error), zap.Int("status", resp.Status)) } + return err } diff --git a/exporter/elasticsearchexporter/elasticsearch_bulk_test.go b/exporter/elasticsearchexporter/elasticsearch_bulk_test.go index 020d29fae623..e999737ccba1 100644 --- a/exporter/elasticsearchexporter/elasticsearch_bulk_test.go +++ b/exporter/elasticsearchexporter/elasticsearch_bulk_test.go @@ -10,14 +10,12 @@ import ( "net/http" "strings" "testing" - "time" + "github.com/elastic/go-docappender/v2" "github.com/elastic/go-elasticsearch/v7" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "go.uber.org/zap/zaptest/observer" ) var defaultRoundTripFunc = func(*http.Request) (*http.Response, error) { @@ -50,8 +48,8 @@ const successResp = `{ ] }` -func TestBulkIndexer_flushOnClose(t *testing.T) { - cfg := Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 2 << 30}} +func TestBulkIndexer_addBatchAndFlush(t *testing.T) { + cfg := Config{NumWorkers: 1} client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{ RoundTripFunc: func(*http.Request) (*http.Response, error) { return &http.Response{ @@ -63,51 +61,18 @@ func TestBulkIndexer_flushOnClose(t *testing.T) { require.NoError(t, err) bulkIndexer, err := newBulkIndexer(zap.NewNop(), client, &cfg) require.NoError(t, err) - assert.NoError(t, bulkIndexer.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`))) - assert.NoError(t, bulkIndexer.Close(context.Background())) + assert.NoError(t, bulkIndexer.AddBatchAndFlush(context.Background(), + []docappender.BulkIndexerItem{ + { + Index: "foo", + Body: strings.NewReader(`{"foo": "bar"}`), + }, + })) assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load()) + assert.NoError(t, bulkIndexer.Close(context.Background())) } -func TestBulkIndexer_flush(t *testing.T) { - tests := []struct { - name string - config Config - }{ - { - name: "flush.bytes", - config: Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 1}}, - }, - { - name: "flush.interval", - config: Config{NumWorkers: 1, Flush: FlushSettings{Interval: 50 * time.Millisecond, Bytes: 2 << 30}}, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{ - RoundTripFunc: func(*http.Request) (*http.Response, error) { - return &http.Response{ - Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, - Body: io.NopCloser(strings.NewReader(successResp)), - }, nil - }, - }}) - require.NoError(t, err) - bulkIndexer, err := newBulkIndexer(zap.NewNop(), client, &tt.config) - require.NoError(t, err) - assert.NoError(t, bulkIndexer.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`))) - // should flush - time.Sleep(100 * time.Millisecond) - assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load()) - assert.NoError(t, bulkIndexer.Close(context.Background())) - }) - } -} - -func TestBulkIndexer_flush_error(t *testing.T) { +func TestBulkIndexer_addBatchAndFlush_error(t *testing.T) { tests := []struct { name string roundTripFunc func(*http.Request) (*http.Response, error) @@ -144,20 +109,22 @@ func TestBulkIndexer_flush_error(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - cfg := Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 1}} + cfg := Config{NumWorkers: 1} client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{ RoundTripFunc: tt.roundTripFunc, }}) require.NoError(t, err) - core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel)) - bulkIndexer, err := newBulkIndexer(zap.New(core), client, &cfg) + bulkIndexer, err := newBulkIndexer(zap.NewNop(), client, &cfg) require.NoError(t, err) - assert.NoError(t, bulkIndexer.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`))) - // should flush - time.Sleep(100 * time.Millisecond) + assert.ErrorContains(t, bulkIndexer.AddBatchAndFlush(context.Background(), + []docappender.BulkIndexerItem{ + { + Index: "foo", + Body: strings.NewReader(`{"foo": "bar"}`), + }, + }), "failed to execute the request") assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load()) assert.NoError(t, bulkIndexer.Close(context.Background())) - assert.Equal(t, 1, observed.FilterMessage("bulk indexer flush error").Len()) }) } } diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 6cb64da0983d..b3d129903409 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -4,12 +4,15 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" import ( + "bytes" "context" "errors" "fmt" "runtime" + "slices" "time" + "github.com/elastic/go-docappender/v2" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pcommon" @@ -30,7 +33,7 @@ type elasticsearchExporter struct { dynamicIndex bool model mappingModel - bulkIndexer *esBulkIndexerCurrent + bulkIndexer *bulkIndexerManager } func newExporter( @@ -90,8 +93,8 @@ func (e *elasticsearchExporter) Shutdown(ctx context.Context) error { } func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { + items := make([]docappender.BulkIndexerItem, 0, ld.LogRecordCount()) var errs []error - rls := ld.ResourceLogs() for i := 0; i < rls.Len(); i++ { rl := rls.At(i) @@ -102,21 +105,26 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) scope := ill.Scope() logs := ill.LogRecords() for k := 0; k < logs.Len(); k++ { - if err := e.pushLogRecord(ctx, resource, logs.At(k), scope); err != nil { + item, err := e.logRecordToItem(resource, logs.At(k), scope) + if err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } errs = append(errs, err) + continue } + items = append(items, item) } } } - + if err := e.bulkIndexer.AddBatchAndFlush(ctx, items); err != nil { + errs = append(errs, err) + } return errors.Join(errs...) } -func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) error { +func (e *elasticsearchExporter) logRecordToItem(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) (docappender.BulkIndexerItem, error) { fIndex := e.index if e.dynamicIndex { fIndex = routeLogRecord(record, scope, resource, fIndex) @@ -125,22 +133,28 @@ func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcom if e.logstashFormat.Enabled { formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now()) if err != nil { - return err + return docappender.BulkIndexerItem{}, err } fIndex = formattedIndex } document, err := e.model.encodeLog(resource, record, scope) if err != nil { - return fmt.Errorf("failed to encode log event: %w", err) + return docappender.BulkIndexerItem{}, fmt.Errorf("failed to encode log event: %w", err) } - return pushDocuments(ctx, fIndex, document, e.bulkIndexer) + return docappender.BulkIndexerItem{ + Index: fIndex, + Body: bytes.NewReader(document), + }, nil } func (e *elasticsearchExporter) pushMetricsData( ctx context.Context, metrics pmetric.Metrics, ) error { + // Ideally the slice will be preallocated once and for all + // but the actual length is uncertain due to grouping + var items []docappender.BulkIndexerItem var errs []error resourceMetrics := metrics.ResourceMetrics() @@ -183,6 +197,8 @@ func (e *elasticsearchExporter) pushMetricsData( } } + items = slices.Grow(items, len(resourceDocs)) + for fIndex, docs := range resourceDocs { for _, doc := range docs { var ( @@ -195,16 +211,18 @@ func (e *elasticsearchExporter) pushMetricsData( continue } - if err := pushDocuments(ctx, fIndex, docBytes, e.bulkIndexer); err != nil { - if cerr := ctx.Err(); cerr != nil { - return cerr - } - errs = append(errs, err) + item := docappender.BulkIndexerItem{ + Index: fIndex, + Body: bytes.NewReader(docBytes), } + items = append(items, item) } } } + if err := e.bulkIndexer.AddBatchAndFlush(ctx, items); err != nil { + errs = append(errs, err) + } return errors.Join(errs...) } @@ -232,6 +250,7 @@ func (e *elasticsearchExporter) pushTraceData( ctx context.Context, td ptrace.Traces, ) error { + items := make([]docappender.BulkIndexerItem, 0, td.SpanCount()) var errs []error resourceSpans := td.ResourceSpans() for i := 0; i < resourceSpans.Len(); i++ { @@ -244,20 +263,25 @@ func (e *elasticsearchExporter) pushTraceData( spans := scopeSpan.Spans() for k := 0; k < spans.Len(); k++ { span := spans.At(k) - if err := e.pushTraceRecord(ctx, resource, span, scope); err != nil { + item, err := e.traceRecordToItem(resource, span, scope) + if err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } errs = append(errs, err) + continue } + items = append(items, item) } } } - + if err := e.bulkIndexer.AddBatchAndFlush(ctx, items); err != nil { + errs = append(errs, err) + } return errors.Join(errs...) } -func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error { +func (e *elasticsearchExporter) traceRecordToItem(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) (docappender.BulkIndexerItem, error) { fIndex := e.index if e.dynamicIndex { fIndex = routeSpan(span, scope, resource, fIndex) @@ -266,14 +290,17 @@ func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pc if e.logstashFormat.Enabled { formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now()) if err != nil { - return err + return docappender.BulkIndexerItem{}, err } fIndex = formattedIndex } document, err := e.model.encodeSpan(resource, span, scope) if err != nil { - return fmt.Errorf("failed to encode trace record: %w", err) + return docappender.BulkIndexerItem{}, fmt.Errorf("failed to encode trace record: %w", err) } - return pushDocuments(ctx, fIndex, document, e.bulkIndexer) + return docappender.BulkIndexerItem{ + Index: fIndex, + Body: bytes.NewReader(document), + }, nil } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 754cfaa4675f..26bacf977817 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -39,8 +39,7 @@ func TestExporterLogs(t *testing.T) { }) exporter := newTestLogsExporter(t, server.URL) - mustSendLogRecords(t, exporter, plog.NewLogRecord()) - mustSendLogRecords(t, exporter, plog.NewLogRecord()) + mustSendLogRecords(t, exporter, plog.NewLogRecord(), plog.NewLogRecord()) rec.WaitItems(2) }) @@ -142,7 +141,7 @@ func TestExporterLogs(t *testing.T) { exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { cfg.Headers = map[string]configopaque.String{"foo": "bah"} }) - mustSendLogRecords(t, exporter, plog.NewLogRecord()) + _ = sendLogRecords(t, exporter, plog.NewLogRecord()) <-done }) @@ -163,7 +162,7 @@ func TestExporterLogs(t *testing.T) { exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { cfg.Headers = map[string]configopaque.String{"User-Agent": "overridden"} }) - mustSendLogRecords(t, exporter, plog.NewLogRecord()) + _ = sendLogRecords(t, exporter, plog.NewLogRecord()) <-done }) @@ -345,7 +344,7 @@ func TestExporterLogs(t *testing.T) { server := newESTestServer(t, handler(attempts)) exporter := newTestLogsExporter(t, server.URL, configurer) - mustSendLogRecords(t, exporter, plog.NewLogRecord()) + _ = sendLogRecords(t, exporter, plog.NewLogRecord()) time.Sleep(200 * time.Millisecond) assert.Equal(t, int64(1), attempts.Load()) @@ -363,7 +362,7 @@ func TestExporterLogs(t *testing.T) { }) exporter := newTestLogsExporter(t, server.URL) - mustSendLogRecords(t, exporter, plog.NewLogRecord()) + _ = sendLogRecords(t, exporter, plog.NewLogRecord()) time.Sleep(200 * time.Millisecond) assert.Equal(t, int64(1), attempts.Load()) @@ -437,7 +436,7 @@ func TestExporterLogs(t *testing.T) { }) exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { - cfg.Flush.Interval = 50 * time.Millisecond + cfg.BatcherConfig.FlushTimeout = 50 * time.Millisecond cfg.Retry.InitialInterval = 1 * time.Millisecond cfg.Retry.MaxInterval = 10 * time.Millisecond }) @@ -644,8 +643,7 @@ func TestExporterTraces(t *testing.T) { }) exporter := newTestTracesExporter(t, server.URL) - mustSendSpans(t, exporter, ptrace.NewSpan()) - mustSendSpans(t, exporter, ptrace.NewSpan()) + mustSendSpans(t, exporter, ptrace.NewSpan(), ptrace.NewSpan()) rec.WaitItems(2) }) @@ -818,11 +816,10 @@ func newTestTracesExporter(t *testing.T, url string, fns ...func(*Config)) expor cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) { cfg.Endpoints = []string{url} cfg.NumWorkers = 1 - cfg.Flush.Interval = 10 * time.Millisecond + cfg.BatcherConfig.FlushTimeout = 10 * time.Millisecond }}, fns...)...) exp, err := f.CreateTracesExporter(context.Background(), exportertest.NewNopSettings(), cfg) require.NoError(t, err) - err = exp.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) t.Cleanup(func() { @@ -864,21 +861,30 @@ func newUnstartedTestLogsExporter(t *testing.T, url string, fns ...func(*Config) cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) { cfg.Endpoints = []string{url} cfg.NumWorkers = 1 - cfg.Flush.Interval = 10 * time.Millisecond + cfg.BatcherConfig.FlushTimeout = 10 * time.Millisecond }}, fns...)...) exp, err := f.CreateLogsExporter(context.Background(), exportertest.NewNopSettings(), cfg) require.NoError(t, err) return exp } -func mustSendLogRecords(t *testing.T, exporter exporter.Logs, records ...plog.LogRecord) { +func sendLogRecords(_ *testing.T, exporter exporter.Logs, records ...plog.LogRecord) error { logs := plog.NewLogs() resourceLogs := logs.ResourceLogs().AppendEmpty() scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() for _, record := range records { record.CopyTo(scopeLogs.LogRecords().AppendEmpty()) } - mustSendLogs(t, exporter, logs) + return sendLogs(exporter, logs) +} + +func mustSendLogRecords(t *testing.T, exporter exporter.Logs, records ...plog.LogRecord) { + err := sendLogRecords(t, exporter, records...) + require.NoError(t, err) +} + +func sendLogs(exporter exporter.Logs, logs plog.Logs) error { + return exporter.ConsumeLogs(context.Background(), logs) } func mustSendLogs(t *testing.T, exporter exporter.Logs, logs plog.Logs) { diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 3fb8b295e352..89a5a4d6e360 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -9,13 +9,16 @@ import ( "context" "fmt" "net/http" + "runtime" "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata" ) @@ -40,7 +43,7 @@ func NewFactory() exporter.Factory { func createDefaultConfig() component.Config { qs := exporterhelper.NewDefaultQueueSettings() - qs.Enabled = false + qs.NumConsumers = 100 // default is too small as it also sets batch sender concurrency limit httpClientConfig := confighttp.NewDefaultClientConfig() httpClientConfig.Timeout = 90 * time.Second @@ -74,6 +77,16 @@ func createDefaultConfig() component.Config { http.StatusGatewayTimeout, }, }, + BatcherConfig: exporterbatcher.Config{ + Enabled: true, + FlushTimeout: 30 * time.Second, + MinSizeConfig: exporterbatcher.MinSizeConfig{ + MinSizeItems: 5000, + }, + MaxSizeConfig: exporterbatcher.MaxSizeConfig{ + MaxSizeItems: 10000, + }, + }, Mapping: MappingsSettings{ Mode: "none", Dedup: true, @@ -88,6 +101,7 @@ func createDefaultConfig() component.Config { LogRequestBody: false, LogResponseBody: false, }, + NumWorkers: runtime.NumCPU(), } } @@ -101,14 +115,11 @@ func createLogsExporter( ) (exporter.Logs, error) { cf := cfg.(*Config) - index := cf.LogsIndex - if cf.Index != "" { - set.Logger.Warn("index option are deprecated and replaced with logs_index and traces_index.") - index = cf.Index + if err := handleDeprecations(cf, set.Logger); err != nil { + return nil, err } - logConfigDeprecationWarnings(cf, set.Logger) - exporter, err := newExporter(cf, set, index, cf.LogsDynamicIndex.Enabled) + exporter, err := newExporter(cf, set, cf.LogsIndex, cf.LogsDynamicIndex.Enabled) if err != nil { return nil, fmt.Errorf("cannot configure Elasticsearch exporter: %w", err) } @@ -120,8 +131,11 @@ func createLogsExporter( exporter.pushLogsData, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithStart(exporter.Start), + exporterhelper.WithBatcher(cf.BatcherConfig), exporterhelper.WithShutdown(exporter.Shutdown), exporterhelper.WithQueue(cf.QueueSettings), + // effectively disable timeout_sender because timeout is enforced in bulk indexer + exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), ) } @@ -131,7 +145,20 @@ func createMetricsExporter( cfg component.Config, ) (exporter.Metrics, error) { cf := cfg.(*Config) - logConfigDeprecationWarnings(cf, set.Logger) + + if err := handleDeprecations(cf, set.Logger); err != nil { + return nil, err + } + + // Workaround to avoid rejections from Elasticsearch. + // TSDB does not accept 2 documents with the same timestamp and dimensions. + // + // Setting MaxSizeItems ensures that the batcher will not split a set of + // metrics into multiple batches, potentially sending two metric data points + // with the same timestamp and dimensions as separate documents. + batcherCfg := cf.BatcherConfig + batcherCfg.MaxSizeConfig.MaxSizeItems = 0 + set.Logger.Warn("batcher.max_size_items is ignored: metrics exporter does not support batch splitting") exporter, err := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled) if err != nil { @@ -144,22 +171,30 @@ func createMetricsExporter( exporter.pushMetricsData, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithStart(exporter.Start), + exporterhelper.WithBatcher(batcherCfg), exporterhelper.WithShutdown(exporter.Shutdown), exporterhelper.WithQueue(cf.QueueSettings), + // effectively disable timeout_sender because timeout is enforced in bulk indexer + exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), ) } +// createTracesExporter creates a new exporter for traces. func createTracesExporter(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Traces, error) { cf := cfg.(*Config) - logConfigDeprecationWarnings(cf, set.Logger) + + if err := handleDeprecations(cf, set.Logger); err != nil { + return nil, err + } exporter, err := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled) if err != nil { return nil, fmt.Errorf("cannot configure Elasticsearch exporter: %w", err) } + return exporterhelper.NewTracesExporter( ctx, set, @@ -167,7 +202,41 @@ func createTracesExporter(ctx context.Context, exporter.pushTraceData, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithStart(exporter.Start), + exporterhelper.WithBatcher(cf.BatcherConfig), exporterhelper.WithShutdown(exporter.Shutdown), exporterhelper.WithQueue(cf.QueueSettings), + // effectively disable timeout_sender because timeout is enforced in bulk indexer + exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), ) } + +// handleDeprecations handles deprecated config options. +// If possible, translate deprecated config options to new config options +// Otherwise, return an error so that the user is aware of an unsupported option. +func handleDeprecations(cf *Config, logger *zap.Logger) error { //nolint:unparam + if cf.Index != "" { + logger.Warn(`"index" option is deprecated and replaced with "logs_index" and "traces_index". Setting "logs_index" to the value of "index".`, zap.String("value", cf.Index)) + cf.LogsIndex = cf.Index + } + + if !cf.Mapping.Dedup { + logger.Warn("dedup has been deprecated, and will always be enabled in future") + } + if cf.Mapping.Dedot && cf.MappingMode() != MappingECS || !cf.Mapping.Dedot && cf.MappingMode() == MappingECS { + logger.Warn("dedot has been deprecated: in the future, dedotting will always be performed in ECS mode only") + } + + if cf.Flush.Bytes != 0 { + const factor = 1000 + val := cf.Flush.Bytes / factor + logger.Warn(fmt.Sprintf(`"flush.bytes" option is deprecated and replaced with "batcher.min_size_items". Setting "batcher.min_size_items" to the value of "flush.bytes" / %d.`, factor), zap.Int("value", val)) + cf.BatcherConfig.MinSizeItems = val + } + + if cf.Flush.Interval != 0 { + logger.Warn(`"flush.interval" option is deprecated and replaced with "batcher.flush_timeout". Setting "batcher.flush_timeout" to the value of "flush.interval".`, zap.Duration("value", cf.Flush.Interval)) + cf.BatcherConfig.FlushTimeout = cf.Flush.Interval + } + + return nil +} diff --git a/exporter/elasticsearchexporter/factory_test.go b/exporter/elasticsearchexporter/factory_test.go index 9425ec2723d7..9b4923f33587 100644 --- a/exporter/elasticsearchexporter/factory_test.go +++ b/exporter/elasticsearchexporter/factory_test.go @@ -31,6 +31,7 @@ func TestFactory_CreateLogsExporter(t *testing.T) { exporter, err := factory.CreateLogsExporter(context.Background(), params, cfg) require.NoError(t, err) require.NotNil(t, exporter) + require.NoError(t, exporter.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, exporter.Shutdown(context.Background())) } @@ -66,6 +67,7 @@ func TestFactory_CreateTracesExporter(t *testing.T) { exporter, err := factory.CreateTracesExporter(context.Background(), params, cfg) require.NoError(t, err) require.NotNil(t, exporter) + require.NoError(t, exporter.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, exporter.Shutdown(context.Background())) } @@ -89,11 +91,13 @@ func TestFactory_CreateLogsAndTracesExporterWithDeprecatedIndexOption(t *testing logsExporter, err := factory.CreateLogsExporter(context.Background(), params, cfg) require.NoError(t, err) require.NotNil(t, logsExporter) + require.NoError(t, logsExporter.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, logsExporter.Shutdown(context.Background())) tracesExporter, err := factory.CreateTracesExporter(context.Background(), params, cfg) require.NoError(t, err) require.NotNil(t, tracesExporter) + require.NoError(t, tracesExporter.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, tracesExporter.Shutdown(context.Background())) } @@ -122,10 +126,13 @@ func TestFactory_DedupDeprecated(t *testing.T) { require.NoError(t, metricsExporter.Shutdown(context.Background())) records := logObserver.AllUntimed() - assert.Len(t, records, 3) - assert.Equal(t, "dedup has been deprecated, and will always be enabled in future", records[0].Message) - assert.Equal(t, "dedup has been deprecated, and will always be enabled in future", records[1].Message) - assert.Equal(t, "dedup has been deprecated, and will always be enabled in future", records[2].Message) + var cnt int + for _, record := range records { + if record.Message == "dedup has been deprecated, and will always be enabled in future" { + cnt++ + } + } + assert.Equal(t, 3, cnt) } func TestFactory_DedotDeprecated(t *testing.T) { @@ -161,8 +168,11 @@ func TestFactory_DedotDeprecated(t *testing.T) { } records := logObserver.AllUntimed() - assert.Len(t, records, 6) + var cnt int for _, record := range records { - assert.Equal(t, "dedot has been deprecated: in the future, dedotting will always be performed in ECS mode only", record.Message) + if record.Message == "dedot has been deprecated: in the future, dedotting will always be performed in ECS mode only" { + cnt++ + } } + assert.Equal(t, 6, cnt) } diff --git a/exporter/elasticsearchexporter/generated_component_test.go b/exporter/elasticsearchexporter/generated_component_test.go index deac0d2dd56f..fe80d18ebbc6 100644 --- a/exporter/elasticsearchexporter/generated_component_test.go +++ b/exporter/elasticsearchexporter/generated_component_test.go @@ -106,8 +106,6 @@ func TestComponentLifecycle(t *testing.T) { } }) - require.NoError(t, err) - err = c.Shutdown(context.Background()) require.NoError(t, err) }) diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index b763a55a37dd..a5fb12a0d617 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -26,6 +26,7 @@ require ( go.opentelemetry.io/otel/trace v1.28.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 + golang.org/x/sync v0.7.0 ) require ( @@ -83,7 +84,6 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.26.0 // indirect - golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect diff --git a/exporter/elasticsearchexporter/integrationtest/.gitignore b/exporter/elasticsearchexporter/integrationtest/.gitignore index 484ab7e5c61d..a664f66810d6 100644 --- a/exporter/elasticsearchexporter/integrationtest/.gitignore +++ b/exporter/elasticsearchexporter/integrationtest/.gitignore @@ -1 +1,2 @@ results/* +*.test diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go index e530d864fb35..81bfcf7079d5 100644 --- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go +++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go @@ -4,17 +4,21 @@ package integrationtest // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/integrationtest" import ( + "bufio" "context" "encoding/json" "errors" "fmt" + "math" "net/http" "net/url" + "sync/atomic" "testing" "github.com/elastic/go-docappender/v2/docappendertest" "github.com/gorilla/mux" "github.com/stretchr/testify/require" + "github.com/tidwall/gjson" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" @@ -42,19 +46,26 @@ const ( TestTracesIndex = "traces-test-idx" ) +type counters struct { + observedBulkRequests atomic.Int64 +} + type esDataReceiver struct { testbed.DataReceiverBase receiver receiver.Logs endpoint string decodeBulkRequest bool t testing.TB + + *counters } -func newElasticsearchDataReceiver(t testing.TB, decodeBulkRequest bool) *esDataReceiver { +func newElasticsearchDataReceiver(t testing.TB, decodeBulkRequest bool, counts *counters) *esDataReceiver { return &esDataReceiver{ DataReceiverBase: testbed.DataReceiverBase{}, endpoint: fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)), decodeBulkRequest: decodeBulkRequest, + counters: counts, t: t, } } @@ -73,6 +84,9 @@ func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consu cfg := factory.CreateDefaultConfig().(*config) cfg.ServerConfig.Endpoint = esURL.Host cfg.DecodeBulkRequests = es.decodeBulkRequest + if es.counters != nil { + cfg.counters = es.counters + } set := receivertest.NewNopSettings() // Use an actual logger to log errors. @@ -107,13 +121,18 @@ func (es *esDataReceiver) GenConfigYAMLStr() string { endpoints: [%s] logs_index: %s traces_index: %s - flush: - interval: 1s + batcher: + flush_timeout: 1s sending_queue: enabled: true + storage: file_storage/elasticsearchexporter + num_consumers: 100 + queue_size: 100000 retry: enabled: true max_requests: 10000 + initial_interval: 100ms + max_interval: 1s ` return fmt.Sprintf(cfgFormat, es.endpoint, TestLogsIndex, TestTracesIndex) } @@ -131,14 +150,18 @@ type config struct { // set to false then the consumers will not consume any events and the // bulk request will always return http.StatusOK. DecodeBulkRequests bool + + *counters } func createDefaultConfig() component.Config { return &config{ ServerConfig: confighttp.ServerConfig{ - Endpoint: "127.0.0.1:9200", + Endpoint: "127.0.0.1:9200", + MaxRequestBodySize: math.MaxInt64, }, DecodeBulkRequests: true, + counters: &counters{}, } } @@ -215,10 +238,36 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`) }) r.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) { + es.config.observedBulkRequests.Add(1) if !es.config.DecodeBulkRequests { + defer r.Body.Close() + s := bufio.NewScanner(r.Body) + for s.Scan() { + action := gjson.GetBytes(s.Bytes(), "create._index") + if !action.Exists() { + // might be the last newline, skip + continue + } + switch action.Str { + case TestLogsIndex: + _ = es.logsConsumer.ConsumeLogs(context.Background(), emptyLogs) + case TestTracesIndex: + _ = es.tracesConsumer.ConsumeTraces(context.Background(), emptyTrace) + default: + w.WriteHeader(http.StatusBadRequest) + return + } + s.Scan() // skip next line + } + if s.Err() != nil { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprintln(w, s.Err()) + return + } fmt.Fprintln(w, "{}") return } + _, response := docappendertest.DecodeBulkRequest(r) for _, itemMap := range response.Items { for k, item := range itemMap { diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go index 2a1fe98db9b3..8008ea72691c 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go @@ -25,14 +25,19 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" ) -func BenchmarkExporter(b *testing.B) { +// BenchmarkExporterFlushItems benchmarks exporter flush triggered by flush batch size settings, e.g. min_size_items. +func BenchmarkExporterFlushItems(b *testing.B) { + updateESCfg := func(esCfg *elasticsearchexporter.Config) { + esCfg.BatcherConfig.MinSizeItems = 100 // has to be smaller than the smallest batch size, otherwise it will block + esCfg.BatcherConfig.MaxSizeItems = 500 + esCfg.BatcherConfig.FlushTimeout = time.Hour + } for _, eventType := range []string{"logs", "traces"} { for _, mappingMode := range []string{"none", "ecs", "raw"} { for _, tc := range []struct { name string batchSize int }{ - {name: "small_batch", batchSize: 10}, {name: "medium_batch", batchSize: 100}, {name: "large_batch", batchSize: 1000}, {name: "xlarge_batch", batchSize: 10000}, @@ -40,9 +45,9 @@ func BenchmarkExporter(b *testing.B) { b.Run(fmt.Sprintf("%s/%s/%s", eventType, mappingMode, tc.name), func(b *testing.B) { switch eventType { case "logs": - benchmarkLogs(b, tc.batchSize, mappingMode) + benchmarkLogs(b, tc.batchSize, mappingMode, updateESCfg) case "traces": - benchmarkTraces(b, tc.batchSize, mappingMode) + benchmarkTraces(b, tc.batchSize, mappingMode, updateESCfg) } }) } @@ -50,13 +55,14 @@ func BenchmarkExporter(b *testing.B) { } } -func benchmarkLogs(b *testing.B, batchSize int, mappingMode string) { +func benchmarkLogs(b *testing.B, batchSize int, mappingMode string, updateESCfg func(*elasticsearchexporter.Config)) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() exporterSettings := exportertest.NewNopSettings() exporterSettings.TelemetrySettings.Logger = zaptest.NewLogger(b, zaptest.Level(zap.WarnLevel)) runnerCfg := prepareBenchmark(b, batchSize, mappingMode) + updateESCfg(runnerCfg.esCfg) exporter, err := runnerCfg.factory.CreateLogsExporter( ctx, exporterSettings, runnerCfg.esCfg, ) @@ -72,20 +78,18 @@ func benchmarkLogs(b *testing.B, batchSize int, mappingMode string) { require.NoError(b, exporter.ConsumeLogs(ctx, logs)) b.StopTimer() } - b.ReportMetric( - float64(runnerCfg.generatedCount.Load())/b.Elapsed().Seconds(), - "events/s", - ) require.NoError(b, exporter.Shutdown(ctx)) + reportMetrics(b, runnerCfg) } -func benchmarkTraces(b *testing.B, batchSize int, mappingMode string) { +func benchmarkTraces(b *testing.B, batchSize int, mappingMode string, updateESCfg func(*elasticsearchexporter.Config)) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() exporterSettings := exportertest.NewNopSettings() exporterSettings.TelemetrySettings.Logger = zaptest.NewLogger(b, zaptest.Level(zap.WarnLevel)) runnerCfg := prepareBenchmark(b, batchSize, mappingMode) + updateESCfg(runnerCfg.esCfg) exporter, err := runnerCfg.factory.CreateTracesExporter( ctx, exporterSettings, runnerCfg.esCfg, ) @@ -101,11 +105,8 @@ func benchmarkTraces(b *testing.B, batchSize int, mappingMode string) { require.NoError(b, exporter.ConsumeTraces(ctx, traces)) b.StopTimer() } - b.ReportMetric( - float64(runnerCfg.generatedCount.Load())/b.Elapsed().Seconds(), - "events/s", - ) require.NoError(b, exporter.Shutdown(ctx)) + reportMetrics(b, runnerCfg) } type benchRunnerCfg struct { @@ -113,7 +114,10 @@ type benchRunnerCfg struct { provider testbed.DataProvider esCfg *elasticsearchexporter.Config - generatedCount atomic.Uint64 + generatedCount atomic.Uint64 + observedDocCount atomic.Int64 + + *counters } func prepareBenchmark( @@ -123,9 +127,11 @@ func prepareBenchmark( ) *benchRunnerCfg { b.Helper() - cfg := &benchRunnerCfg{} + cfg := &benchRunnerCfg{ + counters: &counters{}, + } // Benchmarks don't decode the bulk requests to avoid allocations to pollute the results. - receiver := newElasticsearchDataReceiver(b, false /* DecodeBulkRequest */) + receiver := newElasticsearchDataReceiver(b, false /* DecodeBulkRequest */, cfg.counters) cfg.provider = testbed.NewPerfTestDataProvider(testbed.LoadOptions{ItemsPerBatch: batchSize}) cfg.provider.SetLoadGeneratorCounters(&cfg.generatedCount) @@ -135,18 +141,21 @@ func prepareBenchmark( cfg.esCfg.Endpoints = []string{receiver.endpoint} cfg.esCfg.LogsIndex = TestLogsIndex cfg.esCfg.TracesIndex = TestTracesIndex - cfg.esCfg.Flush.Interval = 10 * time.Millisecond cfg.esCfg.NumWorkers = 1 + cfg.esCfg.QueueSettings.Enabled = false - tc, err := consumer.NewTraces(func(context.Context, ptrace.Traces) error { + tc, err := consumer.NewTraces(func(_ context.Context, traces ptrace.Traces) error { + cfg.observedDocCount.Add(int64(traces.SpanCount())) return nil }) require.NoError(b, err) - mc, err := consumer.NewMetrics(func(context.Context, pmetric.Metrics) error { + mc, err := consumer.NewMetrics(func(_ context.Context, metrics pmetric.Metrics) error { + cfg.observedDocCount.Add(int64(metrics.DataPointCount())) return nil }) require.NoError(b, err) - lc, err := consumer.NewLogs(func(context.Context, plog.Logs) error { + lc, err := consumer.NewLogs(func(_ context.Context, logs plog.Logs) error { + cfg.observedDocCount.Add(int64(logs.LogRecordCount())) return nil }) require.NoError(b, err) @@ -156,3 +165,18 @@ func prepareBenchmark( return cfg } + +func reportMetrics(b *testing.B, runnerCfg *benchRunnerCfg) { + b.ReportMetric( + float64(runnerCfg.generatedCount.Load())/b.Elapsed().Seconds(), + "events/s", + ) + b.ReportMetric( + float64(runnerCfg.observedDocCount.Load())/b.Elapsed().Seconds(), + "docs/s", + ) + b.ReportMetric( + float64(runnerCfg.observedBulkRequests.Load())/b.Elapsed().Seconds(), + "bulkReqs/s", + ) +} diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_test.go index c0df3d575308..f1ea974145b0 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_test.go @@ -28,10 +28,12 @@ func TestExporter(t *testing.T) { }{ {name: "basic"}, {name: "es_intermittent_failure", mockESFailure: true}, - /* TODO: Below tests should be enabled after https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30792 is fixed {name: "collector_restarts", restartCollector: true}, - {name: "collector_restart_with_es_intermittent_failure", mockESFailure: true, restartCollector: true}, - */ + // Test is failing due to timeout because in-flight requests are not aware of shutdown + // and will keep retrying up to the configured retry limit + // TODO: re-enable test after moving to use retry sender + // as https://github.com/open-telemetry/opentelemetry-collector/issues/10166 is fixed + // {name: "collector_restart_with_es_intermittent_failure", mockESFailure: true, restartCollector: true}, } { t.Run(fmt.Sprintf("%s/%s", eventType, tc.name), func(t *testing.T) { runner(t, eventType, tc.restartCollector, tc.mockESFailure) @@ -57,14 +59,19 @@ func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool t.Fatalf("failed to create data sender for type: %s", eventType) } - receiver := newElasticsearchDataReceiver(t, true) + receiver := newElasticsearchDataReceiver(t, true, nil) loadOpts := testbed.LoadOptions{ DataItemsPerSecond: 1_000, ItemsPerBatch: 10, } provider := testbed.NewPerfTestDataProvider(loadOpts) - cfg := createConfigYaml(t, sender, receiver, nil, nil, eventType, getDebugFlag(t)) + tempDir := t.TempDir() + extensions := map[string]string{ + "file_storage/elasticsearchexporter": fmt.Sprintf(`file_storage/elasticsearchexporter: + directory: %s`, tempDir), + } + cfg := createConfigYaml(t, sender, receiver, nil, extensions, eventType, getDebugFlag(t)) t.Log("test otel collector configuration:", cfg) collector := newRecreatableOtelCol(t) cleanup, err := collector.PrepareConfig(cfg) diff --git a/exporter/elasticsearchexporter/integrationtest/go.mod b/exporter/elasticsearchexporter/integrationtest/go.mod index 1e1b9f58e0aa..59bfcaf1880a 100644 --- a/exporter/elasticsearchexporter/integrationtest/go.mod +++ b/exporter/elasticsearchexporter/integrationtest/go.mod @@ -12,6 +12,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/testbed v0.104.0 github.com/shirou/gopsutil/v4 v4.24.6 github.com/stretchr/testify v1.9.0 + github.com/tidwall/gjson v1.17.1 go.opentelemetry.io/collector/component v0.104.1-0.20240709093154-e7ce1d50fb5e go.opentelemetry.io/collector/config/confighttp v0.104.1-0.20240709093154-e7ce1d50fb5e go.opentelemetry.io/collector/confmap v0.104.1-0.20240709093154-e7ce1d50fb5e @@ -114,6 +115,8 @@ require ( github.com/soheilhy/cmux v0.1.5 // indirect github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect github.com/tilinna/clock v1.1.0 // indirect github.com/tklauser/go-sysconf v0.3.13 // indirect github.com/tklauser/numcpus v0.7.0 // indirect @@ -172,7 +175,7 @@ require ( go.opentelemetry.io/otel/trace v1.28.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect + golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect golang.org/x/net v0.27.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect diff --git a/exporter/elasticsearchexporter/integrationtest/go.sum b/exporter/elasticsearchexporter/integrationtest/go.sum index d758cf02c48b..2a8475ada527 100644 --- a/exporter/elasticsearchexporter/integrationtest/go.sum +++ b/exporter/elasticsearchexporter/integrationtest/go.sum @@ -236,6 +236,12 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U= +github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tilinna/clock v1.1.0 h1:6IQQQCo6KoBxVudv6gwtY8o4eDfhHo8ojA5dP0MfhSs= github.com/tilinna/clock v1.1.0/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao= github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4= @@ -392,8 +398,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= -golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/exporter/elasticsearchexporter/metadata.yaml b/exporter/elasticsearchexporter/metadata.yaml index e1220cce1896..90246a3d9519 100644 --- a/exporter/elasticsearchexporter/metadata.yaml +++ b/exporter/elasticsearchexporter/metadata.yaml @@ -13,3 +13,4 @@ status: tests: config: endpoints: [http://localhost:9200] + expect_consumer_error: true diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml index acd6e92f9001..ccbd56d47681 100644 --- a/exporter/elasticsearchexporter/testdata/config.yaml +++ b/exporter/elasticsearchexporter/testdata/config.yaml @@ -14,13 +14,16 @@ elasticsearch/trace: api_key: AvFsEiPs== discover: on_start: true - flush: - bytes: 10485760 + batcher: + min_size_items: 100 + max_size_items: 200 + flush_timeout: 5s retry: max_requests: 5 retry_on_status: - 429 - 500 + num_workers: 1 elasticsearch/metric: tls: insecure: false @@ -35,8 +38,10 @@ elasticsearch/metric: api_key: AvFsEiPs== discover: on_start: true - flush: - bytes: 10485760 + batcher: + min_size_items: 100 + max_size_items: 200 + flush_timeout: 5s retry: max_requests: 5 retry_on_status: @@ -44,6 +49,7 @@ elasticsearch/metric: - 500 sending_queue: enabled: true + num_workers: 1 elasticsearch/log: tls: insecure: false @@ -58,15 +64,18 @@ elasticsearch/log: api_key: AvFsEiPs== discover: on_start: true - flush: - bytes: 10485760 + batcher: + min_size_items: 100 + max_size_items: 200 + flush_timeout: 5s retry: max_requests: 5 retry_on_status: - 429 - 500 sending_queue: - enabled: true + enabled: false + num_workers: 1 elasticsearch/logstash_format: endpoints: [http://localhost:9200] logstash_format: