Skip to content

Commit

Permalink
[chore] [exporterhelper] Refactor queue initialization (#8284)
Browse files Browse the repository at this point in the history
Make the queue initialization process consistent for both queue types.
Instead of having different workflows for memory and persistent queues,
this change breaks the initialization of both into two generic steps:
1. Queue factory: `NewBoundedMemoryQueue`, `NewPersistentQueue`
2. Start method: `queue.Start(context.Context, component.Host,
QueueSettings)`

This change:
- reduces coupling between `queuedRetrySender` and the queues;
- allows future refactoring of `queuedRetrySender`;
- allows future extraction of the queue package from the exporterhelper;
- makes it possible to have `WithRequestQueue` option for the new
exporter helper API as drafted in
#8275.
  • Loading branch information
dmitryax authored Aug 30, 2023
1 parent 4fbf6c0 commit 7030388
Show file tree
Hide file tree
Showing 15 changed files with 372 additions and 370 deletions.
37 changes: 18 additions & 19 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,39 +56,31 @@ func (req *baseRequest) OnProcessingFinished() {
}
}

type queueSettings struct {
config QueueSettings
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
}

func (qs *queueSettings) persistenceEnabled() bool {
return qs.config.StorageID != nil && qs.marshaler != nil && qs.unmarshaler != nil
}

// baseSettings represents all the options that users can configure.
type baseSettings struct {
component.StartFunc
component.ShutdownFunc
consumerOptions []consumer.Option
TimeoutSettings
queueSettings
queue internal.ProducerConsumerQueue
RetrySettings
requestExporter bool
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
}

// newBaseSettings returns the baseSettings starting from the default and applying all configured options.
// requestExporter indicates whether the base settings are for a new request exporter or not.
func newBaseSettings(requestExporter bool, options ...Option) *baseSettings {
// TODO: The first three arguments will be removed when the old exporter helpers will be updated to call the new ones.
func newBaseSettings(requestExporter bool, marshaler internal.RequestMarshaler,
unmarshaler internal.RequestUnmarshaler, options ...Option) *baseSettings {
bs := &baseSettings{
requestExporter: requestExporter,
TimeoutSettings: NewDefaultTimeoutSettings(),
// TODO: Enable queuing by default (call DefaultQueueSettings)
queueSettings: queueSettings{
config: QueueSettings{Enabled: false},
},
// TODO: Enable retry by default (call DefaultRetrySettings)
RetrySettings: RetrySettings{Enabled: false},
marshaler: marshaler,
unmarshaler: unmarshaler,
}

for _, op := range options {
Expand Down Expand Up @@ -141,7 +133,14 @@ func WithQueue(config QueueSettings) Option {
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
}
o.queueSettings.config = config
if !config.Enabled {
return
}
if config.StorageID == nil {
o.queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
return
}
o.queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler)
}
}

Expand Down Expand Up @@ -172,7 +171,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo
return nil, err
}

be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queue, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.sender = be.qrSender
be.StartFunc = func(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
Expand All @@ -181,7 +180,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo
}

// If no error then start the queuedRetrySender.
return be.qrSender.start(ctx, host)
return be.qrSender.start(ctx, host, set)
}
be.ShutdownFunc = func(ctx context.Context) error {
// First shutdown the queued retry sender
Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ var (
)

func TestBaseExporter(t *testing.T) {
be, err := newBaseExporter(defaultSettings, newBaseSettings(false), "")
be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil), "")
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
be, err = newBaseExporter(defaultSettings, newBaseSettings(true), "")
be, err = newBaseExporter(defaultSettings, newBaseSettings(true, nil, nil), "")
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -47,7 +47,7 @@ func TestBaseExporterWithOptions(t *testing.T) {
be, err := newBaseExporter(
defaultSettings,
newBaseSettings(
false,
false, nil, nil,
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutSettings())),
Expand Down
40 changes: 27 additions & 13 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,55 @@
package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context"
"sync"
"sync/atomic"

"go.opentelemetry.io/collector/component"
)

