Commit ddfd2b6

Add retry dropped item metrics and an exhausted retry error marker for exporter helper retries
Signed-off-by: Israel Blancas <[email protected]>
1 parent 71418b6 commit ddfd2b6

File tree

12 files changed (+326, -57 lines)

.chloggen/13956.yaml

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: pkg/exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add retry dropped item metrics and an exhausted retry error marker for exporter helper retries.
+
+# One or more tracking issues or pull requests related to the change
+issues: [13956]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |-
+  New counters `otelcol_exporter_retry_dropped_{spans,metric_points,log_records}` capture items discarded after exhausting retries while `IsRetriesExhaustedErr` detects the terminal retry outcome.
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
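For orientation, a minimal usage sketch of how the new marker and counters relate. Assumptions: the calling code lives inside the collector module (the `experr` package shown later in this commit is internal), and `exportOnce` is a hypothetical stand-in for a send whose retry budget is already spent; it is not part of this change.

package main

import (
    "context"
    "errors"
    "fmt"

    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
)

// exportOnce is a hypothetical stand-in for a send whose retries were exhausted.
func exportOnce(ctx context.Context) error {
    return experr.NewRetriesExhaustedErr(errors.New("backend unavailable"))
}

func main() {
    if err := exportOnce(context.Background()); err != nil {
        if experr.IsRetriesExhaustedErr(err) {
            // Terminal outcome: these are the items the new
            // otelcol_exporter_retry_dropped_* counters record as dropped.
            fmt.Println("dropped after exhausting retries:", err)
            return
        }
        fmt.Println("send failed, may still be retried:", err)
    }
}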

exporter/exporterhelper/documentation.md

Lines changed: 24 additions & 0 deletions
@@ -62,6 +62,30 @@ Current size of the retry queue (in batches). [alpha]
 | ---- | ----------- | ---------- | --------- |
 | {batches} | Gauge | Int | alpha |
 
+### otelcol_exporter_retry_dropped_log_records
+
+Number of log records dropped after exhausting configured retries. [alpha]
+
+| Unit | Metric Type | Value Type | Monotonic | Stability |
+| ---- | ----------- | ---------- | --------- | --------- |
+| {records} | Sum | Int | true | alpha |
+
+### otelcol_exporter_retry_dropped_metric_points
+
+Number of metric points dropped after exhausting configured retries. [alpha]
+
+| Unit | Metric Type | Value Type | Monotonic | Stability |
+| ---- | ----------- | ---------- | --------- | --------- |
+| {datapoints} | Sum | Int | true | alpha |
+
+### otelcol_exporter_retry_dropped_spans
+
+Number of spans dropped after exhausting configured retries. [alpha]
+
+| Unit | Metric Type | Value Type | Monotonic | Stability |
+| ---- | ----------- | ---------- | --------- | --------- |
+| {spans} | Sum | Int | true | alpha |
+
 ### otelcol_exporter_send_failed_log_records
 
 Number of log records in failed attempts to send to destination. [alpha]

exporter/exporterhelper/internal/experr/err.go

Lines changed: 22 additions & 3 deletions
@@ -3,9 +3,7 @@
 
 package experr // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
 
-import (
-    "errors"
-)
+import "errors"
 
 type shutdownErr struct {
     err error
@@ -27,3 +25,24 @@ func IsShutdownErr(err error) bool {
     var sdErr shutdownErr
     return errors.As(err, &sdErr)
 }
+
+type retriesExhaustedErr struct {
+    err error
+}
+
+func NewRetriesExhaustedErr(err error) error {
+    return retriesExhaustedErr{err: err}
+}
+
+func (r retriesExhaustedErr) Error() string {
+    return "retries exhausted: " + r.err.Error()
+}
+
+func (r retriesExhaustedErr) Unwrap() error {
+    return r.err
+}
+
+func IsRetriesExhaustedErr(err error) bool {
+    var reErr retriesExhaustedErr
+    return errors.As(err, &reErr)
+}
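A short, standalone sketch of the semantics added above (illustration only, not part of the commit): the marker is detected with `IsRetriesExhaustedErr`, while `Unwrap` keeps the original cause reachable through `errors.Is`.

package main

import (
    "errors"
    "fmt"

    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
)

var errBackend = errors.New("backend unavailable")

func main() {
    // Wrap a failure the way the retry sender does once its retry budget is spent.
    wrapped := experr.NewRetriesExhaustedErr(fmt.Errorf("no more retries left: %w", errBackend))

    fmt.Println(experr.IsRetriesExhaustedErr(wrapped)) // true: marker found via errors.As
    fmt.Println(errors.Is(wrapped, errBackend))        // true: Unwrap keeps the cause reachable
    fmt.Println(wrapped.Error())                       // retries exhausted: no more retries left: backend unavailable
}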

exporter/exporterhelper/internal/experr/err_test.go

Lines changed: 12 additions & 0 deletions
@@ -22,3 +22,15 @@ func TestIsShutdownErr(t *testing.T) {
     err = NewShutdownErr(err)
     require.True(t, IsShutdownErr(err))
 }
+
+func TestNewRetriesExhaustedErr(t *testing.T) {
+    err := NewRetriesExhaustedErr(errors.New("another error"))
+    assert.Equal(t, "retries exhausted: another error", err.Error())
+}
+
+func TestIsRetriesExhaustedErr(t *testing.T) {
+    err := errors.New("testError")
+    require.False(t, IsRetriesExhaustedErr(err))
+    err = NewRetriesExhaustedErr(err)
+    require.True(t, IsRetriesExhaustedErr(err))
+}

exporter/exporterhelper/internal/metadata/generated_telemetry.go

Lines changed: 21 additions & 0 deletions
Generated file; diff not rendered by default.

exporter/exporterhelper/internal/metadatatest/generated_telemetrytest.go

Lines changed: 48 additions & 0 deletions
Generated file; diff not rendered by default.

exporter/exporterhelper/internal/metadatatest/generated_telemetrytest_test.go

Lines changed: 12 additions & 0 deletions
Generated file; diff not rendered by default.

exporter/exporterhelper/internal/obs_report_sender.go

Lines changed: 16 additions & 7 deletions
@@ -13,6 +13,7 @@ import (
 
     "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/exporter"
+    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -40,13 +41,14 @@ type obsReportSender[K request.Request] struct {
     component.StartFunc
     component.ShutdownFunc
 
-    spanName        string
-    tracer          trace.Tracer
-    spanAttrs       trace.SpanStartEventOption
-    metricAttr      metric.MeasurementOption
-    itemsSentInst   metric.Int64Counter
-    itemsFailedInst metric.Int64Counter
-    next            sender.Sender[K]
+    spanName              string
+    tracer                trace.Tracer
+    spanAttrs             trace.SpanStartEventOption
+    metricAttr            metric.MeasurementOption
+    itemsSentInst         metric.Int64Counter
+    itemsFailedInst       metric.Int64Counter
+    itemsRetryDroppedInst metric.Int64Counter
+    next                  sender.Sender[K]
 }
 
 func newObsReportSender[K request.Request](set exporter.Settings, signal pipeline.Signal, next sender.Sender[K]) (sender.Sender[K], error) {
@@ -70,14 +72,17 @@ func newObsReportSender[K request.Request](set exporter.Settings, signal pipelin
     case pipeline.SignalTraces:
         or.itemsSentInst = telemetryBuilder.ExporterSentSpans
         or.itemsFailedInst = telemetryBuilder.ExporterSendFailedSpans
+        or.itemsRetryDroppedInst = telemetryBuilder.ExporterRetryDroppedSpans
 
     case pipeline.SignalMetrics:
         or.itemsSentInst = telemetryBuilder.ExporterSentMetricPoints
         or.itemsFailedInst = telemetryBuilder.ExporterSendFailedMetricPoints
+        or.itemsRetryDroppedInst = telemetryBuilder.ExporterRetryDroppedMetricPoints
 
     case pipeline.SignalLogs:
         or.itemsSentInst = telemetryBuilder.ExporterSentLogRecords
         or.itemsFailedInst = telemetryBuilder.ExporterSendFailedLogRecords
+        or.itemsRetryDroppedInst = telemetryBuilder.ExporterRetryDroppedLogRecords
     }
 
     return or, nil
@@ -116,6 +121,10 @@ func (ors *obsReportSender[K]) endOp(ctx context.Context, numLogRecords int, err
     if ors.itemsFailedInst != nil {
         ors.itemsFailedInst.Add(ctx, numFailedToSend, ors.metricAttr)
     }
+    // Count drops after retries were exhausted.
+    if err != nil && ors.itemsRetryDroppedInst != nil && experr.IsRetriesExhaustedErr(err) {
+        ors.itemsRetryDroppedInst.Add(ctx, numFailedToSend, ors.metricAttr)
+    }
 
     span := trace.SpanFromContext(ctx)
     defer span.End()

exporter/exporterhelper/internal/obs_report_sender_test.go

Lines changed: 38 additions & 0 deletions
@@ -18,6 +18,7 @@ import (
     "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/component/componenttest"
     "go.opentelemetry.io/collector/exporter"
+    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadatatest"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
@@ -31,6 +32,43 @@
     errFake = errors.New("errFake")
 )
 
+func TestExportTraceRetryDroppedMetric(t *testing.T) {
+    tt := componenttest.NewTelemetry()
+    t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
+
+    obsrep, err := newObsReportSender(
+        exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
+        pipeline.SignalTraces,
+        sender.NewSender(func(context.Context, request.Request) error {
+            return experr.NewRetriesExhaustedErr(errFake)
+        }),
+    )
+    require.NoError(t, err)
+
+    req := &requesttest.FakeRequest{Items: 7}
+    sendErr := obsrep.Send(context.Background(), req)
+    require.Error(t, sendErr)
+    require.True(t, experr.IsRetriesExhaustedErr(sendErr))
+
+    wantAttrs := attribute.NewSet(attribute.String("exporter", exporterID.String()))
+
+    metadatatest.AssertEqualExporterSendFailedSpans(t, tt,
+        []metricdata.DataPoint[int64]{
+            {
+                Attributes: wantAttrs,
+                Value:      int64(req.Items),
+            },
+        }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
+
+    metadatatest.AssertEqualExporterRetryDroppedSpans(t, tt,
+        []metricdata.DataPoint[int64]{
+            {
+                Attributes: wantAttrs,
+                Value:      int64(req.Items),
+            },
+        }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
+}
+
 func TestExportTraceDataOp(t *testing.T) {
     tt := componenttest.NewTelemetry()
     t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

exporter/exporterhelper/internal/retry_sender.go

Lines changed: 12 additions & 4 deletions
@@ -79,10 +79,17 @@ func (rs *retrySender) Send(ctx context.Context, req request.Request) error {
     }
     span := trace.SpanFromContext(ctx)
     retryNum := int64(0)
+    retried := false
     var maxElapsedTime time.Time
     if rs.cfg.MaxElapsedTime > 0 {
         maxElapsedTime = time.Now().Add(rs.cfg.MaxElapsedTime)
     }
+    wrapRetryErr := func(e error) error {
+        if retried {
+            return experr.NewRetriesExhaustedErr(e)
+        }
+        return e
+    }
     for {
         span.AddEvent(
             "Sending request.",
@@ -104,7 +111,7 @@
 
         backoffDelay := expBackoff.NextBackOff()
         if backoffDelay == backoff.Stop {
-            return fmt.Errorf("no more retries left: %w", err)
+            return wrapRetryErr(fmt.Errorf("no more retries left: %w", err))
         }
 
         throttleErr := throttleRetry{}
@@ -115,13 +122,13 @@
         nextRetryTime := time.Now().Add(backoffDelay)
         if !maxElapsedTime.IsZero() && maxElapsedTime.Before(nextRetryTime) {
             // The delay is longer than the maxElapsedTime.
-            return fmt.Errorf("no more retries left: %w", err)
+            return wrapRetryErr(fmt.Errorf("no more retries left: %w", err))
         }
 
         if deadline, has := ctx.Deadline(); has && deadline.Before(nextRetryTime) {
             // The delay is longer than the deadline. There is no point in
             // waiting for cancelation.
-            return fmt.Errorf("request will be cancelled before next retry: %w", err)
+            return wrapRetryErr(fmt.Errorf("request will be cancelled before next retry: %w", err))
         }
 
         backoffDelayStr := backoffDelay.String()
@@ -140,10 +147,11 @@
         // back-off, but get interrupted when shutting down or request is cancelled or timed out.
         select {
         case <-ctx.Done():
-            return fmt.Errorf("request is cancelled or timed out: %w", err)
+            return wrapRetryErr(fmt.Errorf("request is cancelled or timed out: %w", err))
         case <-rs.stopCh:
            return experr.NewShutdownErr(err)
        case <-time.After(backoffDelay):
+            retried = true
         }
     }
 }
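To make the gating above explicit, here is a self-contained sketch that mimics the `retried`/`wrapRetryErr` logic; `sendWithRetries` and `retriesExhausted` are illustrative names, not collector API. A failure on the very first attempt is returned unmarked, while a failure after at least one backoff wait carries the exhausted-retries marker, matching where `retried = true` is set in the `select` above.

package main

import (
    "errors"
    "fmt"
)

type retriesExhausted struct{ err error }

func (r retriesExhausted) Error() string { return "retries exhausted: " + r.err.Error() }
func (r retriesExhausted) Unwrap() error { return r.err }

// sendWithRetries mimics retrySender.Send: the terminal error is only marked as
// "retries exhausted" if at least one backoff wait actually happened.
func sendWithRetries(attempt func() error, maxAttempts int) error {
    retried := false
    wrap := func(e error) error {
        if retried {
            return retriesExhausted{err: e}
        }
        return e
    }
    var err error
    for i := 0; i < maxAttempts; i++ {
        if err = attempt(); err == nil {
            return nil
        }
        if i == maxAttempts-1 {
            return wrap(fmt.Errorf("no more retries left: %w", err))
        }
        retried = true // a backoff wait elapsed before the next attempt
    }
    return err
}

func main() {
    boom := func() error { return errors.New("boom") }

    var re retriesExhausted
    // Single attempt: the failure is terminal, but nothing was retried, so no marker.
    fmt.Println(errors.As(sendWithRetries(boom, 1), &re)) // false
    // Several attempts: the final failure carries the exhausted-retries marker.
    fmt.Println(errors.As(sendWithRetries(boom, 3), &re)) // true
}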
