Skip to content

Commit beabbce

Browse files
committed
[exporterhelper] Add queue options to the new exporter helper
This change enabled queue capability for the new exporter helper. For now, it preserves the same user configuration interface as the existing exporter helper has. The only difference is that implementing persistence is optional now as it requires providing marshal and unmarshal functions for the custom request. Later, it's possible to introduce more options for controlling the queue: count of items or bytes in the queue.
1 parent 7030388 commit beabbce

File tree

10 files changed

+234
-13
lines changed

10 files changed

+234
-13
lines changed

.chloggen/exporter-helper-v2.yaml

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
5+
component: exporter/exporterhelper
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Add API for enabling queue in the new exporter helpers.
9+
10+
# One or more tracking issues or pull requests related to the change
11+
issues: [7874]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext: |
17+
The following experimental API is introduced in exporter/exporterhelper package:
18+
- `WithRequestQueue`: a new exporter helper option for using a queue.
19+
- `Queue`: an interface for queueing requests.
20+
- `NewMemoryQueue`: a function for creating a new memory queue.
21+
- `NewPersistentQueue`: a function for creating a new persistent queue.
22+
- `QueueConfig`: a configuration for queueing requests used by WithMemoryQueue option.
23+
- `NewDefaultQueueConfig`: a function for creating a default QueueConfig.
24+
- `PersistentQueueConfig`: a configuration for queueing requests in persistent storage used by WithPersistentQueue option.
25+
- `NewDefaultPersistentQueueConfig`: a function for creating a default PersistentQueueConfig.
26+
All the new APIs are intended to be used by exporters that need to operate over client-provided requests instead of pdata.
27+
28+
29+
# Optional: The change log or logs in which this entry should be included.
30+
# e.g. '[user]' or '[user, api]'
31+
# Include 'user' if the change is relevant to end users.
32+
# Include 'api' if there is a change to a library API.
33+
# Default: '[user]'
34+
change_logs: [api]

