Commit d130cd9

OBS-6820: add otelcol_exporter_retry_dropped_*, based on open-telemetry#13957
Signed-off-by: Jayson Cena <[email protected]>
1 parent 42a3ae0 commit d130cd9

10 files changed, 221 additions and 11 deletions

exporter/exporterhelper/internal/experr/err.go

Lines changed: 21 additions & 0 deletions
@@ -27,3 +27,24 @@ func IsShutdownErr(err error) bool {
     var sdErr shutdownErr
     return errors.As(err, &sdErr)
 }
+
+type retriesExhaustedErr struct {
+    err error
+}
+
+func NewRetriesExhaustedErr(err error) error {
+    return retriesExhaustedErr{err: err}
+}
+
+func (r retriesExhaustedErr) Error() string {
+    return "retries exhausted: " + r.err.Error()
+}
+
+func (r retriesExhaustedErr) Unwrap() error {
+    return r.err
+}
+
+func IsRetriesExhaustedErr(err error) bool {
+    var reErr retriesExhaustedErr
+    return errors.As(err, &reErr)
+}
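
The additions mirror the existing shutdownErr helper: a small wrapper type, a constructor, Unwrap for error-chain traversal, and an errors.As probe. A minimal standalone sketch of how the wrapper interacts with Go error wrapping (the type and helpers below are illustrative copies, since the real experr package is internal to exporterhelper):

package main

import (
	"errors"
	"fmt"
)

// Illustrative copies of the experr additions above.
type retriesExhaustedErr struct{ err error }

func NewRetriesExhaustedErr(err error) error { return retriesExhaustedErr{err: err} }
func (r retriesExhaustedErr) Error() string  { return "retries exhausted: " + r.err.Error() }
func (r retriesExhaustedErr) Unwrap() error  { return r.err }

func IsRetriesExhaustedErr(err error) bool {
	var reErr retriesExhaustedErr
	return errors.As(err, &reErr)
}

func main() {
	cause := errors.New("connection refused")
	// retry_sender.go wraps its final error the same way before returning it.
	err := NewRetriesExhaustedErr(fmt.Errorf("no more retries left: %w", cause))

	fmt.Println(IsRetriesExhaustedErr(err)) // true: errors.As finds the wrapper type
	fmt.Println(errors.Is(err, cause))      // true: Unwrap keeps the original cause reachable
}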

exporter/exporterhelper/internal/experr/err_test.go

Lines changed: 12 additions & 0 deletions
@@ -22,3 +22,15 @@ func TestIsShutdownErr(t *testing.T) {
     err = NewShutdownErr(err)
     require.True(t, IsShutdownErr(err))
 }
+
+func TestNewRetriesExhaustedErr(t *testing.T) {
+    err := NewRetriesExhaustedErr(errors.New("another error"))
+    assert.Equal(t, "retries exhausted: another error", err.Error())
+}
+
+func TestIsRetriesExhaustedErr(t *testing.T) {
+    err := errors.New("testError")
+    require.False(t, IsRetriesExhaustedErr(err))
+    err = NewRetriesExhaustedErr(err)
+    require.True(t, IsRetriesExhaustedErr(err))
+}

exporter/exporterhelper/internal/metadata/generated_telemetry.go

Lines changed: 21 additions & 0 deletions
Generated file; diff not rendered by default.

exporter/exporterhelper/internal/metadatatest/generated_telemetrytest.go

Lines changed: 48 additions & 0 deletions
Generated file; diff not rendered by default.

exporter/exporterhelper/internal/metadatatest/generated_telemetrytest_test.go

Lines changed: 3 additions & 0 deletions
Generated file; diff not rendered by default.

exporter/exporterhelper/internal/obs_report_sender.go

Lines changed: 16 additions & 7 deletions
@@ -13,6 +13,7 @@ import (
 
     "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/exporter"
+    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -40,13 +41,14 @@ type obsReportSender[K request.Request] struct {
     component.StartFunc
     component.ShutdownFunc
 
-    spanName        string
-    tracer          trace.Tracer
-    spanAttrs       trace.SpanStartEventOption
-    metricAttr      metric.MeasurementOption
-    itemsSentInst   metric.Int64Counter
-    itemsFailedInst metric.Int64Counter
-    next            sender.Sender[K]
+    spanName              string
+    tracer                trace.Tracer
+    spanAttrs             trace.SpanStartEventOption
+    metricAttr            metric.MeasurementOption
+    itemsSentInst         metric.Int64Counter
+    itemsFailedInst       metric.Int64Counter
+    itemsRetryDroppedInst metric.Int64Counter
+    next                  sender.Sender[K]
 }
 
 func newObsReportSender[K request.Request](set exporter.Settings, signal pipeline.Signal, next sender.Sender[K]) (sender.Sender[K], error) {
@@ -70,14 +72,17 @@ func newObsReportSender[K request.Request](set exporter.Settings, signal pipelin
     case pipeline.SignalTraces:
         or.itemsSentInst = telemetryBuilder.ExporterSentSpans
         or.itemsFailedInst = telemetryBuilder.ExporterSendFailedSpans
+        or.itemsRetryDroppedInst = telemetryBuilder.ExporterRetryDroppedSpans
 
     case pipeline.SignalMetrics:
         or.itemsSentInst = telemetryBuilder.ExporterSentMetricPoints
         or.itemsFailedInst = telemetryBuilder.ExporterSendFailedMetricPoints
+        or.itemsRetryDroppedInst = telemetryBuilder.ExporterRetryDroppedMetricPoints
 
     case pipeline.SignalLogs:
         or.itemsSentInst = telemetryBuilder.ExporterSentLogRecords
         or.itemsFailedInst = telemetryBuilder.ExporterSendFailedLogRecords
+        or.itemsRetryDroppedInst = telemetryBuilder.ExporterRetryDroppedLogRecords
     }
 
     return or, nil
@@ -116,6 +121,10 @@ func (ors *obsReportSender[K]) endOp(ctx context.Context, numLogRecords int, err
     if ors.itemsFailedInst != nil {
         ors.itemsFailedInst.Add(ctx, numFailedToSend, ors.metricAttr)
     }
+    // Count drops after retries were exhausted.
+    if err != nil && ors.itemsRetryDroppedInst != nil && experr.IsRetriesExhaustedErr(err) {
+        ors.itemsRetryDroppedInst.Add(ctx, numFailedToSend, ors.metricAttr)
+    }
 
     span := trace.SpanFromContext(ctx)
     defer span.End()
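
With the endOp change, retry-dropped counts are a subset of send-failed counts: every failed item still increments exporter_send_failed_*, while exporter_retry_dropped_* is bumped only when the error carries the retries-exhausted marker. A small standalone sketch of that counting rule, using a sentinel error and plain integers as stand-ins for experr.IsRetriesExhaustedErr and the Int64Counter instruments:

package main

import (
	"errors"
	"fmt"
)

func main() {
	// Sentinel stand-in; the real check is experr.IsRetriesExhaustedErr(err).
	errRetriesExhausted := errors.New("retries exhausted")

	var sendFailed, retryDropped int64 // stand-ins for itemsFailedInst / itemsRetryDroppedInst

	endOp := func(items int64, err error) {
		if err == nil {
			return
		}
		sendFailed += items // every failed item counts here
		if errors.Is(err, errRetriesExhausted) {
			retryDropped += items // only retries-exhausted failures count here
		}
	}

	endOp(7, fmt.Errorf("%w: backend unavailable", errRetriesExhausted)) // dropped after retries
	endOp(3, errors.New("permanent error"))                              // failed, but not a retry drop

	fmt.Println(sendFailed, retryDropped) // prints: 10 7
}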

exporter/exporterhelper/internal/obs_report_sender_test.go

Lines changed: 38 additions & 0 deletions
@@ -18,6 +18,7 @@ import (
     "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/component/componenttest"
     "go.opentelemetry.io/collector/exporter"
+    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadatatest"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
@@ -31,6 +32,43 @@
     errFake = errors.New("errFake")
 )
 
+func TestExportTraceRetryDroppedMetric(t *testing.T) {
+    tt := componenttest.NewTelemetry()
+    t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
+
+    obsrep, err := newObsReportSender(
+        exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
+        pipeline.SignalTraces,
+        sender.NewSender(func(context.Context, request.Request) error {
+            return experr.NewRetriesExhaustedErr(errFake)
+        }),
+    )
+    require.NoError(t, err)
+
+    req := &requesttest.FakeRequest{Items: 7}
+    sendErr := obsrep.Send(context.Background(), req)
+    require.Error(t, sendErr)
+    require.True(t, experr.IsRetriesExhaustedErr(sendErr))
+
+    wantAttrs := attribute.NewSet(attribute.String("exporter", exporterID.String()))
+
+    metadatatest.AssertEqualExporterSendFailedSpans(t, tt,
+        []metricdata.DataPoint[int64]{
+            {
+                Attributes: wantAttrs,
+                Value:      int64(req.Items),
+            },
+        }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
+
+    metadatatest.AssertEqualExporterRetryDroppedSpans(t, tt,
+        []metricdata.DataPoint[int64]{
+            {
+                Attributes: wantAttrs,
+                Value:      int64(req.Items),
+            },
+        }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
+}
+
 func TestExportTraceDataOp(t *testing.T) {
     tt := componenttest.NewTelemetry()
     t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

exporter/exporterhelper/internal/retry_sender.go

Lines changed: 11 additions & 4 deletions
@@ -79,10 +79,17 @@ func (rs *retrySender) Send(ctx context.Context, req request.Request) error {
     }
     span := trace.SpanFromContext(ctx)
     retryNum := int64(0)
+    retried := false
     var maxElapsedTime time.Time
     if rs.cfg.MaxElapsedTime > 0 {
         maxElapsedTime = time.Now().Add(rs.cfg.MaxElapsedTime)
     }
+    wrapRetryErr := func(e error) error {
+        if retried {
+            return experr.NewRetriesExhaustedErr(e)
+        }
+        return e
+    }
     for {
         span.AddEvent(
             "Sending request.",
@@ -104,7 +111,7 @@
 
         backoffDelay := expBackoff.NextBackOff()
         if backoffDelay == backoff.Stop {
-            return fmt.Errorf("no more retries left: %w", err)
+            return wrapRetryErr(fmt.Errorf("no more retries left: %w", err))
         }
 
         throttleErr := throttleRetry{}
@@ -115,13 +122,13 @@
         nextRetryTime := time.Now().Add(backoffDelay)
         if !maxElapsedTime.IsZero() && maxElapsedTime.Before(nextRetryTime) {
             // The delay is longer than the maxElapsedTime.
-            return fmt.Errorf("no more retries left: %w", err)
+            return wrapRetryErr(fmt.Errorf("no more retries left: %w", err))
         }
 
         if deadline, has := ctx.Deadline(); has && deadline.Before(nextRetryTime) {
             // The delay is longer than the deadline. There is no point in
             // waiting for cancelation.
-            return fmt.Errorf("request will be cancelled before next retry: %w", err)
+            return wrapRetryErr(fmt.Errorf("request will be cancelled before next retry: %w", err))
         }
 
         backoffDelayStr := backoffDelay.String()
@@ -140,7 +147,7 @@
         // back-off, but get interrupted when shutting down or request is cancelled or timed out.
         select {
         case <-ctx.Done():
-            return fmt.Errorf("request is cancelled or timed out: %w", err)
+            return wrapRetryErr(fmt.Errorf("request is cancelled or timed out: %w", err))
         case <-rs.stopCh:
             return experr.NewShutdownErr(err)
         case <-time.After(backoffDelay):
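
All three wrapped return paths correspond to the same situation: the next retry would never run, either because the backoff policy returned backoff.Stop, the delay would overshoot MaxElapsedTime, or it would overshoot the context deadline. A standalone sketch of the MaxElapsedTime cutoff with illustrative durations (the real sender derives them from configretry.BackOffConfig and the backoff policy):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative values; in retry_sender.go these come from rs.cfg and expBackoff.
	maxElapsedTime := time.Now().Add(300 * time.Millisecond) // overall retry budget
	backoffDelay := 500 * time.Millisecond                   // delay suggested for the next attempt

	nextRetryTime := time.Now().Add(backoffDelay)
	if !maxElapsedTime.IsZero() && maxElapsedTime.Before(nextRetryTime) {
		// This is the branch that now goes through wrapRetryErr(...), so once at
		// least one retry has been attempted the obsReportSender can count the
		// request under exporter_retry_dropped_*.
		fmt.Println("no more retries left: giving up and recording the drop")
	}
}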

exporter/exporterhelper/internal/retry_sender_test.go

Lines changed: 21 additions & 0 deletions
@@ -18,6 +18,7 @@ import (
     "go.opentelemetry.io/collector/component/componenttest"
     "go.opentelemetry.io/collector/config/configretry"
     "go.opentelemetry.io/collector/consumer/consumererror"
+    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
@@ -76,6 +77,26 @@ func TestRetrySenderMaxElapsedTime(t *testing.T) {
     require.NoError(t, rs.Shutdown(context.Background()))
 }
 
+func TestRetrySenderRetriesExhaustedErrorWrapped(t *testing.T) {
+    rCfg := configretry.NewDefaultBackOffConfig()
+    rCfg.InitialInterval = time.Millisecond
+    rCfg.RandomizationFactor = 0
+    rCfg.Multiplier = 1
+    rCfg.MaxInterval = time.Millisecond
+    rCfg.MaxElapsedTime = 4 * time.Millisecond
+    var attempts int
+    rs := newRetrySender(rCfg, exportertest.NewNopSettings(exportertest.NopType), sender.NewSender(func(context.Context, request.Request) error {
+        attempts++
+        return errors.New("transient error")
+    }))
+    require.NoError(t, rs.Start(context.Background(), componenttest.NewNopHost()))
+    err := rs.Send(context.Background(), &requesttest.FakeRequest{Items: 2})
+    require.Error(t, err)
+    require.True(t, experr.IsRetriesExhaustedErr(err))
+    require.GreaterOrEqual(t, attempts, 2)
+    require.NoError(t, rs.Shutdown(context.Background()))
+}
+
 func TestRetrySenderThrottleError(t *testing.T) {
     rCfg := configretry.NewDefaultBackOffConfig()
     rCfg.InitialInterval = 10 * time.Millisecond

exporter/exporterhelper/metadata.yaml

Lines changed: 30 additions & 0 deletions
@@ -121,3 +121,33 @@ telemetry:
       gauge:
         value_type: int
         async: true
+
+    exporter_retry_dropped_log_records:
+      enabled: true
+      stability:
+        level: alpha
+      description: Number of log records dropped after exhausting configured retries.
+      unit: "{records}"
+      sum:
+        value_type: int
+        monotonic: true
+
+    exporter_retry_dropped_metric_points:
+      enabled: true
+      stability:
+        level: alpha
+      description: Number of metric points dropped after exhausting configured retries.
+      unit: "{datapoints}"
+      sum:
+        value_type: int
+        monotonic: true
+
+    exporter_retry_dropped_spans:
+      enabled: true
+      stability:
+        level: alpha
+      description: Number of spans dropped after exhausting configured retries.
+      unit: "{spans}"
+      sum:
+        value_type: int
+        monotonic: true
