Skip to content

Commit

Permalink
otelconsumer: handle entity too large errors (#41523)
Browse files Browse the repository at this point in the history
* otelconsumer: handle entity too large errors

The elasticsearchexporter does not handle a 413 Request Entity too Large from
Elasticsearch, only forwarding the error back to the client.

When using the batcher config in the ES exporter, it runs synchronously, so any
error reported can be intercepted in the otelconsumer. When using the batch
processor this happens asynchronously and there is no way to handle the
error. If we intercept an entity too large error, we split the batch and
retry.

* use logp.TestingSetup
  • Loading branch information
mauri870 authored Nov 8, 2024
1 parent 8cd6feb commit 9c34e4e
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
28 changes: 28 additions & 0 deletions libbeat/outputs/otelconsumer/otelconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ package otelconsumer
import (
"context"
"fmt"
"strings"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

"go.opentelemetry.io/collector/consumer"
Expand All @@ -42,6 +44,7 @@ type otelConsumer struct {
observer outputs.Observer
logsConsumer consumer.Logs
beatInfo beat.Info
log *logp.Logger
}

func makeOtelConsumer(_ outputs.IndexManager, beat beat.Info, observer outputs.Observer, cfg *config.C) (outputs.Group, error) {
Expand All @@ -50,6 +53,7 @@ func makeOtelConsumer(_ outputs.IndexManager, beat beat.Info, observer outputs.O
observer: observer,
logsConsumer: beat.LogConsumer,
beatInfo: beat,
log: logp.NewLogger("otelconsumer"),
}

ocConfig := defaultConfig()
Expand Down Expand Up @@ -99,6 +103,30 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)

err := out.logsConsumer.ConsumeLogs(ctx, pLogs)
if err != nil {
// If the batch is too large, the elasticsearchexporter will
// return a 413 error.
//
// At the moment, the exporter does not support batch splitting
// on error so we do it here.
//
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36163.
if strings.Contains(err.Error(), "Request Entity Too Large") {
// Try and split the batch into smaller batches and retry
if batch.SplitRetry() {
st.BatchSplit()
st.RetryableErrors(len(events))
} else {
// If the batch could not be split, there is no option left but
// to drop it and log the error state.
batch.Drop()
st.PermanentErrors(len(events))
out.log.Errorf("the batch is too large to be sent: %v", err)
}

// Don't propagate the error; the batch was either split and
// retried, or dropped and logged above.
return nil
}

// Permanent errors shouldn't be retried. This typically means
// the data cannot be serialized by the exporter that is attached
// to the pipeline or when the destination refuses the data because
Expand Down
31 changes: 31 additions & 0 deletions libbeat/outputs/otelconsumer/otelconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/outest"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

Expand All @@ -46,12 +47,15 @@ func TestPublish(t *testing.T) {
makeOtelConsumer := func(t *testing.T, consumeFn func(ctx context.Context, ld plog.Logs) error) *otelConsumer {
t.Helper()

assert.NoError(t, logp.TestingSetup(logp.WithSelectors("otelconsumer")))

logConsumer, err := consumer.NewLogs(consumeFn)
assert.NoError(t, err)
consumer := &otelConsumer{
observer: outputs.NewNilObserver(),
logsConsumer: logConsumer,
beatInfo: beat.Info{},
log: logp.NewLogger("otelconsumer"),
}
return consumer
}
Expand Down Expand Up @@ -86,6 +90,33 @@ func TestPublish(t *testing.T) {
assert.Equal(t, outest.BatchRetry, batch.Signals[0].Tag)
})

t.Run("split batch on entity too large error", func(t *testing.T) {
batch := outest.NewBatch(event1, event2, event3)

otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
return errors.New("Request Entity Too Large")
})

err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag)
})

t.Run("drop batch if can't split on entity too large error", func(t *testing.T) {
batch := outest.NewBatch(event1)

otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
return errors.New("Request Entity Too Large")
})

err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 2)
assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag)
assert.Equal(t, outest.BatchDrop, batch.Signals[1].Tag)
})

t.Run("drop batch on permanent consumer error", func(t *testing.T) {
batch := outest.NewBatch(event1, event2, event3)

Expand Down

0 comments on commit 9c34e4e

Please sign in to comment.