Skip to content

Commit 12d41f4

Browse files
authored
[chore] [exporter/elasticsearch] refactor bulkindexer to prepare for batch sender (#34127)
**Description:** Refactor the Elasticsearch bulk indexer code to create an abstraction around the existing buffering, asynchronous bulk indexer. This is preparation for supporting two implementations of bulk indexing: the existing asynchronous one, and a new synchronous one that works well with exporterhelper's batch sender -- see #32632. **Link to tracking Issue:** #32377 **Testing:** N/A, this is a non-functional change. **Documentation:** N/A, pure refactoring.
1 parent 68c59f4 commit 12d41f4

File tree

4 files changed

+314
-198
lines changed

4 files changed

+314
-198
lines changed
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"io"
10+
"runtime"
11+
"sync"
12+
"sync/atomic"
13+
"time"
14+
15+
"github.com/elastic/go-docappender/v2"
16+
"github.com/elastic/go-elasticsearch/v7"
17+
"go.uber.org/zap"
18+
)
19+
20+
type bulkIndexer interface {
21+
// StartSession starts a new bulk indexing session.
22+
StartSession(context.Context) (bulkIndexerSession, error)
23+
24+
// Close closes the bulk indexer, ending any in-progress
25+
// sessions and stopping any background processing.
26+
Close(ctx context.Context) error
27+
}
28+
29+
// bulkIndexerSession is a handle through which documents are submitted to a
// bulkIndexer. A session is obtained from bulkIndexer.StartSession and must
// be released with End.
type bulkIndexerSession interface {
	// Add submits a single document, destined for the given index, to
	// the bulk indexing session.
	Add(ctx context.Context, index string, document io.WriterTo) error

	// End releases any resources associated with the session. It must be
	// called once the session is no longer needed.
	//
	// End does _not_ implicitly flush documents; call Flush first when
	// that is required.
	//
	// Any method call made after End (End itself included) may panic.
	End()

	// Flush flushes the documents that have been added to the session.
	//
	// What Flush does depends on the kind of bulk indexer behind the
	// session. For an asynchronous bulk indexer it is effectively a
	// no-op, because flushing happens in the background. For a
	// synchronous bulk indexer it waits until bulk indexing of the added
	// documents has completed, successfully or not.
	Flush(ctx context.Context) error
}
52+
53+
func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (bulkIndexer, error) {
54+
// TODO: add support for synchronous bulk indexing, to integrate with the exporterhelper batch sender.
55+
return newAsyncBulkIndexer(logger, client, config)
56+
}
57+
58+
func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (*asyncBulkIndexer, error) {
59+
numWorkers := config.NumWorkers
60+
if numWorkers == 0 {
61+
numWorkers = runtime.NumCPU()
62+
}
63+
64+
flushInterval := config.Flush.Interval
65+
if flushInterval == 0 {
66+
flushInterval = 30 * time.Second
67+
}
68+
69+
flushBytes := config.Flush.Bytes
70+
if flushBytes == 0 {
71+
flushBytes = 5e+6
72+
}
73+
74+
var maxDocRetry int
75+
if config.Retry.Enabled {
76+
// max_requests includes initial attempt
77+
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344
78+
maxDocRetry = config.Retry.MaxRequests - 1
79+
}
80+
81+
pool := &asyncBulkIndexer{
82+
wg: sync.WaitGroup{},
83+
items: make(chan docappender.BulkIndexerItem, config.NumWorkers),
84+
stats: bulkIndexerStats{},
85+
}
86+
pool.wg.Add(numWorkers)
87+
88+
for i := 0; i < numWorkers; i++ {
89+
bi, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
90+
Client: client,
91+
MaxDocumentRetries: maxDocRetry,
92+
Pipeline: config.Pipeline,
93+
RetryOnDocumentStatus: config.Retry.RetryOnStatus,
94+
})
95+
if err != nil {
96+
return nil, err
97+
}
98+
w := asyncBulkIndexerWorker{
99+
indexer: bi,
100+
items: pool.items,
101+
flushInterval: flushInterval,
102+
flushTimeout: config.Timeout,
103+
flushBytes: flushBytes,
104+
logger: logger,
105+
stats: &pool.stats,
106+
}
107+
go func() {
108+
defer pool.wg.Done()
109+
w.run()
110+
}()
111+
}
112+
return pool, nil
113+
}
114+
115+
// bulkIndexerStats holds counters that are shared between the workers of a
// bulk indexer.
type bulkIndexerStats struct {
	// docsIndexed is increased after every flush by the number of
	// documents the flush reported as indexed.
	docsIndexed atomic.Int64
}
118+
119+
type asyncBulkIndexer struct {
120+
items chan docappender.BulkIndexerItem
121+
wg sync.WaitGroup
122+
stats bulkIndexerStats
123+
}
124+
125+
type asyncBulkIndexerSession struct {
126+
*asyncBulkIndexer
127+
}
128+
129+
// StartSession returns a new asyncBulkIndexerSession.
130+
func (a *asyncBulkIndexer) StartSession(context.Context) (bulkIndexerSession, error) {
131+
return asyncBulkIndexerSession{a}, nil
132+
}
133+
134+
// Close closes the asyncBulkIndexer and any active sessions.
135+
func (a *asyncBulkIndexer) Close(ctx context.Context) error {
136+
close(a.items)
137+
doneCh := make(chan struct{})
138+
go func() {
139+
a.wg.Wait()
140+
close(doneCh)
141+
}()
142+
select {
143+
case <-ctx.Done():
144+
return ctx.Err()
145+
case <-doneCh:
146+
return nil
147+
}
148+
}
149+
150+
// Add adds an item to the async bulk indexer session.
151+
//
152+
// Adding an item after a call to Close() will panic.
153+
func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo) error {
154+
item := docappender.BulkIndexerItem{
155+
Index: index,
156+
Body: document,
157+
}
158+
select {
159+
case <-ctx.Done():
160+
return ctx.Err()
161+
case s.items <- item:
162+
return nil
163+
}
164+
}
165+
166+
// End is a no-op.
167+
func (s asyncBulkIndexerSession) End() {
168+
}
169+
170+
// Flush is a no-op.
171+
func (s asyncBulkIndexerSession) Flush(context.Context) error {
172+
return nil
173+
}
174+
175+
type asyncBulkIndexerWorker struct {
176+
indexer *docappender.BulkIndexer
177+
items <-chan docappender.BulkIndexerItem
178+
flushInterval time.Duration
179+
flushTimeout time.Duration
180+
flushBytes int
181+
182+
stats *bulkIndexerStats
183+
184+
logger *zap.Logger
185+
}
186+
187+
func (w *asyncBulkIndexerWorker) run() {
188+
flushTick := time.NewTicker(w.flushInterval)
189+
defer flushTick.Stop()
190+
for {
191+
select {
192+
case item, ok := <-w.items:
193+
// if channel is closed, flush and return
194+
if !ok {
195+
w.flush()
196+
return
197+
}
198+
199+
if err := w.indexer.Add(item); err != nil {
200+
w.logger.Error("error adding item to bulk indexer", zap.Error(err))
201+
}
202+
203+
// w.indexer.Len() can be either compressed or uncompressed bytes
204+
if w.indexer.Len() >= w.flushBytes {
205+
w.flush()
206+
flushTick.Reset(w.flushInterval)
207+
}
208+
case <-flushTick.C:
209+
// bulk indexer needs to be flushed every flush interval because
210+
// there may be pending bytes in bulk indexer buffer due to e.g. document level 429
211+
w.flush()
212+
}
213+
}
214+
}
215+
216+
func (w *asyncBulkIndexerWorker) flush() {
217+
ctx := context.Background()
218+
if w.flushTimeout > 0 {
219+
var cancel context.CancelFunc
220+
ctx, cancel = context.WithTimeout(context.Background(), w.flushTimeout)
221+
defer cancel()
222+
}
223+
stat, err := w.indexer.Flush(ctx)
224+
w.stats.docsIndexed.Add(stat.Indexed)
225+
if err != nil {
226+
w.logger.Error("bulk indexer flush error", zap.Error(err))
227+
}
228+
for _, resp := range stat.FailedDocs {
229+
w.logger.Error(fmt.Sprintf("Drop docs: failed to index: %#v", resp.Error),
230+
zap.Int("status", resp.Status))
231+
}
232+
}

