Skip to content

Commit 97d5c19

Browse files
committed
Add traces
1 parent 7fdf150 commit 97d5c19

File tree

5 files changed

+95
-32
lines changed

5 files changed

+95
-32
lines changed

exporter/elasticsearchexporter/factory.go

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func NewFactory() exporter.Factory {
3535
metadata.Type,
3636
createDefaultConfig,
3737
exporter.WithLogs(createLogsRequestExporter, metadata.LogsStability),
38-
exporter.WithTraces(createTracesExporter, metadata.TracesStability),
38+
exporter.WithTraces(createTracesRequestExporter, metadata.TracesStability),
3939
)
4040
}
4141

@@ -144,7 +144,7 @@ func createLogsRequestExporter(
144144
)
145145
}
146146

147-
func createTracesExporter(ctx context.Context,
147+
func createTracesRequestExporter(ctx context.Context,
148148
set exporter.CreateSettings,
149149
cfg component.Config) (exporter.Traces, error) {
150150

@@ -156,13 +156,53 @@ func createTracesExporter(ctx context.Context,
156156
if err != nil {
157157
return nil, fmt.Errorf("cannot configure Elasticsearch tracesExporter: %w", err)
158158
}
159-
return exporterhelper.NewTracesExporter(
159+
160+
batchMergeFunc := func(ctx context.Context, r1, r2 exporterhelper.Request) (exporterhelper.Request, error) {
161+
rr1 := r1.(*request)
162+
rr2 := r2.(*request)
163+
req := newRequest(tracesExporter.bulkIndexer)
164+
req.Items = append(rr1.Items, rr2.Items...)
165+
return req, nil
166+
}
167+
168+
batchMergeSplitFunc := func(ctx context.Context, conf exporterbatcher.MaxSizeConfig, optReq, req exporterhelper.Request) ([]exporterhelper.Request, error) {
169+
// FIXME: implement merge split func
170+
panic("not implemented")
171+
return nil, nil
172+
}
173+
174+
marshalRequest := func(req exporterhelper.Request) ([]byte, error) {
175+
b, err := json.Marshal(*req.(*request))
176+
return b, err
177+
}
178+
179+
unmarshalRequest := func(b []byte) (exporterhelper.Request, error) {
180+
var req request
181+
err := json.Unmarshal(b, &req)
182+
req.bulkIndexer = tracesExporter.bulkIndexer
183+
return &req, err
184+
}
185+
186+
batcherCfg := exporterbatcher.NewDefaultConfig()
187+
188+
// FIXME: is this right?
189+
queueCfg := exporterqueue.NewDefaultConfig()
190+
queueCfg.Enabled = cf.QueueSettings.Enabled
191+
queueCfg.NumConsumers = cf.QueueSettings.NumConsumers
192+
queueCfg.QueueSize = cf.QueueSettings.QueueSize
193+
194+
return exporterhelper.NewTracesRequestExporter(
160195
ctx,
161196
set,
162-
cfg,
163-
tracesExporter.pushTraceData,
197+
tracesExporter.traceDataToRequest,
198+
exporterhelper.WithBatcher(batcherCfg, exporterhelper.WithRequestBatchFuncs(batchMergeFunc, batchMergeSplitFunc)),
164199
exporterhelper.WithShutdown(tracesExporter.Shutdown),
165-
exporterhelper.WithQueue(cf.QueueSettings))
200+
exporterhelper.WithRequestQueue(queueCfg,
201+
exporterqueue.NewPersistentQueueFactory[exporterhelper.Request](cf.QueueSettings.StorageID, exporterqueue.PersistentQueueSettings[exporterhelper.Request]{
202+
Marshaler: marshalRequest,
203+
Unmarshaler: unmarshalRequest,
204+
})),
205+
)
166206
}
167207

168208
// set default User-Agent header with BuildInfo if User-Agent is empty

exporter/elasticsearchexporter/logs_exporter.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,8 @@ func (e *elasticsearchLogsExporter) Shutdown(ctx context.Context) error {
7272
}
7373

7474
func (e *elasticsearchLogsExporter) logsDataToRequest(ctx context.Context, ld plog.Logs) (exporterhelper.Request, error) {
75-
request := newRequest(e.bulkIndexer)
76-
75+
req := newRequest(e.bulkIndexer)
7776
var errs []error
78-
7977
rls := ld.ResourceLogs()
8078
for i := 0; i < rls.Len(); i++ {
8179
rl := rls.At(i)
@@ -88,18 +86,18 @@ func (e *elasticsearchLogsExporter) logsDataToRequest(ctx context.Context, ld pl
8886
item, err := e.logRecordToItem(ctx, resource, logs.At(k), scope)
8987
if err != nil {
9088
if cerr := ctx.Err(); cerr != nil {
91-
return request, cerr
89+
return req, cerr
9290
}
9391

9492
errs = append(errs, err)
9593
continue
9694
}
97-
request.Add(item)
95+
req.Add(item)
9896
}
9997
}
10098
}
10199

102-
return request, errors.Join(errs...)
100+
return req, errors.Join(errs...)
103101
}
104102

105103
func (e *elasticsearchLogsExporter) logRecordToItem(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) (bulkIndexerItem, error) {

exporter/elasticsearchexporter/logs_exporter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ func TestExporter_PushEvent(t *testing.T) {
407407

408408
exporter := newTestExporter(t, server.URL)
409409
err := send(t, exporter, `{"message": "test1"}`)
410-
assert.ErrorContains(t, err, "flush failed")
410+
assert.ErrorContains(t, err, "flush failed: [400 Bad Request] oops")
411411

412412
assert.Equal(t, int64(1), attempts.Load())
413413
})

exporter/elasticsearchexporter/trace_exporter.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"errors"
1111
"fmt"
12+
"go.opentelemetry.io/collector/exporter/exporterhelper"
1213
"time"
1314

1415
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -65,10 +66,11 @@ func (e *elasticsearchTracesExporter) Shutdown(ctx context.Context) error {
6566
return e.bulkIndexer.Close(ctx)
6667
}
6768

68-
func (e *elasticsearchTracesExporter) pushTraceData(
69+
func (e *elasticsearchTracesExporter) traceDataToRequest(
6970
ctx context.Context,
7071
td ptrace.Traces,
71-
) error {
72+
) (exporterhelper.Request, error) {
73+
req := newRequest(e.bulkIndexer)
7274
var errs []error
7375
resourceSpans := td.ResourceSpans()
7476
for i := 0; i < resourceSpans.Len(); i++ {
@@ -81,20 +83,23 @@ func (e *elasticsearchTracesExporter) pushTraceData(
8183
spans := scopeSpan.Spans()
8284
for k := 0; k < spans.Len(); k++ {
8385
span := spans.At(k)
84-
if err := e.pushTraceRecord(ctx, resource, span, scope); err != nil {
86+
item, err := e.traceRecordToItem(ctx, resource, span, scope)
87+
if err != nil {
8588
if cerr := ctx.Err(); cerr != nil {
86-
return cerr
89+
return req, cerr
8790
}
8891
errs = append(errs, err)
92+
continue
8993
}
94+
req.Add(item)
9095
}
9196
}
9297
}
9398

94-
return errors.Join(errs...)
99+
return req, errors.Join(errs...)
95100
}
96101

97-
func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error {
102+
func (e *elasticsearchTracesExporter) traceRecordToItem(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) (bulkIndexerItem, error) {
98103
fIndex := e.index
99104
if e.dynamicIndex {
100105
prefix := getFromAttributes(indexPrefix, resource, scope, span)
@@ -106,16 +111,19 @@ func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resou
106111
if e.logstashFormat.Enabled {
107112
formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now())
108113
if err != nil {
109-
return err
114+
return bulkIndexerItem{}, err
110115
}
111116
fIndex = formattedIndex
112117
}
113118

114119
document, err := e.model.encodeSpan(resource, span, scope)
115120
if err != nil {
116-
return fmt.Errorf("Failed to encode trace record: %w", err)
121+
return bulkIndexerItem{}, fmt.Errorf("Failed to encode trace record: %w", err)
117122
}
118-
return pushDocuments(ctx, fIndex, document, e.bulkIndexer)
123+
return bulkIndexerItem{
124+
Index: fIndex,
125+
Body: document,
126+
}, nil
119127
}
120128

121129
func pushDocuments(ctx context.Context, index string, document []byte, current *esBulkIndexerCurrent) error {

exporter/elasticsearchexporter/traces_exporter_test.go

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,6 @@ func TestExporter_PushTraceRecord(t *testing.T) {
330330
exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { *cfg = *testConfig })
331331
mustSendTraces(t, exporter, `{"message": "test1"}`)
332332

333-
time.Sleep(200 * time.Millisecond)
334333
assert.Equal(t, int64(1), attempts.Load())
335334
})
336335
}
@@ -346,9 +345,9 @@ func TestExporter_PushTraceRecord(t *testing.T) {
346345
})
347346

348347
exporter := newTestTracesExporter(t, server.URL)
349-
mustSendTraces(t, exporter, `{"message": "test1"}`)
348+
err := sendTraces(t, exporter, `{"message": "test1"}`)
349+
assert.ErrorContains(t, err, "flush failed: [400 Bad Request] oops")
350350

351-
time.Sleep(200 * time.Millisecond)
352351
assert.Equal(t, int64(1), attempts.Load())
353352
})
354353

