Skip to content

Commit c5fd34c

Browse files
committed
Fix es bulk test
1 parent bc692f0 commit c5fd34c

File tree

1 file changed

+55
-50
lines changed

1 file changed

+55
-50
lines changed

exporter/elasticsearchexporter/elasticsearch_bulk_test.go

Lines changed: 55 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ import (
1616
"github.com/stretchr/testify/assert"
1717
"github.com/stretchr/testify/require"
1818
"go.uber.org/zap"
19-
"go.uber.org/zap/zapcore"
20-
"go.uber.org/zap/zaptest/observer"
2119
)
2220

2321
var defaultRoundTripFunc = func(*http.Request) (*http.Response, error) {
@@ -50,7 +48,7 @@ const successResp = `{
5048
]
5149
}`
5250

53-
func TestBulkIndexer_flushOnClose(t *testing.T) {
51+
func TestBulkIndexer_addBatchAndFlush(t *testing.T) {
5452
cfg := Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 2 << 30}}
5553
client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{
5654
RoundTripFunc: func(*http.Request) (*http.Response, error) {
@@ -63,51 +61,56 @@ func TestBulkIndexer_flushOnClose(t *testing.T) {
6361
require.NoError(t, err)
6462
bulkIndexer, err := newBulkIndexer(zap.NewNop(), client, &cfg)
6563
require.NoError(t, err)
66-
assert.NoError(t, bulkIndexer.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
67-
assert.NoError(t, bulkIndexer.Close(context.Background()))
64+
assert.NoError(t, bulkIndexer.AddBatchAndFlush(context.Background(),
65+
[]esBulkIndexerItem{
66+
{
67+
Index: "foo",
68+
Body: strings.NewReader(`{"foo": "bar"}`),
69+
},
70+
}))
6871
assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
6972
}
7073

71-
func TestBulkIndexer_flush(t *testing.T) {
72-
tests := []struct {
73-
name string
74-
config Config
75-
}{
76-
{
77-
name: "flush.bytes",
78-
config: Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 1}},
79-
},
80-
{
81-
name: "flush.interval",
82-
config: Config{NumWorkers: 1, Flush: FlushSettings{Interval: 50 * time.Millisecond, Bytes: 2 << 30}},
83-
},
84-
}
85-
86-
for _, tt := range tests {
87-
tt := tt
88-
t.Run(tt.name, func(t *testing.T) {
89-
t.Parallel()
90-
client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{
91-
RoundTripFunc: func(*http.Request) (*http.Response, error) {
92-
return &http.Response{
93-
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
94-
Body: io.NopCloser(strings.NewReader(successResp)),
95-
}, nil
96-
},
97-
}})
98-
require.NoError(t, err)
99-
bulkIndexer, err := newBulkIndexer(zap.NewNop(), client, &tt.config)
100-
require.NoError(t, err)
101-
assert.NoError(t, bulkIndexer.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
102-
// should flush
103-
time.Sleep(100 * time.Millisecond)
104-
assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
105-
assert.NoError(t, bulkIndexer.Close(context.Background()))
106-
})
107-
}
108-
}
74+
//func TestBulkIndexer_flush(t *testing.T) {
75+
// tests := []struct {
76+
// name string
77+
// config Config
78+
// }{
79+
// {
80+
// name: "flush.bytes",
81+
// config: Config{NumWorkers: 1, Flush: FlushSettings{Interval: time.Hour, Bytes: 1}},
82+
// },
83+
// {
84+
// name: "flush.interval",
85+
// config: Config{NumWorkers: 1, Flush: FlushSettings{Interval: 50 * time.Millisecond, Bytes: 2 << 30}},
86+
// },
87+
// }
88+
//
89+
// for _, tt := range tests {
90+
// tt := tt
91+
// t.Run(tt.name, func(t *testing.T) {
92+
// t.Parallel()
93+
// client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{
94+
// RoundTripFunc: func(*http.Request) (*http.Response, error) {
95+
// return &http.Response{
96+
// Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
97+
// Body: io.NopCloser(strings.NewReader(successResp)),
98+
// }, nil
99+
// },
100+
// }})
101+
// require.NoError(t, err)
102+
// bulkIndexer, err := newBulkIndexer(zap.NewNop(), client, &tt.config)
103+
// require.NoError(t, err)
104+
// assert.NoError(t, bulkIndexer.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
105+
// // should flush
106+
// time.Sleep(100 * time.Millisecond)
107+
// assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
108+
// assert.NoError(t, bulkIndexer.Close(context.Background()))
109+
// })
110+
// }
111+
//}
109112

110-
func TestBulkIndexer_flush_error(t *testing.T) {
113+
func TestBulkIndexer_addBatchAndFlush_error(t *testing.T) {
111114
tests := []struct {
112115
name string
113116
roundTripFunc func(*http.Request) (*http.Response, error)
@@ -149,15 +152,17 @@ func TestBulkIndexer_flush_error(t *testing.T) {
149152
RoundTripFunc: tt.roundTripFunc,
150153
}})
151154
require.NoError(t, err)
152-
core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel))
153-
bulkIndexer, err := newBulkIndexer(zap.New(core), client, &cfg)
155+
bulkIndexer, err := newBulkIndexer(zap.NewNop(), client, &cfg)
154156
require.NoError(t, err)
155-
assert.NoError(t, bulkIndexer.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
156-
// should flush
157-
time.Sleep(100 * time.Millisecond)
157+
assert.ErrorContains(t, bulkIndexer.AddBatchAndFlush(context.Background(),
158+
[]esBulkIndexerItem{
159+
{
160+
Index: "foo",
161+
Body: strings.NewReader(`{"foo": "bar"}`),
162+
},
163+
}), "failed to execute the request")
158164
assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load())
159165
assert.NoError(t, bulkIndexer.Close(context.Background()))
160-
assert.Equal(t, 1, observed.FilterMessage("bulk indexer flush error").Len())
161166
})
162167
}
163168
}

0 commit comments

Comments
 (0)