diff --git a/libbeat/outputs/otelconsumer/otelconsumer.go b/libbeat/outputs/otelconsumer/otelconsumer.go index cad11ab1442..ca5da5308e5 100644 --- a/libbeat/outputs/otelconsumer/otelconsumer.go +++ b/libbeat/outputs/otelconsumer/otelconsumer.go @@ -20,12 +20,14 @@ package otelconsumer import ( "context" "fmt" + "strings" "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "go.opentelemetry.io/collector/consumer" @@ -42,6 +44,7 @@ type otelConsumer struct { observer outputs.Observer logsConsumer consumer.Logs beatInfo beat.Info + log *logp.Logger } func makeOtelConsumer(_ outputs.IndexManager, beat beat.Info, observer outputs.Observer, cfg *config.C) (outputs.Group, error) { @@ -50,6 +53,7 @@ func makeOtelConsumer(_ outputs.IndexManager, beat beat.Info, observer outputs.O observer: observer, logsConsumer: beat.LogConsumer, beatInfo: beat, + log: logp.NewLogger("otelconsumer"), } ocConfig := defaultConfig() @@ -99,6 +103,30 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch) err := out.logsConsumer.ConsumeLogs(ctx, pLogs) if err != nil { + // If the batch is too large, the elasticsearchexporter will + // return a 413 error. + // + // At the moment, the exporter does not support batch splitting + // on error so we do it here. + // + // See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36163. + if strings.Contains(err.Error(), "Request Entity Too Large") { + // Try and split the batch into smaller batches and retry + if batch.SplitRetry() { + st.BatchSplit() + st.RetryableErrors(len(events)) + } else { + // If the batch could not be split, there is no option left but + // to drop it and log the error state. + batch.Drop() + st.PermanentErrors(len(events)) + out.log.Errorf("the batch is too large to be sent: %v", err) + } + + // Don't propagate the error, the batch was split and retried. + return nil + } + // Permanent errors shouldn't be retried. This tipically means // the data cannot be serialized by the exporter that is attached // to the pipeline or when the destination refuses the data because diff --git a/libbeat/outputs/otelconsumer/otelconsumer_test.go b/libbeat/outputs/otelconsumer/otelconsumer_test.go index a18bf77e6b8..2751ce7f721 100644 --- a/libbeat/outputs/otelconsumer/otelconsumer_test.go +++ b/libbeat/outputs/otelconsumer/otelconsumer_test.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/outest" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -46,12 +47,15 @@ func TestPublish(t *testing.T) { makeOtelConsumer := func(t *testing.T, consumeFn func(ctx context.Context, ld plog.Logs) error) *otelConsumer { t.Helper() + assert.NoError(t, logp.TestingSetup(logp.WithSelectors("otelconsumer"))) + logConsumer, err := consumer.NewLogs(consumeFn) assert.NoError(t, err) consumer := &otelConsumer{ observer: outputs.NewNilObserver(), logsConsumer: logConsumer, beatInfo: beat.Info{}, + log: logp.NewLogger("otelconsumer"), } return consumer } @@ -86,6 +90,33 @@ func TestPublish(t *testing.T) { assert.Equal(t, outest.BatchRetry, batch.Signals[0].Tag) }) + t.Run("split batch on entity too large error", func(t *testing.T) { + batch := outest.NewBatch(event1, event2, event3) + + otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { + return errors.New("Request Entity Too Large") + }) + + err := otelConsumer.Publish(ctx, batch) + assert.NoError(t, err) + assert.Len(t, batch.Signals, 1) + assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag) + }) + + t.Run("drop batch if can't split on entity too large error", func(t *testing.T) { + batch := outest.NewBatch(event1) + + otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { + return errors.New("Request Entity Too Large") + }) + + err := otelConsumer.Publish(ctx, batch) + assert.NoError(t, err) + assert.Len(t, batch.Signals, 2) + assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag) + assert.Equal(t, outest.BatchDrop, batch.Signals[1].Tag) + }) + t.Run("drop batch on permanent consumer error", func(t *testing.T) { batch := outest.NewBatch(event1, event2, event3)