@@ -382,7 +381,6 @@ func TestExporter_PushTraceRecord(t *testing.T) {
382381
exporter := newTestTracesExporter(t, server.URL)
383382
mustSendTraces(t, exporter, `{"message": "test1"}`)
384383

385-
time.Sleep(200 * time.Millisecond)
386384
assert.Equal(t, int64(1), attempts.Load())
387385
})
388386

@@ -420,9 +418,7 @@ func TestExporter_PushTraceRecord(t *testing.T) {
420418
cfg.Retry.InitialInterval = 1 * time.Millisecond
421419
cfg.Retry.MaxInterval = 10 * time.Millisecond
422420
})
423-
mustSendTraces(t, exporter, `{"message": "test1", "idx": 0}`)
424-
mustSendTraces(t, exporter, `{"message": "test2", "idx": 1}`)
425-
mustSendTraces(t, exporter, `{"message": "test3", "idx": 2}`)
421+
mustSendTraces(t, exporter, `{"message": "test1", "idx": 0}`, `{"message": "test2", "idx": 1}`, `{"message": "test3", "idx": 2}`)
426422

427423
wg.Wait() // <- this blocks forever if the trace is not retried
428424

@@ -462,8 +458,22 @@ func withTestTracesExporterConfig(fns ...func(*Config)) func(string) *Config {
462458
}
463459
}
464460