// boundedMemoryQueue implements a producer-consumer exchange similar to a ring buffer queue,
// where the queue is bounded and if it fills up due to slow consumers, the new items written by
// the producer are dropped.
type boundedMemoryQueue struct {
stopWG sync.WaitGroup
size *atomic.Uint32
stopped *atomic.Bool
items chan Request
capacity uint32
stopWG sync.WaitGroup
size *atomic.Uint32
stopped *atomic.Bool
items chan Request
capacity uint32
numConsumers int
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue(capacity int) ProducerConsumerQueue {
func NewBoundedMemoryQueue(capacity int, numConsumers int) ProducerConsumerQueue {
return &boundedMemoryQueue{
items: make(chan Request, capacity),
stopped: &atomic.Bool{},
size: &atomic.Uint32{},
capacity: uint32(capacity),
items: make(chan Request, capacity),
stopped: &atomic.Bool{},
size: &atomic.Uint32{},
capacity: uint32(capacity),
numConsumers: numConsumers,
}
}

// StartConsumers starts a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
func (q *boundedMemoryQueue) StartConsumers(numWorkers int, callback func(item Request)) {
func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set QueueSettings) error {
var startWG sync.WaitGroup
for i := 0; i < numWorkers; i++ {
for i := 0; i < q.numConsumers; i++ {
q.stopWG.Add(1)
startWG.Add(1)
go func() {
startWG.Done()
defer q.stopWG.Done()
for item := range q.items {
q.size.Add(^uint32(0))
callback(item)
set.Callback(item)
}
}()
}
startWG.Wait()
return nil
}

// Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
Expand Down Expand Up @@ -87,3 +93,11 @@ func (q *boundedMemoryQueue) Stop() {
func (q *boundedMemoryQueue) Size() int {
return int(q.size.Load())
}

func (q *boundedMemoryQueue) Capacity() int {
return int(q.capacity)
}

func (q *boundedMemoryQueue) IsPersistent() bool {
return false
}
49 changes: 21 additions & 28 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package internal

import (
"context"
"reflect"
"sync"
"sync/atomic"
Expand All @@ -14,8 +15,20 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exportertest"
)

func newNopQueueSettings(callback func(item Request)) QueueSettings {
return QueueSettings{
CreateSettings: exportertest.NewNopCreateSettings(),
DataType: component.DataTypeMetrics,
Callback: callback,
}
}

type stringRequest struct {
Request
str string
Expand All @@ -29,7 +42,7 @@ func newStringRequest(str string) Request {
// We want to test the overflow behavior, so we block the consumer
// by holding a startLock before submitting items to the queue.
func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerFn func(item Request))) {
q := NewBoundedMemoryQueue(1)
q := NewBoundedMemoryQueue(1, 1)

var startLock sync.Mutex

Expand Down Expand Up @@ -88,7 +101,7 @@ func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerF

func TestBoundedQueue(t *testing.T) {
helper(t, func(q ProducerConsumerQueue, consumerFn func(item Request)) {
q.StartConsumers(1, consumerFn)
assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(consumerFn)))
})
}

Expand All @@ -99,14 +112,14 @@ func TestBoundedQueue(t *testing.T) {
// only after Stop will mean the consumers are still locked while
// trying to perform the final consumptions.
func TestShutdownWhileNotEmpty(t *testing.T) {
q := NewBoundedMemoryQueue(10)
q := NewBoundedMemoryQueue(10, 1)

consumerState := newConsumerState(t)

q.StartConsumers(1, func(item Request) {
assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {
consumerState.record(item.(stringRequest).str)
time.Sleep(1 * time.Second)
})
})))

q.Produce(newStringRequest("a"))
q.Produce(newStringRequest("b"))
Expand Down Expand Up @@ -183,30 +196,10 @@ func (s *consumerState) assertConsumed(expected map[string]bool) {
}

func TestZeroSize(t *testing.T) {
q := NewBoundedMemoryQueue(0)
q := NewBoundedMemoryQueue(0, 1)

q.StartConsumers(1, func(item Request) {
})
err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {}))
assert.NoError(t, err)

assert.False(t, q.Produce(newStringRequest("a"))) // in process
}

func BenchmarkBoundedQueue(b *testing.B) {
q := NewBoundedMemoryQueue(1000)

q.StartConsumers(10, func(item Request) {})

for n := 0; n < b.N; n++ {
q.Produce(newStringRequest("a"))
}
}

func BenchmarkBoundedQueueWithFactory(b *testing.B) {
q := NewBoundedMemoryQueue(1000)

q.StartConsumers(10, func(item Request) {})

for n := 0; n < b.N; n++ {
q.Produce(newStringRequest("a"))
}
}
4 changes: 4 additions & 0 deletions exporter/exporterhelper/internal/mock_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ import (
type mockStorageExtension struct {
component.StartFunc
component.ShutdownFunc
getClientError error
}

func (m mockStorageExtension) GetClient(_ context.Context, _ component.Kind, _ component.ID, _ string) (storage.Client, error) {
if m.getClientError != nil {
return nil, m.getClientError
}
return &mockStorageClient{st: map[string][]byte{}}, nil
}

Expand Down
Loading

0 comments on commit 7030388

Please sign in to comment.