From 5de3765ddcf3413ed9faa9be17f561356b697b45 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 17 Jul 2024 11:11:44 +0800 Subject: [PATCH 1/3] Split ES client code & bulk indexer This is in preparation for having two parallel implementations for bulk indexing: the current buffering/async one, and a new synchronous one that would work with the exporterhelper batch sender. --- .../elasticsearch_bulk.go | 173 +----------------- exporter/elasticsearchexporter/esclient.go | 166 +++++++++++++++++ exporter/elasticsearchexporter/exporter.go | 2 +- 3 files changed, 174 insertions(+), 167 deletions(-) create mode 100644 exporter/elasticsearchexporter/esclient.go diff --git a/exporter/elasticsearchexporter/elasticsearch_bulk.go b/exporter/elasticsearchexporter/elasticsearch_bulk.go index 5beb768bc582a..9be1e69e28a67 100644 --- a/exporter/elasticsearchexporter/elasticsearch_bulk.go +++ b/exporter/elasticsearchexporter/elasticsearch_bulk.go @@ -8,180 +8,21 @@ import ( "context" "fmt" "io" - "net/http" "runtime" "sync" "sync/atomic" "time" - "github.com/cenkalti/backoff/v4" "github.com/elastic/go-docappender/v2" - elasticsearch7 "github.com/elastic/go-elasticsearch/v7" - "go.opentelemetry.io/collector/component" + "github.com/elastic/go-elasticsearch/v7" "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/sanitize" ) -type esClientCurrent = elasticsearch7.Client -type esConfigCurrent = elasticsearch7.Config - -type esBulkIndexerCurrent = bulkIndexerPool - -type esBulkIndexerItem = docappender.BulkIndexerItem - -// clientLogger implements the estransport.Logger interface -// that is required by the Elasticsearch client for logging. -type clientLogger struct { - *zap.Logger - logRequestBody bool - logResponseBody bool -} - -// LogRoundTrip should not modify the request or response, except for consuming and closing the body. -// Implementations have to check for nil values in request and response. -func (cl *clientLogger) LogRoundTrip(requ *http.Request, resp *http.Response, clientErr error, _ time.Time, dur time.Duration) error { - zl := cl.Logger - - var fields []zap.Field - if cl.logRequestBody && requ != nil && requ.Body != nil { - if b, err := io.ReadAll(requ.Body); err == nil { - fields = append(fields, zap.ByteString("request_body", b)) - } - } - if cl.logResponseBody && resp != nil && resp.Body != nil { - if b, err := io.ReadAll(resp.Body); err == nil { - fields = append(fields, zap.ByteString("response_body", b)) - } - } - - switch { - case clientErr == nil && resp != nil: - fields = append( - fields, - zap.String("path", sanitize.String(requ.URL.Path)), - zap.String("method", requ.Method), - zap.Duration("duration", dur), - zap.String("status", resp.Status), - ) - zl.Debug("Request roundtrip completed.", fields...) - - case clientErr != nil: - fields = append( - fields, - zap.NamedError("reason", clientErr), - ) - zl.Debug("Request failed.", fields...) - } - - return nil -} - -// RequestBodyEnabled makes the client pass a copy of request body to the logger. -func (cl *clientLogger) RequestBodyEnabled() bool { - return cl.logRequestBody -} - -// ResponseBodyEnabled makes the client pass a copy of response body to the logger. -func (cl *clientLogger) ResponseBodyEnabled() bool { - return cl.logResponseBody -} - -func newElasticsearchClient( - ctx context.Context, - config *Config, - host component.Host, - telemetry component.TelemetrySettings, - userAgent string, -) (*esClientCurrent, error) { - httpClient, err := config.ClientConfig.ToClient(ctx, host, telemetry) - if err != nil { - return nil, err - } - - headers := make(http.Header) - headers.Set("User-Agent", userAgent) - - // maxRetries configures the maximum number of event publishing attempts, - // including the first send and additional retries. - - maxRetries := config.Retry.MaxRequests - 1 - retryDisabled := !config.Retry.Enabled || maxRetries <= 0 - - if retryDisabled { - maxRetries = 0 - } - - // endpoints converts Config.Endpoints, Config.CloudID, - // and Config.ClientConfig.Endpoint to a list of addresses. - endpoints, err := config.endpoints() - if err != nil { - return nil, err - } - - esLogger := clientLogger{ - Logger: telemetry.Logger, - logRequestBody: config.LogRequestBody, - logResponseBody: config.LogResponseBody, - } - - return elasticsearch7.NewClient(esConfigCurrent{ - Transport: httpClient.Transport, - - // configure connection setup - Addresses: endpoints, - Username: config.Authentication.User, - Password: string(config.Authentication.Password), - APIKey: string(config.Authentication.APIKey), - Header: headers, - - // configure retry behavior - RetryOnStatus: config.Retry.RetryOnStatus, - DisableRetry: retryDisabled, - EnableRetryOnTimeout: config.Retry.Enabled, - //RetryOnError: retryOnError, // should be used from esclient version 8 onwards - MaxRetries: maxRetries, - RetryBackoff: createElasticsearchBackoffFunc(&config.Retry), - - // configure sniffing - DiscoverNodesOnStart: config.Discovery.OnStart, - DiscoverNodesInterval: config.Discovery.Interval, - - // configure internal metrics reporting and logging - EnableMetrics: false, // TODO - EnableDebugLogger: false, // TODO - Logger: &esLogger, - }) -} - -func createElasticsearchBackoffFunc(config *RetrySettings) func(int) time.Duration { - if !config.Enabled { - return nil - } - - expBackoff := backoff.NewExponentialBackOff() - if config.InitialInterval > 0 { - expBackoff.InitialInterval = config.InitialInterval - } - if config.MaxInterval > 0 { - expBackoff.MaxInterval = config.MaxInterval - } - expBackoff.Reset() - - return func(attempts int) time.Duration { - if attempts == 1 { - expBackoff.Reset() - } - - return expBackoff.NextBackOff() - } -} - -func pushDocuments(ctx context.Context, index string, document []byte, bulkIndexer *esBulkIndexerCurrent) error { +func pushDocuments(ctx context.Context, index string, document []byte, bulkIndexer *bulkIndexerPool) error { return bulkIndexer.Add(ctx, index, bytes.NewReader(document)) } -func newBulkIndexer(logger *zap.Logger, client *elasticsearch7.Client, config *Config) (*esBulkIndexerCurrent, error) { +func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (*bulkIndexerPool, error) { numWorkers := config.NumWorkers if numWorkers == 0 { numWorkers = runtime.NumCPU() @@ -206,7 +47,7 @@ func newBulkIndexer(logger *zap.Logger, client *elasticsearch7.Client, config *C pool := &bulkIndexerPool{ wg: sync.WaitGroup{}, - items: make(chan esBulkIndexerItem, config.NumWorkers), + items: make(chan docappender.BulkIndexerItem, config.NumWorkers), stats: bulkIndexerStats{}, } pool.wg.Add(numWorkers) @@ -243,7 +84,7 @@ type bulkIndexerStats struct { } type bulkIndexerPool struct { - items chan esBulkIndexerItem + items chan docappender.BulkIndexerItem wg sync.WaitGroup stats bulkIndexerStats } @@ -252,7 +93,7 @@ type bulkIndexerPool struct { // // Adding an item after a call to Close() will panic. func (p *bulkIndexerPool) Add(ctx context.Context, index string, document io.WriterTo) error { - item := esBulkIndexerItem{ + item := docappender.BulkIndexerItem{ Index: index, Body: document, } @@ -282,7 +123,7 @@ func (p *bulkIndexerPool) Close(ctx context.Context) error { type worker struct { indexer *docappender.BulkIndexer - items <-chan esBulkIndexerItem + items <-chan docappender.BulkIndexerItem flushInterval time.Duration flushTimeout time.Duration flushBytes int diff --git a/exporter/elasticsearchexporter/esclient.go b/exporter/elasticsearchexporter/esclient.go new file mode 100644 index 0000000000000..23c2d48bb9ef9 --- /dev/null +++ b/exporter/elasticsearchexporter/esclient.go @@ -0,0 +1,166 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" + +import ( + "context" + "io" + "net/http" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/elastic/go-elasticsearch/v7" + "go.opentelemetry.io/collector/component" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/sanitize" +) + +// clientLogger implements the estransport.Logger interface +// that is required by the Elasticsearch client for logging. +type clientLogger struct { + *zap.Logger + logRequestBody bool + logResponseBody bool +} + +// LogRoundTrip should not modify the request or response, except for consuming and closing the body. +// Implementations have to check for nil values in request and response. +func (cl *clientLogger) LogRoundTrip(requ *http.Request, resp *http.Response, clientErr error, _ time.Time, dur time.Duration) error { + zl := cl.Logger + + var fields []zap.Field + if cl.logRequestBody && requ != nil && requ.Body != nil { + if b, err := io.ReadAll(requ.Body); err == nil { + fields = append(fields, zap.ByteString("request_body", b)) + } + } + if cl.logResponseBody && resp != nil && resp.Body != nil { + if b, err := io.ReadAll(resp.Body); err == nil { + fields = append(fields, zap.ByteString("response_body", b)) + } + } + + switch { + case clientErr == nil && resp != nil: + fields = append( + fields, + zap.String("path", sanitize.String(requ.URL.Path)), + zap.String("method", requ.Method), + zap.Duration("duration", dur), + zap.String("status", resp.Status), + ) + zl.Debug("Request roundtrip completed.", fields...) + + case clientErr != nil: + fields = append( + fields, + zap.NamedError("reason", clientErr), + ) + zl.Debug("Request failed.", fields...) + } + + return nil +} + +// RequestBodyEnabled makes the client pass a copy of request body to the logger. +func (cl *clientLogger) RequestBodyEnabled() bool { + return cl.logRequestBody +} + +// ResponseBodyEnabled makes the client pass a copy of response body to the logger. +func (cl *clientLogger) ResponseBodyEnabled() bool { + return cl.logResponseBody +} + +// newElasticsearchClient returns a new elasticsearch.Client +func newElasticsearchClient( + ctx context.Context, + config *Config, + host component.Host, + telemetry component.TelemetrySettings, + userAgent string, +) (*elasticsearch.Client, error) { + httpClient, err := config.ClientConfig.ToClient(ctx, host, telemetry) + if err != nil { + return nil, err + } + + headers := make(http.Header) + headers.Set("User-Agent", userAgent) + + // maxRetries configures the maximum number of event publishing attempts, + // including the first send and additional retries. + + maxRetries := config.Retry.MaxRequests - 1 + retryDisabled := !config.Retry.Enabled || maxRetries <= 0 + + if retryDisabled { + maxRetries = 0 + } + + // endpoints converts Config.Endpoints, Config.CloudID, + // and Config.ClientConfig.Endpoint to a list of addresses. + endpoints, err := config.endpoints() + if err != nil { + return nil, err + } + + esLogger := clientLogger{ + Logger: telemetry.Logger, + logRequestBody: config.LogRequestBody, + logResponseBody: config.LogResponseBody, + } + + return elasticsearch.NewClient(elasticsearch.Config{ + Transport: httpClient.Transport, + + // configure connection setup + Addresses: endpoints, + Username: config.Authentication.User, + Password: string(config.Authentication.Password), + APIKey: string(config.Authentication.APIKey), + Header: headers, + + // configure retry behavior + RetryOnStatus: config.Retry.RetryOnStatus, + DisableRetry: retryDisabled, + EnableRetryOnTimeout: config.Retry.Enabled, + //RetryOnError: retryOnError, // should be used from esclient version 8 onwards + MaxRetries: maxRetries, + RetryBackoff: createElasticsearchBackoffFunc(&config.Retry), + + // configure sniffing + DiscoverNodesOnStart: config.Discovery.OnStart, + DiscoverNodesInterval: config.Discovery.Interval, + + // configure internal metrics reporting and logging + EnableMetrics: false, // TODO + EnableDebugLogger: false, // TODO + Logger: &esLogger, + }) +} + +func createElasticsearchBackoffFunc(config *RetrySettings) func(int) time.Duration { + if !config.Enabled { + return nil + } + + expBackoff := backoff.NewExponentialBackOff() + if config.InitialInterval > 0 { + expBackoff.InitialInterval = config.InitialInterval + } + if config.MaxInterval > 0 { + expBackoff.MaxInterval = config.MaxInterval + } + expBackoff.Reset() + + return func(attempts int) time.Duration { + if attempts == 1 { + expBackoff.Reset() + } + + return expBackoff.NextBackOff() + } +} diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index d092fb6545ed4..261368f0f0dd4 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -30,7 +30,7 @@ type elasticsearchExporter struct { dynamicIndex bool model mappingModel - bulkIndexer *esBulkIndexerCurrent + bulkIndexer *bulkIndexerPool } func newExporter( From 53129ba21577190381b31dc7e70728e2da33295f Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 17 Jul 2024 11:23:59 +0800 Subject: [PATCH 2/3] Remove pushDocuments function Unnecessary indirection. --- exporter/elasticsearchexporter/elasticsearch_bulk.go | 5 ----- exporter/elasticsearchexporter/exporter.go | 8 ++++---- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/exporter/elasticsearchexporter/elasticsearch_bulk.go b/exporter/elasticsearchexporter/elasticsearch_bulk.go index 9be1e69e28a67..b50362c78c46a 100644 --- a/exporter/elasticsearchexporter/elasticsearch_bulk.go +++ b/exporter/elasticsearchexporter/elasticsearch_bulk.go @@ -4,7 +4,6 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" import ( - "bytes" "context" "fmt" "io" @@ -18,10 +17,6 @@ import ( "go.uber.org/zap" ) -func pushDocuments(ctx context.Context, index string, document []byte, bulkIndexer *bulkIndexerPool) error { - return bulkIndexer.Add(ctx, index, bytes.NewReader(document)) -} - func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (*bulkIndexerPool, error) { numWorkers := config.NumWorkers if numWorkers == 0 { diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 261368f0f0dd4..696a5c785c6c5 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -4,6 +4,7 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" import ( + "bytes" "context" "errors" "fmt" @@ -133,7 +134,7 @@ func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcom if err != nil { return fmt.Errorf("failed to encode log event: %w", err) } - return pushDocuments(ctx, fIndex, document, e.bulkIndexer) + return e.bulkIndexer.Add(ctx, fIndex, bytes.NewReader(document)) } func (e *elasticsearchExporter) pushMetricsData( @@ -193,8 +194,7 @@ func (e *elasticsearchExporter) pushMetricsData( errs = append(errs, err) continue } - - if err := pushDocuments(ctx, fIndex, docBytes, e.bulkIndexer); err != nil { + if err := e.bulkIndexer.Add(ctx, fIndex, bytes.NewReader(docBytes)); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -274,5 +274,5 @@ func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pc if err != nil { return fmt.Errorf("failed to encode trace record: %w", err) } - return pushDocuments(ctx, fIndex, document, e.bulkIndexer) + return e.bulkIndexer.Add(ctx, fIndex, bytes.NewReader(document)) } From 280e869f3197dc566127992ce324388c20f84728 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 17 Jul 2024 13:35:16 +0800 Subject: [PATCH 3/3] Create bulkIndexer/bulkIndexerSession abstractions This is in preparation for introducing a new synchronous bulk indexer that will integrate with the exporterhelper batch sender: in that setup, the batch sender will batch before getting to the exporter, and make concurrent requests to the exporter's Consume* methods. The new "session" abstraction is intended to enable memory pooling and to efficiently and incrementally add a batch of documents before synchronously flushing. --- .../{elasticsearch_bulk.go => bulkindexer.go} | 104 ++++++++++++++---- ...earch_bulk_test.go => bulkindexer_test.go} | 30 +++-- exporter/elasticsearchexporter/exporter.go | 66 +++++++++-- 3 files changed, 157 insertions(+), 43 deletions(-) rename exporter/elasticsearchexporter/{elasticsearch_bulk.go => bulkindexer.go} (59%) rename exporter/elasticsearchexporter/{elasticsearch_bulk_test.go => bulkindexer_test.go} (81%) diff --git a/exporter/elasticsearchexporter/elasticsearch_bulk.go b/exporter/elasticsearchexporter/bulkindexer.go similarity index 59% rename from exporter/elasticsearchexporter/elasticsearch_bulk.go rename to exporter/elasticsearchexporter/bulkindexer.go index b50362c78c46a..5276d234acd10 100644 --- a/exporter/elasticsearchexporter/elasticsearch_bulk.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -17,7 +17,45 @@ import ( "go.uber.org/zap" ) -func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (*bulkIndexerPool, error) { +type bulkIndexer interface { + // StartSession starts a new bulk indexing session. + StartSession(context.Context) (bulkIndexerSession, error) + + // Close closes the bulk indexer, ending any in-progress + // sessions and stopping any background processing. + Close(ctx context.Context) error +} + +type bulkIndexerSession interface { + // Add adds a document to the bulk indexing session. + Add(ctx context.Context, index string, document io.WriterTo) error + + // End must be called on the session object once it is no longer + // needed, in order to release any associated resources. + // + // Note that ending the session does _not_ implicitly flush + // documents. Call Flush before calling End as needed. + // + // Calling other methods (including End) after End may panic. + End() + + // Flush flushes any documents added to the bulk indexing session. + // + // The behavior of Flush depends on whether the bulk indexer is + // synchronous or asynchronous. Calling Flush on an asynchronous bulk + // indexer session is effectively a no-op; flushing will be done in + // the background. Calling Flush on a synchronous bulk indexer session + // will wait for bulk indexing of added documents to complete, + // successfully or not. + Flush(context.Context) error +} + +func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (bulkIndexer, error) { + // TODO: add support for synchronous bulk indexing, to integrate with the exporterhelper batch sender. + return newAsyncBulkIndexer(logger, client, config) +} + +func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (*asyncBulkIndexer, error) { numWorkers := config.NumWorkers if numWorkers == 0 { numWorkers = runtime.NumCPU() @@ -40,7 +78,7 @@ func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Co maxDocRetry = config.Retry.MaxRequests - 1 } - pool := &bulkIndexerPool{ + pool := &asyncBulkIndexer{ wg: sync.WaitGroup{}, items: make(chan docappender.BulkIndexerItem, config.NumWorkers), stats: bulkIndexerStats{}, @@ -57,7 +95,7 @@ func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Co if err != nil { return nil, err } - w := worker{ + w := asyncBulkIndexerWorker{ indexer: bi, items: pool.items, flushInterval: flushInterval, @@ -78,16 +116,41 @@ type bulkIndexerStats struct { docsIndexed atomic.Int64 } -type bulkIndexerPool struct { +type asyncBulkIndexer struct { items chan docappender.BulkIndexerItem wg sync.WaitGroup stats bulkIndexerStats } -// Add adds an item to the bulk indexer pool. +type asyncBulkIndexerSession struct { + *asyncBulkIndexer +} + +// StartSession returns a new asyncBulkIndexerSession. +func (a *asyncBulkIndexer) StartSession(context.Context) (bulkIndexerSession, error) { + return asyncBulkIndexerSession{a}, nil +} + +// Close closes the asyncBulkIndexer and any active sessions. +func (a *asyncBulkIndexer) Close(ctx context.Context) error { + close(a.items) + doneCh := make(chan struct{}) + go func() { + a.wg.Wait() + close(doneCh) + }() + select { + case <-ctx.Done(): + return ctx.Err() + case <-doneCh: + return nil + } +} + +// Add adds an item to the async bulk indexer session. // // Adding an item after a call to Close() will panic. -func (p *bulkIndexerPool) Add(ctx context.Context, index string, document io.WriterTo) error { +func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo) error { item := docappender.BulkIndexerItem{ Index: index, Body: document, @@ -95,28 +158,21 @@ func (p *bulkIndexerPool) Add(ctx context.Context, index string, document io.Wri select { case <-ctx.Done(): return ctx.Err() - case p.items <- item: + case s.items <- item: return nil } } -// Close closes the items channel and waits for the workers to drain it. -func (p *bulkIndexerPool) Close(ctx context.Context) error { - close(p.items) - doneCh := make(chan struct{}) - go func() { - p.wg.Wait() - close(doneCh) - }() - select { - case <-ctx.Done(): - return ctx.Err() - case <-doneCh: - return nil - } +// End is a no-op. +func (s asyncBulkIndexerSession) End() { +} + +// Flush is a no-op. +func (s asyncBulkIndexerSession) Flush(context.Context) error { + return nil } -type worker struct { +type asyncBulkIndexerWorker struct { indexer *docappender.BulkIndexer items <-chan docappender.BulkIndexerItem flushInterval time.Duration @@ -128,7 +184,7 @@ type worker struct { logger *zap.Logger } -func (w *worker) run() { +func (w *asyncBulkIndexerWorker) run() { flushTick := time.NewTicker(w.flushInterval) defer flushTick.Stop() for { @@ -157,7 +213,7 @@ func (w *worker) run() { } } -func (w *worker) flush() { +func (w *asyncBulkIndexerWorker) flush() { ctx := context.Background() if w.flushTimeout > 0 { var cancel context.CancelFunc diff --git a/exporter/elasticsearchexporter/elasticsearch_bulk_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go similarity index 81% rename from exporter/elasticsearchexporter/elasticsearch_bulk_test.go rename to exporter/elasticsearchexporter/bulkindexer_test.go index 020d29fae623b..4082073c9c850 100644 --- a/exporter/elasticsearchexporter/elasticsearch_bulk_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -50,7 +50,7 @@ const successResp = `{ ] }` -func TestBulkIndexer_flushOnClose(t *testing.T) { +func TestAsyncBulkIndexer_flushOnClose(t *testing.T) { cfg := Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 2 << 30}} client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{ RoundTripFunc: func(*http.Request) (*http.Response, error) { @@ -61,14 +61,18 @@ func TestBulkIndexer_flushOnClose(t *testing.T) { }, }}) require.NoError(t, err) - bulkIndexer, err := newBulkIndexer(zap.NewNop(), client, &cfg) + + bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &cfg) + require.NoError(t, err) + session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, bulkIndexer.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`))) + + assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`))) assert.NoError(t, bulkIndexer.Close(context.Background())) assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load()) } -func TestBulkIndexer_flush(t *testing.T) { +func TestAsyncBulkIndexer_flush(t *testing.T) { tests := []struct { name string config Config @@ -96,9 +100,13 @@ func TestBulkIndexer_flush(t *testing.T) { }, }}) require.NoError(t, err) - bulkIndexer, err := newBulkIndexer(zap.NewNop(), client, &tt.config) + + bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &tt.config) + require.NoError(t, err) + session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, bulkIndexer.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`))) + + assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`))) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load()) @@ -107,7 +115,7 @@ func TestBulkIndexer_flush(t *testing.T) { } } -func TestBulkIndexer_flush_error(t *testing.T) { +func TestAsyncBulkIndexer_flush_error(t *testing.T) { tests := []struct { name string roundTripFunc func(*http.Request) (*http.Response, error) @@ -150,9 +158,13 @@ func TestBulkIndexer_flush_error(t *testing.T) { }}) require.NoError(t, err) core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel)) - bulkIndexer, err := newBulkIndexer(zap.New(core), client, &cfg) + + bulkIndexer, err := newAsyncBulkIndexer(zap.New(core), client, &cfg) require.NoError(t, err) - assert.NoError(t, bulkIndexer.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`))) + session, err := bulkIndexer.StartSession(context.Background()) + require.NoError(t, err) + + assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`))) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load()) diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 696a5c785c6c5..6384f38aba60a 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -31,7 +31,7 @@ type elasticsearchExporter struct { dynamicIndex bool model mappingModel - bulkIndexer *bulkIndexerPool + bulkIndexer bulkIndexer } func newExporter( @@ -90,8 +90,13 @@ func (e *elasticsearchExporter) Shutdown(ctx context.Context) error { } func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { - var errs []error + session, err := e.bulkIndexer.StartSession(ctx) + if err != nil { + return err + } + defer session.End() + var errs []error rls := ld.ResourceLogs() for i := 0; i < rls.Len(); i++ { rl := rls.At(i) @@ -102,7 +107,7 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) scope := ill.Scope() logs := ill.LogRecords() for k := 0; k < logs.Len(); k++ { - if err := e.pushLogRecord(ctx, resource, logs.At(k), scope); err != nil { + if err := e.pushLogRecord(ctx, resource, logs.At(k), scope, session); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -113,10 +118,22 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) } } + if err := session.Flush(ctx); err != nil { + if cerr := ctx.Err(); cerr != nil { + return cerr + } + errs = append(errs, err) + } return errors.Join(errs...) } -func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) error { +func (e *elasticsearchExporter) pushLogRecord( + ctx context.Context, + resource pcommon.Resource, + record plog.LogRecord, + scope pcommon.InstrumentationScope, + bulkIndexerSession bulkIndexerSession, +) error { fIndex := e.index if e.dynamicIndex { fIndex = routeLogRecord(record, scope, resource, fIndex) @@ -134,15 +151,20 @@ func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcom if err != nil { return fmt.Errorf("failed to encode log event: %w", err) } - return e.bulkIndexer.Add(ctx, fIndex, bytes.NewReader(document)) + return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document)) } func (e *elasticsearchExporter) pushMetricsData( ctx context.Context, metrics pmetric.Metrics, ) error { - var errs []error + session, err := e.bulkIndexer.StartSession(ctx) + if err != nil { + return err + } + defer session.End() + var errs []error resourceMetrics := metrics.ResourceMetrics() for i := 0; i < resourceMetrics.Len(); i++ { resourceMetric := resourceMetrics.At(i) @@ -194,7 +216,7 @@ func (e *elasticsearchExporter) pushMetricsData( errs = append(errs, err) continue } - if err := e.bulkIndexer.Add(ctx, fIndex, bytes.NewReader(docBytes)); err != nil { + if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes)); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -204,6 +226,12 @@ func (e *elasticsearchExporter) pushMetricsData( } } + if err := session.Flush(ctx); err != nil { + if cerr := ctx.Err(); cerr != nil { + return cerr + } + errs = append(errs, err) + } return errors.Join(errs...) } @@ -231,6 +259,12 @@ func (e *elasticsearchExporter) pushTraceData( ctx context.Context, td ptrace.Traces, ) error { + session, err := e.bulkIndexer.StartSession(ctx) + if err != nil { + return err + } + defer session.End() + var errs []error resourceSpans := td.ResourceSpans() for i := 0; i < resourceSpans.Len(); i++ { @@ -243,7 +277,7 @@ func (e *elasticsearchExporter) pushTraceData( spans := scopeSpan.Spans() for k := 0; k < spans.Len(); k++ { span := spans.At(k) - if err := e.pushTraceRecord(ctx, resource, span, scope); err != nil { + if err := e.pushTraceRecord(ctx, resource, span, scope, session); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -253,10 +287,22 @@ func (e *elasticsearchExporter) pushTraceData( } } + if err := session.Flush(ctx); err != nil { + if cerr := ctx.Err(); cerr != nil { + return cerr + } + errs = append(errs, err) + } return errors.Join(errs...) } -func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error { +func (e *elasticsearchExporter) pushTraceRecord( + ctx context.Context, + resource pcommon.Resource, + span ptrace.Span, + scope pcommon.InstrumentationScope, + bulkIndexerSession bulkIndexerSession, +) error { fIndex := e.index if e.dynamicIndex { fIndex = routeSpan(span, scope, resource, fIndex) @@ -274,5 +320,5 @@ func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pc if err != nil { return fmt.Errorf("failed to encode trace record: %w", err) } - return e.bulkIndexer.Add(ctx, fIndex, bytes.NewReader(document)) + return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document)) }