465-
func mustSendTraces(t *testing.T, exporter *elasticsearchTracesExporter, contents string) {
466-
err := pushDocuments(context.TODO(), exporter.index, []byte(contents), exporter.bulkIndexer)
461+
func sendTraces(t *testing.T, exporter *elasticsearchTracesExporter, contents ...string) error {
462+
req := request{
463+
bulkIndexer: exporter.bulkIndexer,
464+
Items: nil,
465+
}
466+
for _, body := range contents {
467+
req.Add(bulkIndexerItem{
468+
Index: exporter.index,
469+
Body: []byte(body),
470+
})
471+
}
472+
return req.Export(context.TODO())
473+
}
474+
475+
func mustSendTraces(t *testing.T, exporter *elasticsearchTracesExporter, contents ...string) {
476+
err := sendTraces(t, exporter, contents...)
467477
require.NoError(t, err)
468478
}
469479

@@ -474,6 +484,13 @@ func mustSendTracesWithAttributes(t *testing.T, exporter *elasticsearchTracesExp
474484
span := resSpans.ScopeSpans().At(0).Spans().At(0)
475485
scope := resSpans.ScopeSpans().At(0).Scope()
476486

477-
err := exporter.pushTraceRecord(context.TODO(), resSpans.Resource(), span, scope)
487+
req := request{
488+
bulkIndexer: exporter.bulkIndexer,
489+
Items: nil,
490+
}
491+
item, err := exporter.traceRecordToItem(context.TODO(), resSpans.Resource(), span, scope)
492+
require.NoError(t, err)
493+
req.Add(item)
494+
err = req.Export(context.TODO())
478495
require.NoError(t, err)
479496
}

0 commit comments

Comments
 (0)