Skip to content

Commit

Permalink
[exporterhelper] Add WithRequestQueue option to the exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Oct 5, 2023
1 parent b89b819 commit 0112f40
Show file tree
Hide file tree
Showing 37 changed files with 940 additions and 511 deletions.
26 changes: 26 additions & 0 deletions .chloggen/exporter-helper-v2-move-request.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Move the experimental request API to a separate package.

# One or more tracking issues or pull requests related to the change
issues: [7874]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The following experimental Request API is moved from exporter/exporterhelper to exporter/exporterhelper/request package:
- `Request` -> `request.Request`
- `RequestItemsCounter` -> `request.ItemsCounter`
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
37 changes: 37 additions & 0 deletions .chloggen/exporter-helper-v2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add API for enabling queue in the new exporter helpers.

# One or more tracking issues or pull requests related to the change
issues: [7874]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The following experimental API is introduced in exporter/exporterhelper package:
- `WithRequestQueue`: a new exporter helper option for using a queue.
- queue.Queue: an interface for queue implementations.
- queue.Factory: a queue factory interface, implementations of this interface are intended to be used with WithRequestQueue option.
- queue.Settings: queue factory settings.
- queue.Config: common configuration for queue implementations.
- queue.NewDefaultConfig: a function for creating a default queue configuration.
- queue/memoryqueue.NewFactory: a new factory for creating a memory queue.
- queue/memoryqueue.Config: a configuration for the memory queue factory.
- queue/memoryqueue.NewDefaultConfig: a function for creating a default memory queue configuration.
- request.ErrorHandler: an optional interface for handling errors that occur during request processing.
- request.Marshaler: a function that can marshal a Request into bytes.
- request.Unmarshaler: a function that can unmarshal bytes into a Request
All the new APIs are intended to be used by exporters that operate over client-provided requests instead of pdata.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
145 changes: 70 additions & 75 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,63 +9,52 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/queue"
"go.opentelemetry.io/collector/exporter/exporterhelper/queue/persistentqueue"
"go.opentelemetry.io/collector/exporter/exporterhelper/request"
)

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface {
start(ctx context.Context, host component.Host, set exporter.CreateSettings) error
shutdown()
send(req internal.Request) error
setNextSender(nextSender requestSender)
}

type baseRequestSender struct {
nextSender requestSender
}

var _ requestSender = (*baseRequestSender)(nil)

func (b *baseRequestSender) start(context.Context, component.Host, exporter.CreateSettings) error {
return nil
}

func (b *baseRequestSender) shutdown() {}

func (b *baseRequestSender) send(req internal.Request) error {
return b.nextSender.send(req)
send(req *intrequest.Request) error
}

