Skip to content

Commit

Permalink
[exporter] internal/persistent_queue::OnProcessingFinished is changed…
Browse files Browse the repository at this point in the history
… to a class function instead of a callback (open-telemetry#11338)

#### Description

Why this change?
Each request from the queue contains multiple items, and those items
could be merge-split into multiple batches when they are sent out (see
open-telemetry#8122
for more about exporter batcher). We would like to book-keep those
cases, and only call `onProcessingFinished` when all such batches has
gone out. In this PR, `onProcessingFinished` is changed from a callback
to a method function because it is easier to book keep index instead of
functions.

#### Link to tracking issue
open-telemetry#8122
open-telemetry#10368

#### Testing
`exporter/internal/queue/persistent_queue_test.go`

#### Documentation

This is an internal change invisible to the users.

---------

Co-authored-by: Dmitrii Anoshin <[email protected]>
  • Loading branch information
2 people authored and jackgopack4 committed Oct 8, 2024
1 parent cf18ade commit e959731
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 42 deletions.
5 changes: 5 additions & 0 deletions exporter/internal/queue/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) err
return true
}

// Should be called to remove the item of the given index from the queue once processing is finished.
// For in memory queue, this function is noop.
func (q *boundedMemoryQueue[T]) OnProcessingFinished(uint64, error) {
}

// Shutdown closes the queue channel to initiate draining of the queue.
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
q.sizedChannel.shutdown()
Expand Down
80 changes: 43 additions & 37 deletions exporter/internal/queue/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,15 @@ func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) (
func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
for {
var (
req T
onProcessingFinished func(error)
consumed bool
index uint64
req T
consumed bool
)

// If we are stopped we still process all the other events in the channel before, but we
// return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
_, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 {
req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
index, req, consumed = pq.getNextItem(context.Background())
if !consumed {
return 0
}
Expand All @@ -213,7 +213,8 @@ func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error
return false
}
if consumed {
onProcessingFinished(consumeFunc(context.Background(), req))
consumeErr := consumeFunc(context.Background(), req)
pq.OnProcessingFinished(index, consumeErr)
return true
}
}
Expand Down Expand Up @@ -303,20 +304,21 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
return nil
}

// getNextItem pulls the next available item from the persistent storage along with a callback function that should be
// called after the item is processed to clean up the storage. If no new item is available, returns false.
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), bool) {
// getNextItem pulls the next available item from the persistent storage along with its index. Once processing is
// finished, the index should be called with OnProcessingFinished to clean up the storage. If no new item is available,
// returns false.
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) {
pq.mu.Lock()
defer pq.mu.Unlock()

var request T

if pq.stopped {
return request, nil, false
return 0, request, false
}

if pq.readIndex == pq.writeIndex {
return request, nil, false
return 0, request, false
}

index := pq.readIndex
Expand All @@ -340,45 +342,49 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
pq.logger.Error("Error deleting item from queue", zap.Error(err))
}

return request, nil, false
return 0, request, false
}

// Increase the reference count, so the client is not closed while the request is being processed.
// The client cannot be closed because we hold the lock since last we checked `stopped`.
pq.refClient++
return request, func(consumeErr error) {
// Delete the item from the persistent storage after it was processed.
pq.mu.Lock()
// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
defer func() {
if err = pq.unrefClient(ctx); err != nil {
pq.logger.Error("Error closing the storage client", zap.Error(err))
}
pq.mu.Unlock()
}()

if experr.IsShutdownErr(consumeErr) {
// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
// TODO: Handle partially delivered requests by updating their values in the storage.
return
}
return index, request, true
}

if err = pq.itemDispatchingFinish(ctx, index); err != nil {
pq.logger.Error("Error deleting item from queue", zap.Error(err))
// Should be called to remove the item of the given index from the queue once processing is finished.
func (pq *persistentQueue[T]) OnProcessingFinished(index uint64, consumeErr error) {
// Delete the item from the persistent storage after it was processed.
pq.mu.Lock()
// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
defer func() {
if err := pq.unrefClient(context.Background()); err != nil {
pq.logger.Error("Error closing the storage client", zap.Error(err))
}
pq.mu.Unlock()
}()

// Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size
// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
if (pq.readIndex % 10) == 0 {
if qsErr := pq.backupQueueSize(ctx); qsErr != nil {
pq.logger.Error("Error writing queue size to storage", zap.Error(err))
}
if experr.IsShutdownErr(consumeErr) {
// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
// TODO: Handle partially delivered requests by updating their values in the storage.
return
}

if err := pq.itemDispatchingFinish(context.Background(), index); err != nil {
pq.logger.Error("Error deleting item from queue", zap.Error(err))
}

// Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size
// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
if (pq.readIndex % 10) == 0 {
if qsErr := pq.backupQueueSize(context.Background()); qsErr != nil {
pq.logger.Error("Error writing queue size to storage", zap.Error(qsErr))
}
}

// Ensure the used size and the channel size are in sync.
pq.sizedChannel.syncSize()
// Ensure the used size and the channel size are in sync.
pq.sizedChannel.syncSize()

}, true
}

// retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
Expand Down
10 changes: 5 additions & 5 deletions exporter/internal/queue/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,19 +463,19 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) {
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{})

// Takes index 0 in process.
readReq, _, found := ps.getNextItem(context.Background())
_, readReq, found := ps.getNextItem(context.Background())
require.True(t, found)
assert.Equal(t, req, readReq)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})

// This takes item 1 to process.
secondReadReq, onProcessingFinished, found := ps.getNextItem(context.Background())
secondIndex, secondReadReq, found := ps.getNextItem(context.Background())
require.True(t, found)
assert.Equal(t, req, secondReadReq)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0, 1})

// Lets mark item 1 as finished, it will remove it from the currently dispatched items list.
onProcessingFinished(nil)
ps.OnProcessingFinished(secondIndex, nil)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})

// Reload the storage. Since items 0 was not finished, this should be re-enqueued at the end.
Expand Down Expand Up @@ -736,12 +736,12 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) {

require.NoError(t, ps.Offer(context.Background(), newTracesRequest(5, 10)))

_, onProcessingFinished, ok := ps.getNextItem(context.Background())
index, _, ok := ps.getNextItem(context.Background())
require.True(t, ok)
assert.False(t, ps.client.(*mockStorageClient).isClosed())
require.NoError(t, ps.Shutdown(context.Background()))
assert.False(t, ps.client.(*mockStorageClient).isClosed())
onProcessingFinished(nil)
ps.OnProcessingFinished(index, nil)
assert.True(t, ps.client.(*mockStorageClient).isClosed())
}

Expand Down
2 changes: 2 additions & 0 deletions exporter/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Queue[T any] interface {
Size() int
// Capacity returns the capacity of the queue.
Capacity() int
// Should be called to remove the item of the given index from the queue once processing is finished.
OnProcessingFinished(index uint64, consumeErr error)
}

type itemsCounter interface {
Expand Down

0 comments on commit e959731

Please sign in to comment.