@@ -5,25 +5,73 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe
 
 import (
 	"context"
+	"errors"
 
 	"go.uber.org/zap"
 
+	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/exporter/exporterbatcher"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
-	"go.opentelemetry.io/collector/exporter/exporterqueue"
 )
 
 // QueueBatchSettings is a subset of the queuebatch.Settings that are needed when used within an Exporter.
 type QueueBatchSettings[K any] struct {
-	Encoding exporterqueue.Encoding[K]
+	Encoding queuebatch.Encoding[K]
 	Sizers   map[exporterbatcher.SizerType]queuebatch.Sizer[K]
 }
 
+// NewDefaultQueueConfig returns the default config for QueueConfig.
+// By default, the queue stores 1000 items of telemetry and is non-blocking when full.
+func NewDefaultQueueConfig() QueueConfig {
+	return QueueConfig{
+		Enabled:      true,
+		NumConsumers: 10,
+		// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
+		// This can be estimated at 1-4 GB worth of maximum memory usage
+		// This default is probably still too high, and may be adjusted further down in a future release
+		QueueSize: 1_000,
+		Blocking:  false,
+	}
+}
+
+// QueueConfig defines configuration for queueing requests before exporting.
+// It is meant to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
+// Experimental: This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type QueueConfig struct {
+	// Enabled indicates whether to enqueue batches before exporting.
+	Enabled bool `mapstructure:"enabled"`
+	// NumConsumers is the number of consumers from the queue.
+	NumConsumers int `mapstructure:"num_consumers"`
+	// QueueSize is the maximum number of requests allowed in the queue at any given time.
+	QueueSize int `mapstructure:"queue_size"`
+	// Blocking controls the queue behavior when full.
+	// If true, enqueueing blocks until there is enough space for the new request in the queue.
+	Blocking bool `mapstructure:"blocking"`
+	// StorageID, if not empty, enables persistent storage and uses the specified component
+	// as a storage extension for the persistent queue.
+	StorageID *component.ID `mapstructure:"storage"`
+}
+
+// Validate checks if the Config is valid.
+func (qCfg *QueueConfig) Validate() error {
+	if !qCfg.Enabled {
+		return nil
+	}
+	if qCfg.NumConsumers <= 0 {
+		return errors.New("`num_consumers` must be positive")
+	}
+	if qCfg.QueueSize <= 0 {
+		return errors.New("`queue_size` must be positive")
+	}
+	return nil
+}
+
 func NewQueueSender(
 	qSet queuebatch.Settings[request.Request],
-	qCfg exporterqueue.Config,
+	qCfg QueueConfig,
 	bCfg exporterbatcher.Config,
 	exportFailureMessage string,
 	next sender.Sender[request.Request],
@@ -43,7 +91,7 @@ func NewQueueSender(
 	return queuebatch.NewQueueBatch(qSet, newQueueBatchConfig(qCfg, bCfg), exportFunc)
 }
 
-func newQueueBatchConfig(qCfg exporterqueue.Config, bCfg exporterbatcher.Config) queuebatch.Config {
+func newQueueBatchConfig(qCfg QueueConfig, bCfg exporterbatcher.Config) queuebatch.Config {
 	qbCfg := queuebatch.Config{
 		Enabled:       true,
 		WaitForResult: !qCfg.Enabled,
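
As a rough sketch of how the surface added above fits together: the snippet below builds a QueueConfig from NewDefaultQueueConfig, overrides a couple of fields, and runs Validate. It assumes code living in the same internal package; the exampleQueueConfig helper and the "file_storage" storage-extension ID are illustrative and not part of this change.

// Illustrative only: assumes the same internal package as QueueConfig.
package internal

import "go.opentelemetry.io/collector/component"

func exampleQueueConfig() (QueueConfig, error) {
	cfg := NewDefaultQueueConfig() // Enabled: true, NumConsumers: 10, QueueSize: 1_000, Blocking: false
	cfg.Blocking = true            // block producers instead of rejecting when the queue is full

	// Point the queue at a storage extension to make it persistent; the ID here is illustrative.
	storageID := component.MustNewID("file_storage")
	cfg.StorageID = &storageID

	// Validate rejects non-positive num_consumers or queue_size while the queue is enabled.
	if err := cfg.Validate(); err != nil {
		return QueueConfig{}, err
	}
	return cfg, nil
}

In practice, the mapstructure tags (enabled, num_consumers, queue_size, blocking, storage) mean these fields would usually be populated from the collector's YAML configuration rather than set in code, with Validate surfacing the num_consumers/queue_size errors at config-load time.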