func (b *baseRequestSender) setNextSender(nextSender requestSender) {
b.nextSender = nextSender
type starter interface {
start(context.Context, component.Host) error
}

type obsrepSenderFactory func(obsrep *obsExporter) requestSender

// baseRequest is a base implementation for the internal.Request.
type baseRequest struct {
ctx context.Context
processingFinishedCallback func()
type shutdowner interface {
shutdown()
}

func (req *baseRequest) Context() context.Context {
return req.ctx
type baseRequestSender struct {
sender requestSender
nextSender *baseRequestSender
}

func (req *baseRequest) SetContext(ctx context.Context) {
req.ctx = ctx
func (b *baseRequestSender) start(ctx context.Context, host component.Host) error {
if s, ok := b.sender.(starter); ok {
return s.start(ctx, host)
}
return nil
}

func (req *baseRequest) SetOnProcessingFinished(callback func()) {
req.processingFinishedCallback = callback
func (b *baseRequestSender) shutdown() {
if s, ok := b.sender.(shutdowner); ok {
s.shutdown()
}
}

func (req *baseRequest) OnProcessingFinished() {
if req.processingFinishedCallback != nil {
req.processingFinishedCallback()
func (b *baseRequestSender) send(req *intrequest.Request) error {
if b.sender == nil {
return b.nextSender.send(req)
}
return b.sender.send(req)
}

type obsrepSenderFactory func(obsrep *obsExporter, nextSender *baseRequestSender) requestSender

// Option apply changes to baseExporter.
type Option func(*baseExporter)

Expand All @@ -89,15 +78,15 @@ func WithShutdown(shutdown component.ShutdownFunc) Option {
// The default TimeoutSettings is 5 seconds.
func WithTimeout(timeoutSettings TimeoutSettings) Option {
return func(o *baseExporter) {
o.timeoutSender.cfg = timeoutSettings
o.timeoutSender.sender = &timeoutSender{cfg: timeoutSettings}
}
}

// WithRetry overrides the default RetrySettings for an exporter.
// The default RetrySettings is to disable retries.
func WithRetry(retrySettings RetrySettings) Option {
return func(o *baseExporter) {
o.retrySender = newRetrySender(o.set.ID, retrySettings, o.set.Logger, o.onTemporaryFailure)
o.retrySender.sender = newRetrySender(o.set.ID, retrySettings, o.set.Logger, o.onTemporaryFailure, o.retrySender.nextSender)
}
}

Expand All @@ -107,19 +96,32 @@ func WithRetry(retrySettings RetrySettings) Option {
func WithQueue(config QueueSettings) Option {
return func(o *baseExporter) {
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
panic("this option is not available for the new request exporters, " +
"use WithMemoryQueue or WithPersistentQueue instead")
}
var queue internal.ProducerConsumerQueue
if config.Enabled {
if config.StorageID == nil {
queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
} else {
queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler)
}
}
qs := newQueueSender(o.set.ID, o.signal, queue, o.set.Logger)
o.queueSender = qs
factory := persistentqueue.NewFactory(persistentqueue.Config{
Config: queue.Config{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
},
StorageID: config.StorageID,
}, o.marshaler, o.unmarshaler)
qs := newQueueSender(o.set, o.signal, factory, o.queueSender.nextSender)
o.setOnTemporaryFailure(qs.onTemporaryFailure)
o.queueSender.sender = qs
}
}

// WithRequestQueue enables queueing for an exporter.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(queueFactory queue.Factory) Option {
return func(o *baseExporter) {
qs := newQueueSender(o.set, o.signal, queueFactory, o.queueSender.nextSender)
o.setOnTemporaryFailure(qs.onTemporaryFailure)
o.queueSender.sender = qs
}
}

Expand All @@ -138,8 +140,8 @@ type baseExporter struct {
component.ShutdownFunc

requestExporter bool
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
marshaler request.Marshaler
unmarshaler request.Unmarshaler
signal component.DataType

set exporter.CreateSettings
Expand All @@ -148,10 +150,10 @@ type baseExporter struct {
// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
// The data is handled by each sender in the respective order starting from the queueSender.
// Most of the senders are optional, and initialized with a no-op path-through sender.
queueSender requestSender
obsrepSender requestSender
retrySender requestSender
timeoutSender *timeoutSender // timeoutSender is always initialized.
queueSender *baseRequestSender
obsrepSender *baseRequestSender
retrySender *baseRequestSender
timeoutSender *baseRequestSender

// onTemporaryFailure is a function that is called when the retrySender is unable to send data to the next consumer.
onTemporaryFailure onRequestHandlingFinishedFunc
Expand All @@ -160,8 +162,8 @@ type baseExporter struct {
}

// TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers will be updated to call the new ones.
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler internal.RequestMarshaler,
unmarshaler internal.RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler request.Marshaler,
unmarshaler request.Unmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {

obsrep, err := newObsExporter(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments)
if err != nil {
Expand All @@ -174,43 +176,36 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
unmarshaler: unmarshaler,
signal: signal,

queueSender: &baseRequestSender{},
obsrepSender: osf(obsrep),
retrySender: &baseRequestSender{},
timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()},

set: set,
obsrep: obsrep,
}

// Initialize the chain of senders in the reverse order.
be.timeoutSender = &baseRequestSender{sender: &timeoutSender{cfg: NewDefaultTimeoutSettings()}}
be.retrySender = &baseRequestSender{nextSender: be.timeoutSender}
be.obsrepSender = &baseRequestSender{sender: osf(obsrep, be.retrySender)}
be.queueSender = &baseRequestSender{nextSender: be.obsrepSender}

for _, op := range options {
op(be)
}
be.connectSenders()

return be, nil
}

// send sends the request using the first sender in the chain.
func (be *baseExporter) send(req internal.Request) error {
func (be *baseExporter) send(req *intrequest.Request) error {
return be.queueSender.send(req)
}

// connectSenders connects the senders in the predefined order.
func (be *baseExporter) connectSenders() {
be.queueSender.setNextSender(be.obsrepSender)
be.obsrepSender.setNextSender(be.retrySender)
be.retrySender.setNextSender(be.timeoutSender)
}

func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
if err := be.StartFunc.Start(ctx, host); err != nil {
return err
}

// If no error then start the queueSender.
return be.queueSender.start(ctx, host, be.set)
return be.queueSender.start(ctx, host)
}

func (be *baseExporter) Shutdown(ctx context.Context) error {
Expand All @@ -226,7 +221,7 @@ func (be *baseExporter) Shutdown(ctx context.Context) error {

func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) {
be.onTemporaryFailure = onTemporaryFailure
if rs, ok := be.retrySender.(*retrySender); ok {
if rs, ok := be.retrySender.sender.(*retrySender); ok {
rs.onTemporaryFailure = onTemporaryFailure
}
}
4 changes: 2 additions & 2 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ var (
}
)

func newNoopObsrepSender(_ *obsExporter) requestSender {
return &baseRequestSender{}
func newNoopObsrepSender(_ *obsExporter, nextSender *baseRequestSender) requestSender {
return &baseRequestSender{nextSender: nextSender}
}

func TestBaseExporter(t *testing.T) {
Expand Down
16 changes: 10 additions & 6 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync/atomic"

"go.opentelemetry.io/collector/component"
intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/queue"
)

// boundedMemoryQueue implements a producer-consumer exchange similar to a ring buffer queue,
Expand All @@ -20,26 +22,28 @@ type boundedMemoryQueue struct {
stopWG sync.WaitGroup
size *atomic.Uint32
stopped *atomic.Bool
items chan Request
items chan *intrequest.Request
capacity uint32
numConsumers int
callback func(item *intrequest.Request)
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue(capacity int, numConsumers int) ProducerConsumerQueue {
func NewBoundedMemoryQueue(capacity int, numConsumers int, callback func(item *intrequest.Request)) queue.Queue {
return &boundedMemoryQueue{
items: make(chan Request, capacity),
items: make(chan *intrequest.Request, capacity),
stopped: &atomic.Bool{},
size: &atomic.Uint32{},
capacity: uint32(capacity),
numConsumers: numConsumers,
callback: callback,
}
}

// StartConsumers starts a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set QueueSettings) error {
func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host) error {
var startWG sync.WaitGroup
for i := 0; i < q.numConsumers; i++ {
q.stopWG.Add(1)
Expand All @@ -49,7 +53,7 @@ func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set Queu
defer q.stopWG.Done()
for item := range q.items {
q.size.Add(^uint32(0))
set.Callback(item)
q.callback(item)
}
}()
}
Expand All @@ -58,7 +62,7 @@ func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set Queu
}

// Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
func (q *boundedMemoryQueue) Produce(item Request) bool {
func (q *boundedMemoryQueue) Produce(item *intrequest.Request) bool {
if q.stopped.Load() {
return false
}
Expand Down
Loading

0 comments on commit 0112f40

Please sign in to comment.