exporter/elasticsearchexporter/elasticsearch_bulk_test.go renamed to exporter/elasticsearchexporter/bulkindexer_test.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ const successResp = `{
5050
]
5151
}`
5252

53-
func TestBulkIndexer_flushOnClose(t *testing.T) {
53+
func TestAsyncBulkIndexer_flushOnClose(t *testing.T) {
5454
cfg := Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 2 << 30}}
5555
client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{
5656
RoundTripFunc: func(*http.Request) (*http.Response, error) {
@@ -61,14 +61,18 @@ func TestBulkIndexer_flushOnClose(t *testing.T) {
6161
},
6262
}})
6363
require.NoError(t, err)
64-
bulkIndexer, err := newBulkIndexer(zap.NewNop(), client, &cfg)
64+
65+
bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &cfg)
66+
require.NoError(t, err)
67+
session, err := bulkIndexer.StartSession(context.Background())
6568
require.NoError(t, err)
66-
assert.NoError(t, bulkIndexer.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
69+
70+
assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
6771
assert.NoError(t, bulkIndexer.Close(context.Background()))
6872
assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
6973
}
7074

71-
func TestBulkIndexer_flush(t *testing.T) {
75+
func TestAsyncBulkIndexer_flush(t *testing.T) {
7276
tests := []struct {
7377
name string
7478
config Config
@@ -96,9 +100,13 @@ func TestBulkIndexer_flush(t *testing.T) {
96100
},
97101
}})
98102
require.NoError(t, err)
99-
bulkIndexer, err := newBulkIndexer(zap.NewNop(), client, &tt.config)
103+
104+
bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &tt.config)
105+
require.NoError(t, err)
106+
session, err := bulkIndexer.StartSession(context.Background())
100107
require.NoError(t, err)
101-
assert.NoError(t, bulkIndexer.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
108+
109+
assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
102110
// should flush
103111
time.Sleep(100 * time.Millisecond)
104112
assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
@@ -107,7 +115,7 @@ func TestBulkIndexer_flush(t *testing.T) {
107115
}
108116
}
109117

110-
func TestBulkIndexer_flush_error(t *testing.T) {
118+
func TestAsyncBulkIndexer_flush_error(t *testing.T) {
111119
tests := []struct {
112120
name string
113121
roundTripFunc func(*http.Request) (*http.Response, error)
@@ -150,9 +158,13 @@ func TestBulkIndexer_flush_error(t *testing.T) {
150158
}})
151159
require.NoError(t, err)
152160
core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel))
153-
bulkIndexer, err := newBulkIndexer(zap.New(core), client, &cfg)
161+
162+
bulkIndexer, err := newAsyncBulkIndexer(zap.New(core), client, &cfg)
154163
require.NoError(t, err)
155-
assert.NoError(t, bulkIndexer.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
164+
session, err := bulkIndexer.StartSession(context.Background())
165+
require.NoError(t, err)
166+
167+
assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
156168
// should flush
157169
time.Sleep(100 * time.Millisecond)
158170
assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load())

0 commit comments

Comments
 (0)