From 9c34e4e0e8002d0698442ce5e424f345e544d31b Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 8 Nov 2024 09:07:43 -0300 Subject: [PATCH] otelconsumer: handle entity too large errors (#41523) * otelconsumer: handle entity too large errors The elasticsearchexporter does not handle a 413 Request Entity too Large from Elasticsearch, only forwarding the error back to the client. When using the batcher config in the ES exporter, it runs synchronously, any error reported can be intercepted in the otelconsumer. When using the batch processor this happens asynchronously and there is no way to handle the error. If we can intercept an entity too large error, split the batch and retry. * use logp.TestingSetup --- libbeat/outputs/otelconsumer/otelconsumer.go | 28 +++++++++++++++++ .../outputs/otelconsumer/otelconsumer_test.go | 31 +++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/libbeat/outputs/otelconsumer/otelconsumer.go b/libbeat/outputs/otelconsumer/otelconsumer.go index cad11ab1442..ca5da5308e5 100644 --- a/libbeat/outputs/otelconsumer/otelconsumer.go +++ b/libbeat/outputs/otelconsumer/otelconsumer.go @@ -20,12 +20,14 @@ package otelconsumer import ( "context" "fmt" + "strings" "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "go.opentelemetry.io/collector/consumer" @@ -42,6 +44,7 @@ type otelConsumer struct { observer outputs.Observer logsConsumer consumer.Logs beatInfo beat.Info + log *logp.Logger } func makeOtelConsumer(_ outputs.IndexManager, beat beat.Info, observer outputs.Observer, cfg *config.C) (outputs.Group, error) { @@ -50,6 +53,7 @@ func makeOtelConsumer(_ outputs.IndexManager, beat beat.Info, observer outputs.O observer: observer, logsConsumer: beat.LogConsumer, beatInfo: beat, + log: logp.NewLogger("otelconsumer"), } ocConfig := defaultConfig() @@ -99,6 +103,30 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch) err := out.logsConsumer.ConsumeLogs(ctx, pLogs) if err != nil { + // If the batch is too large, the elasticsearchexporter will + // return a 413 error. + // + // At the moment, the exporter does not support batch splitting + // on error so we do it here. + // + // See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36163. + if strings.Contains(err.Error(), "Request Entity Too Large") { + // Try and split the batch into smaller batches and retry + if batch.SplitRetry() { + st.BatchSplit() + st.RetryableErrors(len(events)) + } else { + // If the batch could not be split, there is no option left but + // to drop it and log the error state. + batch.Drop() + st.PermanentErrors(len(events)) + out.log.Errorf("the batch is too large to be sent: %v", err) + } + + // Don't propagate the error, the batch was split and retried. + return nil + } + // Permanent errors shouldn't be retried. This tipically means // the data cannot be serialized by the exporter that is attached // to the pipeline or when the destination refuses the data because diff --git a/libbeat/outputs/otelconsumer/otelconsumer_test.go b/libbeat/outputs/otelconsumer/otelconsumer_test.go index a18bf77e6b8..2751ce7f721 100644 --- a/libbeat/outputs/otelconsumer/otelconsumer_test.go +++ b/libbeat/outputs/otelconsumer/otelconsumer_test.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/outest" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -46,12 +47,15 @@ func TestPublish(t *testing.T) { makeOtelConsumer := func(t *testing.T, consumeFn func(ctx context.Context, ld plog.Logs) error) *otelConsumer { t.Helper() + assert.NoError(t, logp.TestingSetup(logp.WithSelectors("otelconsumer"))) + logConsumer, err := consumer.NewLogs(consumeFn) assert.NoError(t, err) consumer := &otelConsumer{ observer: outputs.NewNilObserver(), logsConsumer: logConsumer, beatInfo: beat.Info{}, + log: logp.NewLogger("otelconsumer"), } return consumer } @@ -86,6 +90,33 @@ func TestPublish(t *testing.T) { assert.Equal(t, outest.BatchRetry, batch.Signals[0].Tag) }) + t.Run("split batch on entity too large error", func(t *testing.T) { + batch := outest.NewBatch(event1, event2, event3) + + otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { + return errors.New("Request Entity Too Large") + }) + + err := otelConsumer.Publish(ctx, batch) + assert.NoError(t, err) + assert.Len(t, batch.Signals, 1) + assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag) + }) + + t.Run("drop batch if can't split on entity too large error", func(t *testing.T) { + batch := outest.NewBatch(event1) + + otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { + return errors.New("Request Entity Too Large") + }) + + err := otelConsumer.Publish(ctx, batch) + assert.NoError(t, err) + assert.Len(t, batch.Signals, 2) + assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag) + assert.Equal(t, outest.BatchDrop, batch.Signals[1].Tag) + }) + t.Run("drop batch on permanent consumer error", func(t *testing.T) { batch := outest.NewBatch(event1, event2, event3)