6 changes: 6 additions & 0 deletions libbeat/otelbeat/beatconverter/beatconverter_test.go
@@ -47,6 +47,8 @@ exporters:
max_retries: 3
user: elastic
max_conns_per_host: 1
logs_dynamic_pipeline:
enabled: true
sending_queue:
batch:
flush_timeout: 10s
@@ -208,6 +210,8 @@ exporters:
max_interval: 1m0s
max_retries: 3
user: elastic-cloud
logs_dynamic_pipeline:
enabled: true
max_conns_per_host: 1
sending_queue:
batch:
@@ -431,6 +435,8 @@ exporters:
elasticsearch:
endpoints:
- http://localhost:9200
logs_dynamic_pipeline:
enabled: true
retry:
enabled: true
initial_interval: 1s
@@ -59,7 +59,7 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
escfg := defaultOptions

// check for unsupported config
err := checkUnsupportedConfig(output, logger)
err := checkUnsupportedConfig(output)
if err != nil {
return nil, err
}
@@ -134,6 +134,9 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
"mapping": map[string]any{
"mode": "bodymap",
},
"logs_dynamic_pipeline": map[string]any{
"enabled": true,
},
}

// Compression
@@ -163,7 +166,7 @@ func ToOTelConfig(output *config.C, logger *logp.Logger) (map[string]any, error)
}

// log warning for unsupported config
func checkUnsupportedConfig(cfg *config.C, logger *logp.Logger) error {
func checkUnsupportedConfig(cfg *config.C) error {
if cfg.HasField("indices") {
return fmt.Errorf("indices is currently not supported: %w", errors.ErrUnsupported)
} else if cfg.HasField("pipelines") {
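With this change, every elasticsearch exporter configuration produced by ToOTelConfig carries logs_dynamic_pipeline.enabled: true alongside the bodymap mapping mode. A rough usage sketch of the converter follows; the config literal, logger name, and unqualified call are illustrative assumptions rather than part of this diff, and config/logp refer to the elastic-agent-libs packages:

rawCfg, err := config.NewConfigFrom(map[string]any{
	"hosts":    []string{"http://localhost:9200"},
	"username": "elastic",
	"password": "changeme",
})
if err != nil {
	return err
}
otelCfg, err := ToOTelConfig(rawCfg, logp.NewLogger("beatconverter"))
if err != nil {
	return err
}
// otelCfg now always contains the dynamic-pipeline setting:
//   otelCfg["logs_dynamic_pipeline"] is map[string]any{"enabled": true}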
@@ -63,6 +63,8 @@ logs_index: some-index
max_conns_per_host: 1
password: changeme
pipeline: some-ingest-pipeline
logs_dynamic_pipeline:
enabled: true
retry:
enabled: true
initial_interval: 42s
@@ -109,6 +111,8 @@ api_key: "TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA"
endpoints:
- http://localhost:9200
logs_index: some-index
logs_dynamic_pipeline:
enabled: true
retry:
enabled: true
initial_interval: 1s
@@ -155,6 +159,8 @@ preset: %s
`

commonOTelCfg := `
logs_dynamic_pipeline:
enabled: true
endpoints:
- http://localhost:9200
retry:
@@ -221,6 +227,8 @@ retry:
max_interval: 5m0s
max_retries: 3
logs_index: some-index
logs_dynamic_pipeline:
enabled: true
password: changeme
user: elastic
max_conns_per_host: 1
@@ -315,6 +323,8 @@ retry:
initial_interval: 1s
max_interval: 1m0s
max_retries: 3
logs_dynamic_pipeline:
enabled: true
max_conns_per_host: 1
user: elastic
sending_queue:
9 changes: 8 additions & 1 deletion x-pack/libbeat/outputs/otelconsumer/otelconsumer.go
@@ -115,7 +115,14 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)
}
}

beatEvent := event.Content.Fields
// if pipeline field is set on event metadata
if pipeline, err := event.Content.Meta.GetValue("pipeline"); err == nil {
if s, ok := pipeline.(string); ok {
logRecord.Attributes().PutStr("elasticsearch.ingest_pipeline", s)
}
}

beatEvent := event.Content.Fields.Clone()
if beatEvent == nil {
beatEvent = mapstr.M{}
}
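Taken together with the converter change above, the consumer now propagates a per-event pipeline: when event.Meta carries a "pipeline" string, logsPublish copies it onto the log record as the elasticsearch.ingest_pipeline attribute, and Fields is cloned before the body is built so the conversion does not modify the original event's map. A minimal sketch of an event exercising this path; the values are illustrative, and beat/mapstr are the usual libbeat and elastic-agent-libs packages:

evt := beat.Event{
	Timestamp: time.Now(),
	Fields:    mapstr.M{"message": "hello"},
	Meta:      mapstr.M{"pipeline": "error_pipeline"},
}
// After logsPublish converts this event, the resulting plog.LogRecord carries
//   attributes["elasticsearch.ingest_pipeline"] = "error_pipeline"
// which an elasticsearch exporter with logs_dynamic_pipeline enabled can use
// to route the document through that ingest pipeline.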
34 changes: 34 additions & 0 deletions x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go
@@ -108,6 +108,40 @@ func TestPublish(t *testing.T) {
}
})

t.Run("elasticsearch.ingest_pipeline fields are set on logrecord.Attribute", func(t *testing.T) {
event1.Meta = mapstr.M{}
event1.Meta["pipeline"] = "error_pipeline"

batch := outest.NewBatch(event1)

var countLogs int
var attributes pcommon.Map
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
countLogs = countLogs + ld.LogRecordCount()
for i := 0; i < ld.ResourceLogs().Len(); i++ {
resourceLog := ld.ResourceLogs().At(i)
for j := 0; j < resourceLog.ScopeLogs().Len(); j++ {
scopeLog := resourceLog.ScopeLogs().At(j)
for k := 0; k < scopeLog.LogRecords().Len(); k++ {
LogRecord := scopeLog.LogRecords().At(k)
attributes = LogRecord.Attributes()
}
}
}
return nil
})

err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag)

dynamicAttributeKey := "elasticsearch.ingest_pipeline"
gotValue, ok := attributes.Get(dynamicAttributeKey)
require.True(t, ok, "dynamic pipeline attribute was not set")
assert.EqualValues(t, "error_pipeline", gotValue.AsString())
})

t.Run("retries the batch on non-permanent consumer error", func(t *testing.T) {
batch := outest.NewBatch(event1, event2, event3)