CHANGELOG-API.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ If you are looking for user-facing changes, check out [CHANGELOG.md](./CHANGELOG
2121
- `LogsConverter`: an interface for converting plog.Logs to Request.
2222
- `MetricsConverter`: an interface for converting pmetric.Metrics to Request.
2323
- `TracesConverter`: an interface for converting ptrace.Traces to Request.
24-
All the new APIs are intended to be used by exporters that need to operate over client-provided requests instead of pdata.
24+
All the new APIs are intended to be used by exporters that operate over client-provided requests instead of pdata.
2525

2626
- `otlpreceiver`: Export HTTPConfig as part of the API for creating the otlpreceiver configuration. (#8175)
2727
Changes signature of receiver/otlpreceiver/config.go type httpServerSettings to HTTPConfig.

exporter/exporterhelper/common.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ type baseSettings struct {
6262
component.ShutdownFunc
6363
consumerOptions []consumer.Option
6464
TimeoutSettings
65-
queue internal.ProducerConsumerQueue
65+
queue Queue
6666
RetrySettings
6767
requestExporter bool
6868
marshaler internal.RequestMarshaler
@@ -131,7 +131,8 @@ func WithRetry(retrySettings RetrySettings) Option {
131131
func WithQueue(config QueueSettings) Option {
132132
return func(o *baseSettings) {
133133
if o.requestExporter {
134-
panic("queueing is not available for the new request exporters yet")
134+
panic("this option is not available for the new request exporters, " +
135+
"use WithMemoryQueue or WithPersistentQueue instead")
135136
}
136137
if !config.Enabled {
137138
return
@@ -144,6 +145,15 @@ func WithQueue(config QueueSettings) Option {
144145
}
145146
}
146147

148+
// WithRequestQueue enables queueing for an exporter.
149+
// This API is at the early stage of development and may change without backward compatibility
150+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
151+
func WithRequestQueue(queue Queue) Option {
152+
return func(o *baseSettings) {
153+
o.queue = queue
154+
}
155+
}
156+
147157
// WithCapabilities overrides the default Capabilities() function for a Consumer.
148158
// The default is non-mutable data.
149159
// TODO: Verify if we can change the default to be mutable as we do for processors.

exporter/exporterhelper/internal/bounded_memory_queue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,5 @@ func (q *boundedMemoryQueue) Capacity() int {
101101
func (q *boundedMemoryQueue) IsPersistent() bool {
102102
return false
103103
}
104+
105+
func (q *boundedMemoryQueue) unexported() {}

exporter/exporterhelper/internal/persistent_queue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ func (pq *persistentQueue) IsPersistent() bool {
108108
return true
109109
}
110110

111+
func (pq *persistentQueue) unexported() {}
112+
111113
func toStorageClient(ctx context.Context, storageID component.ID, host component.Host, ownerID component.ID, signal component.DataType) (storage.Client, error) {
112114
extension, err := getStorageExtension(host.GetExtensions(), storageID)
113115
if err != nil {

exporter/exporterhelper/internal/producer_consumer_queue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,6 @@ type ProducerConsumerQueue interface {
3737
// IsPersistent returns true if the queue is persistent.
3838
// TODO: Do not expose this method if the interface moves to a public package.
3939
IsPersistent() bool
40+
41+
unexported()
4042
}

exporter/exporterhelper/queued_retry.go

Lines changed: 113 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,58 @@ const defaultQueueSize = 1000
2727

2828
var errSendingQueueIsFull = errors.New("sending_queue is full")
2929

30+
// Queue defines the queue interface for exporterhelper.
31+
// This API is at the early stage of development and may change without backward compatibility
32+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
33+
type Queue = internal.ProducerConsumerQueue
34+
35+
// NewMemoryQueue creates a new in-memory queue. If config.Enabled is false, it returns nil.
36+
// Should be used with WithQueue option.
37+
// This API is at the early stage of development and may change without backward compatibility
38+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
39+
func NewMemoryQueue(config QueueConfig) Queue {
40+
if !config.Enabled {
41+
return nil
42+
}
43+
return internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
44+
}
45+
46+
// NewPersistentQueue creates a new queue backed by file storage if config.StorageID is not nil.
47+
// If config.StorageID is nil, it creates a new in-memory queue. If config.Enabled is false, it returns nil.
48+
// Should be used with WithQueue option.
49+
// This API is at the early stage of development and may change without backward compatibility
50+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
51+
func NewPersistentQueue(config PersistentQueueConfig, marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) Queue {
52+
if !config.Enabled {
53+
return nil
54+
}
55+
if config.StorageID == nil {
56+
return internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
57+
}
58+
return internal.NewPersistentQueue(
59+
config.QueueSize,
60+
config.NumConsumers,
61+
*config.StorageID,
62+
func(req internal.Request) ([]byte, error) {
63+
r, ok := req.(*request)
64+
if !ok {
65+
return nil, fmt.Errorf("invalid request type: %T", req)
66+
}
67+
return marshaler(r.Request)
68+
},
69+
func(data []byte) (internal.Request, error) {
70+
req, err := unmarshaler(data)
71+
if err != nil {
72+
return nil, err
73+
}
74+
return &request{
75+
Request: req,
76+
baseRequest: baseRequest{ctx: context.Background()},
77+
}, nil
78+
},
79+
)
80+
}
81+
3082
// QueueSettings defines configuration for queueing batches before sending to the consumerSender.
3183
type QueueSettings struct {
3284
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
@@ -65,20 +117,78 @@ func (qCfg *QueueSettings) Validate() error {
65117
return nil
66118
}
67119

120+
// QueueConfig defines configuration for queueing requests before exporting.
121+
// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
122+
// This API is at the early stage of development and may change without backward compatibility
123+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
124+
type QueueConfig struct {
125+
// Enabled indicates whether to not enqueue batches before exporting.
126+
Enabled bool `mapstructure:"enabled"`
127+
// NumConsumers is the number of consumers from the queue.
128+
NumConsumers int `mapstructure:"num_consumers"`
129+
// QueueSize is the maximum number of batches allowed in queue at a given time.
130+
// This field is left for backward compatibility with QueueSettings.
131+
// Later, it will be replaced with size fields specified explicitly in terms of items or batches.
132+
QueueSize int `mapstructure:"queue_size"`
133+
}
134+
135+
// NewDefaultQueueConfig returns the default QueueConfig.
136+
// This API is at the early stage of development and may change without backward compatibility
137+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
138+
func NewDefaultQueueConfig() QueueConfig {
139+
return QueueConfig{
140+
Enabled: true,
141+
NumConsumers: 10,
142+
QueueSize: defaultQueueSize,
143+
}
144+
}
145+
146+
// PersistentQueueConfig defines configuration for queueing requests before exporting using a persistent storage.
147+
// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter and will replace
148+
// QueueSettings in the future.
149+
// This API is at the early stage of development and may change without backward compatibility
150+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
151+
type PersistentQueueConfig struct {
152+
QueueConfig `mapstructure:",squash"`
153+
// StorageID if not empty, enables the persistent storage and uses the component specified
154+
// as a storage extension for the persistent queue
155+
StorageID *component.ID `mapstructure:"storage"`
156+
}
157+
158+
// NewDefaultPersistentQueueConfig returns the default PersistentQueueConfig.
159+
// This API is at the early stage of development and may change without backward compatibility
160+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
161+
func NewDefaultPersistentQueueConfig() PersistentQueueConfig {
162+
return PersistentQueueConfig{
163+
QueueConfig: NewDefaultQueueConfig(),
164+
}
165+
}
166+
167+
// Validate checks if the QueueSettings configuration is valid
168+
func (qCfg *QueueConfig) Validate() error {
169+
if !qCfg.Enabled {
170+
return nil
171+
}
172+
if qCfg.QueueSize <= 0 {
173+
return errors.New("queue size must be positive")
174+
}
175+
return nil
176+
}
177+
68178
type queuedRetrySender struct {
69179
fullName string
70180
id component.ID
71181
signal component.DataType
72182
consumerSender requestSender
73-
queue internal.ProducerConsumerQueue
183+
queue Queue
74184
retryStopCh chan struct{}
75185
traceAttribute attribute.KeyValue
76186
logger *zap.Logger
77187
requeuingEnabled bool
78188
}
79189

80-
func newQueuedRetrySender(id component.ID, signal component.DataType, queue internal.ProducerConsumerQueue,
81-
rCfg RetrySettings, nextSender requestSender, logger *zap.Logger) *queuedRetrySender {
190+
func newQueuedRetrySender(id component.ID, signal component.DataType, queue Queue, rCfg RetrySettings,
191+
nextSender requestSender, logger *zap.Logger) *queuedRetrySender {
82192
retryStopCh := make(chan struct{})
83193
sampledLogger := createSampledLogger(logger)
84194
traceAttr := attribute.String(obsmetrics.ExporterKey, id.String())

exporter/exporterhelper/queued_retry_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,18 @@ func TestQueueSettings_Validate(t *testing.T) {
392392
assert.NoError(t, qCfg.Validate())
393393
}
394394

395+
func TestQueueConfig_Validate(t *testing.T) {
396+
qCfg := NewDefaultQueueConfig()
397+
assert.NoError(t, qCfg.Validate())
398+
399+
qCfg.QueueSize = 0
400+
assert.EqualError(t, qCfg.Validate(), "queue size must be positive")
401+
402+
// Confirm Validate doesn't return error with invalid config when feature is disabled
403+
qCfg.Enabled = false
404+
assert.NoError(t, qCfg.Validate())
405+
}
406+
395407
// if requeueing is enabled, we eventually retry even if we failed at first
396408
func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
397409
qCfg := NewDefaultQueueSettings()
@@ -505,6 +517,31 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
505517
require.Error(t, be.Start(context.Background(), host), "could not get storage client")
506518
}
507519

520+
func TestPersistentQueueRetryPersistenceEnabledStorageError(t *testing.T) {
521+
storageError := errors.New("could not get storage client")
522+
tt, err := obsreporttest.SetupTelemetry(defaultID)
523+
require.NoError(t, err)
524+
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
525+
526+
qCfg := NewDefaultPersistentQueueConfig()
527+
storageID := component.NewIDWithName("file_storage", "storage")
528+
qCfg.StorageID = &storageID // enable persistence
529+
rCfg := NewDefaultRetrySettings()
530+
set := tt.ToExporterCreateSettings()
531+
bs := newBaseSettings(true, nil, nil, WithRetry(rCfg),
532+
WithRequestQueue(NewPersistentQueue(qCfg, fakeRequestMarshaler, fakeRequestUnmarshaler)))
533+
be, err := newBaseExporter(set, bs, "")
534+
require.NoError(t, err)
535+
536+
var extensions = map[component.ID]component.Component{
537+
storageID: &mockStorageExtension{GetClientError: storageError},
538+
}
539+
host := &mockHost{ext: extensions}
540+
541+
// we fail to start if we get an error creating the storage client
542+
require.Error(t, be.Start(context.Background(), host), "could not get storage client")
543+
}
544+
508545
func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
509546

510547
produceCounter := &atomic.Uint32{}

exporter/exporterhelper/request.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,9 @@ func (req *request) Count() int {
5252
}
5353
return 0
5454
}
55+
56+
// RequestMarshaler is a function that can marshal a Request into bytes.
57+
type RequestMarshaler func(req Request) ([]byte, error)
58+
59+
// RequestUnmarshaler is a function that can unmarshal bytes into a Request.
60+
type RequestUnmarshaler func(data []byte) (Request, error)

exporter/exporterhelper/request_test.go

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,25 @@ package exporterhelper
55

66
import (
77
"context"
8+
"encoding/json"
9+
"errors"
810

911
"go.opentelemetry.io/collector/pdata/plog"
1012
"go.opentelemetry.io/collector/pdata/pmetric"
1113
"go.opentelemetry.io/collector/pdata/ptrace"
1214
)
1315

1416
type fakeRequest struct {
15-
items int
16-
err error
17+
Items int
18+
Err error
1719
}
1820

1921
func (r fakeRequest) Export(_ context.Context) error {
20-
return r.err
22+
return r.Err
2123
}
2224

2325
func (r fakeRequest) ItemsCount() int {
24-
return r.items
26+
return r.Items
2527
}
2628

2729
type fakeRequestConverter struct {
@@ -32,13 +34,29 @@ type fakeRequestConverter struct {
3234
}
3335

3436
func (c fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (Request, error) {
35-
return fakeRequest{items: md.DataPointCount(), err: c.requestError}, c.metricsError
37+
return fakeRequest{Items: md.DataPointCount(), Err: c.requestError}, c.metricsError
3638
}
3739

3840
func (c fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (Request, error) {
39-
return fakeRequest{items: td.SpanCount(), err: c.requestError}, c.tracesError
41+
return fakeRequest{Items: td.SpanCount(), Err: c.requestError}, c.tracesError
4042
}
4143

4244
func (c fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (Request, error) {
43-
return fakeRequest{items: ld.LogRecordCount(), err: c.requestError}, c.logsError
45+
return fakeRequest{Items: ld.LogRecordCount(), Err: c.requestError}, c.logsError
46+
}
47+
48+
func fakeRequestMarshaler(req Request) ([]byte, error) {
49+
r, ok := req.(fakeRequest)
50+
if !ok {
51+
return nil, errors.New("invalid request type")
52+
}
53+
return json.Marshal(r)
54+
}
55+
56+
func fakeRequestUnmarshaler(bytes []byte) (Request, error) {
57+
var r fakeRequest
58+
if err := json.Unmarshal(bytes, &r); err != nil {
59+
return nil, err
60+
}
61+
return r, nil
4462
}

0 commit comments

Comments
 (0)