Skip to content

Commit 0e6c727

Browse files
committed
[exporter/elasticsearch] support both v7 & v8
1 parent 09d4ae3 commit 0e6c727

File tree

10 files changed

+138
-28
lines changed

10 files changed

+138
-28
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Use go-elasticsearch/v8 by default, add config to fall back to v7
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [32454]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/elasticsearchexporter/README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,21 @@ The Elasticsearch Exporter's own telemetry settings for testing and debugging pu
232232
- `log_request_body` (default=false): Logs Elasticsearch client request body as a field in a log line at DEBUG level. It requires `service::telemetry::logs::level` to be set to `debug`. WARNING: Enabling this config may expose sensitive data.
233233
- `log_response_body` (default=false): Logs Elasticsearch client response body as a field in a log line at DEBUG level. It requires `service::telemetry::logs::level` to be set to `debug`. WARNING: Enabling this config may expose sensitive data.
234234

235+
### Elasticsearch version compatibility
236+
237+
The Elasticsearch Exporter uses the [go-elasticsearch](https://github.com/elastic/go-elasticsearch)
238+
client for communicating with Elasticsearch, and has forward compatbility with Elasticsearch 8+ by
239+
default. It is possible to enable best-effort support for older Elasticsearch 7.x versions by
240+
setting the Elasticsearch exporter config `version` to `7`.
241+
242+
Certain features of the exporter, such as the `otel` mapping mode`, may require newer versions of
243+
Elasticsearch. In general it is recommended to use the exporter with the most recent supported,
244+
as this will have the most in-depth testing.
245+
246+
> [!NOTE]
247+
> See https://www.elastic.co/support/eol for Elasticsearch's End Of Life policy.
248+
> Versions prior to 7.17.x are no longer supported by Elastic.
249+
235250
## Exporting metrics
236251

237252
Metrics support is currently in development.

exporter/elasticsearchexporter/bulkindexer.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
"time"
1616

1717
"github.com/elastic/go-docappender/v2"
18-
"github.com/elastic/go-elasticsearch/v7"
18+
"github.com/elastic/go-elasticsearch/v8/esapi"
1919
"go.opentelemetry.io/collector/config/configcompression"
2020
"go.uber.org/zap"
2121
)
@@ -55,14 +55,14 @@ type bulkIndexerSession interface {
5555

5656
const defaultMaxRetries = 2
5757

58-
func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (bulkIndexer, error) {
58+
func newBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config) (bulkIndexer, error) {
5959
if config.Batcher.Enabled != nil {
6060
return newSyncBulkIndexer(logger, client, config), nil
6161
}
6262
return newAsyncBulkIndexer(logger, client, config)
6363
}
6464

65-
func bulkIndexerConfig(client *elasticsearch.Client, config *Config) docappender.BulkIndexerConfig {
65+
func bulkIndexerConfig(client esapi.Transport, config *Config) docappender.BulkIndexerConfig {
6666
var maxDocRetries int
6767
if config.Retry.Enabled {
6868
maxDocRetries = defaultMaxRetries
@@ -84,7 +84,7 @@ func bulkIndexerConfig(client *elasticsearch.Client, config *Config) docappender
8484
}
8585
}
8686

87-
func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) *syncBulkIndexer {
87+
func newSyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config) *syncBulkIndexer {
8888
return &syncBulkIndexer{
8989
config: bulkIndexerConfig(client, config),
9090
flushTimeout: config.Timeout,
@@ -175,7 +175,7 @@ func (s *syncBulkIndexerSession) Flush(ctx context.Context) error {
175175
}
176176
}
177177

178-
func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (*asyncBulkIndexer, error) {
178+
func newAsyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config) (*asyncBulkIndexer, error) {
179179
numWorkers := config.NumWorkers
180180
if numWorkers == 0 {
181181
numWorkers = runtime.NumCPU()

exporter/elasticsearchexporter/bulkindexer_test.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"testing"
1414
"time"
1515

16-
"github.com/elastic/go-elasticsearch/v7"
16+
"github.com/elastic/go-elasticsearch/v8"
1717
"github.com/stretchr/testify/assert"
1818
"github.com/stretchr/testify/require"
1919
"go.opentelemetry.io/collector/config/confighttp"
@@ -293,15 +293,11 @@ func TestAsyncBulkIndexer_logRoundTrip(t *testing.T) {
293293
runBulkIndexerOnce(t, &tt.config, client)
294294

295295
records := logObserver.AllUntimed()
296-
assert.Len(t, records, 2)
296+
require.Len(t, records, 1)
297297

298-
assert.Equal(t, "/", records[0].ContextMap()["path"])
299-
assert.Nil(t, records[0].ContextMap()["request_body"])
298+
assert.Equal(t, "/_bulk", records[0].ContextMap()["path"])
299+
assert.Equal(t, "{\"create\":{\"_index\":\"foo\"}}\n{\"foo\": \"bar\"}\n", records[0].ContextMap()["request_body"])
300300
assert.JSONEq(t, successResp, records[0].ContextMap()["response_body"].(string))
301-
302-
assert.Equal(t, "/_bulk", records[1].ContextMap()["path"])
303-
assert.Equal(t, "{\"create\":{\"_index\":\"foo\"}}\n{\"foo\": \"bar\"}\n", records[1].ContextMap()["request_body"])
304-
assert.JSONEq(t, successResp, records[1].ContextMap()["response_body"].(string))
305301
})
306302
}
307303
}
@@ -327,8 +323,9 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) {
327323
reqCnt.Add(1)
328324
}
329325
return &http.Response{
330-
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
331-
Body: io.NopCloser(strings.NewReader(successResp)),
326+
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
327+
Body: io.NopCloser(strings.NewReader(successResp)),
328+
StatusCode: http.StatusOK,
332329
}, nil
333330
},
334331
}})

exporter/elasticsearchexporter/config.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ type Config struct {
7878
// If Batcher.Enabled is non-nil (i.e. batcher::enabled is specified),
7979
// then the Flush will be ignored even if Batcher.Enabled is false.
8080
Batcher BatcherConfig `mapstructure:"batcher"`
81+
82+
// Version holds the major version of Elasticsearch that the exporter
83+
// will target: 7 or 8.
84+
Version int `mapstructure:"version"`
8185
}
8286

8387
// BatcherConfig holds configuration for exporterbatcher.
@@ -279,6 +283,11 @@ func (cfg *Config) Validate() error {
279283
if cfg.Retry.MaxRetries < 0 {
280284
return errors.New("retry::max_retries should be non-negative")
281285
}
286+
switch cfg.Version {
287+
case 7, 8:
288+
default:
289+
return fmt.Errorf("version must be 7 or 8, got %d", cfg.Version)
290+
}
282291

283292
return nil
284293
}

exporter/elasticsearchexporter/config_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ func TestConfig(t *testing.T) {
120120
MaxSizeItems: 0,
121121
},
122122
},
123+
Version: 8,
123124
},
124125
},
125126
{
@@ -191,6 +192,7 @@ func TestConfig(t *testing.T) {
191192
MaxSizeItems: 0,
192193
},
193194
},
195+
Version: 8,
194196
},
195197
},
196198
{
@@ -262,6 +264,7 @@ func TestConfig(t *testing.T) {
262264
MaxSizeItems: 0,
263265
},
264266
},
267+
Version: 8,
265268
},
266269
},
267270
{

exporter/elasticsearchexporter/esclient.go

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,16 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry
55

66
import (
77
"context"
8+
"errors"
9+
"fmt"
810
"io"
911
"net/http"
1012
"time"
1113

1214
"github.com/cenkalti/backoff/v4"
13-
"github.com/elastic/go-elasticsearch/v7"
15+
elasticsearchv7 "github.com/elastic/go-elasticsearch/v7"
16+
elasticsearchv8 "github.com/elastic/go-elasticsearch/v8"
17+
"github.com/elastic/go-elasticsearch/v8/esapi"
1418
"github.com/klauspost/compress/gzip"
1519
"go.opentelemetry.io/collector/component"
1620
"go.uber.org/zap"
@@ -82,14 +86,14 @@ func (cl *clientLogger) ResponseBodyEnabled() bool {
8286
return cl.logResponseBody
8387
}
8488

85-
// newElasticsearchClient returns a new elasticsearch.Client
89+
// newElasticsearchClient returns a new esapi.Transport.
8690
func newElasticsearchClient(
8791
ctx context.Context,
8892
config *Config,
8993
host component.Host,
9094
telemetry component.TelemetrySettings,
9195
userAgent string,
92-
) (*elasticsearch.Client, error) {
96+
) (esapi.Transport, error) {
9397
httpClient, err := config.ClientConfig.ToClient(ctx, host, telemetry)
9498
if err != nil {
9599
return nil, err
@@ -105,19 +109,36 @@ func newElasticsearchClient(
105109
return nil, err
106110
}
107111

108-
esLogger := clientLogger{
112+
esLogger := &clientLogger{
109113
Logger: telemetry.Logger,
110114
logRequestBody: config.LogRequestBody,
111115
logResponseBody: config.LogResponseBody,
112116
}
113117

114-
maxRetries := defaultMaxRetries
115-
if config.Retry.MaxRetries != 0 {
116-
maxRetries = config.Retry.MaxRetries
118+
switch config.Version {
119+
case 7:
120+
return newElasticsearchClientV7(
121+
config, endpoints, headers,
122+
httpClient.Transport, esLogger,
123+
)
124+
case 8:
125+
return newElasticsearchClientV8(
126+
config, endpoints, headers,
127+
httpClient.Transport, esLogger,
128+
)
117129
}
130+
return nil, fmt.Errorf("unsupported version %d", config.Version)
131+
}
118132

119-
return elasticsearch.NewClient(elasticsearch.Config{
120-
Transport: httpClient.Transport,
133+
func newElasticsearchClientV7(
134+
config *Config,
135+
endpoints []string,
136+
headers http.Header,
137+
httpTransport http.RoundTripper,
138+
esLogger *clientLogger,
139+
) (esapi.Transport, error) {
140+
return elasticsearchv7.NewClient(elasticsearchv7.Config{
141+
Transport: httpTransport,
121142

122143
// configure connection setup
123144
Addresses: endpoints,
@@ -130,8 +151,44 @@ func newElasticsearchClient(
130151
RetryOnStatus: config.Retry.RetryOnStatus,
131152
DisableRetry: !config.Retry.Enabled,
132153
EnableRetryOnTimeout: config.Retry.Enabled,
133-
// RetryOnError: retryOnError, // should be used from esclient version 8 onwards
134-
MaxRetries: maxRetries,
154+
MaxRetries: min(defaultMaxRetries, config.Retry.MaxRetries),
155+
RetryBackoff: createElasticsearchBackoffFunc(&config.Retry),
156+
157+
// configure sniffing
158+
DiscoverNodesOnStart: config.Discovery.OnStart,
159+
DiscoverNodesInterval: config.Discovery.Interval,
160+
161+
// configure internal metrics reporting and logging
162+
EnableMetrics: false, // TODO
163+
EnableDebugLogger: false, // TODO
164+
Logger: esLogger,
165+
})
166+
}
167+
168+
func newElasticsearchClientV8(
169+
config *Config,
170+
endpoints []string,
171+
headers http.Header,
172+
httpTransport http.RoundTripper,
173+
esLogger *clientLogger,
174+
) (*elasticsearchv8.Client, error) {
175+
return elasticsearchv8.NewClient(elasticsearchv8.Config{
176+
Transport: httpTransport,
177+
178+
// configure connection setup
179+
Addresses: endpoints,
180+
Username: config.Authentication.User,
181+
Password: string(config.Authentication.Password),
182+
APIKey: string(config.Authentication.APIKey),
183+
Header: headers,
184+
185+
// configure retry behavior
186+
RetryOnStatus: config.Retry.RetryOnStatus,
187+
DisableRetry: !config.Retry.Enabled,
188+
RetryOnError: func(_ *http.Request, err error) bool {
189+
return !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded)
190+
},
191+
MaxRetries: min(defaultMaxRetries, config.Retry.MaxRetries),
135192
RetryBackoff: createElasticsearchBackoffFunc(&config.Retry),
136193

137194
// configure sniffing
@@ -141,7 +198,7 @@ func newElasticsearchClient(
141198
// configure internal metrics reporting and logging
142199
EnableMetrics: false, // TODO
143200
EnableDebugLogger: false, // TODO
144-
Logger: &esLogger,
201+
Logger: esLogger,
145202
})
146203
}
147204

exporter/elasticsearchexporter/exporter_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1738,6 +1738,7 @@ func TestExporterBatcher(t *testing.T) {
17381738
exporter := newUnstartedTestLogsExporter(t, "http://testing.invalid", func(cfg *Config) {
17391739
cfg.Batcher = BatcherConfig{Enabled: &batcherEnabled}
17401740
cfg.Auth = &configauth.Authentication{AuthenticatorID: testauthID}
1741+
cfg.Retry.Enabled = false
17411742
})
17421743
err := exporter.Start(context.Background(), &mockHost{
17431744
extensions: map[component.ID]component.Component{

exporter/elasticsearchexporter/factory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ func createDefaultConfig() component.Config {
9797
Bytes: 5e+6,
9898
Interval: 30 * time.Second,
9999
},
100+
Version: 8,
100101
}
101102
}
102103

exporter/elasticsearchexporter/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/cenkalti/backoff/v4 v4.3.0
77
github.com/elastic/go-docappender/v2 v2.3.3
88
github.com/elastic/go-elasticsearch/v7 v7.17.10
9+
github.com/elastic/go-elasticsearch/v8 v8.17.0
910
github.com/elastic/go-structform v0.0.12
1011
github.com/json-iterator/go v1.1.12
1112
github.com/klauspost/compress v1.17.11
@@ -35,7 +36,6 @@ require (
3536
github.com/armon/go-radix v1.0.0 // indirect
3637
github.com/davecgh/go-spew v1.1.1 // indirect
3738
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
38-
github.com/elastic/go-elasticsearch/v8 v8.17.0 // indirect
3939
github.com/elastic/go-sysinfo v1.7.1 // indirect
4040
github.com/elastic/go-windows v1.0.1 // indirect
4141
github.com/felixge/httpsnoop v1.0.4 // indirect

0 commit comments

Comments
 (0)