-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
866c7fc
commit 7be491d
Showing
3 changed files
with
264 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" | ||
|
||
import ( | ||
"context" | ||
"math" | ||
"sync" | ||
"time" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/exporter/exporterbatcher" | ||
"go.opentelemetry.io/collector/exporter/internal" | ||
) | ||
|
||
type batch struct { | ||
ctx context.Context | ||
req internal.Request | ||
idxList []uint64 | ||
} | ||
|
||
type Batcher struct { | ||
batchCfg exporterbatcher.Config | ||
|
||
queue Queue[internal.Request] | ||
maxWorkers int | ||
|
||
exportFunc func(context.Context, internal.Request) error | ||
|
||
readingBatch *batch | ||
timer *time.Timer | ||
shutdownCh chan bool | ||
|
||
stopWG sync.WaitGroup | ||
} | ||
|
||
func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], | ||
maxWorkers int, exportFunc func(context.Context, internal.Request) error) *Batcher { | ||
return &Batcher{ | ||
batchCfg: batchCfg, | ||
queue: queue, | ||
maxWorkers: maxWorkers, | ||
exportFunc: exportFunc, | ||
stopWG: sync.WaitGroup{}, | ||
shutdownCh: make(chan bool, 1), | ||
} | ||
} | ||
|
||
// If preconditions pass, flush() take an item from the head of batch list and exports it. | ||
func (qb *Batcher) flush(batchToFlush batch) { | ||
err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req) | ||
for _, idx := range batchToFlush.idxList { | ||
qb.queue.OnProcessingFinished(idx, err) | ||
} | ||
} | ||
|
||
// allocateFlusher() starts a goroutine that calls flushIfNecessary(). It blocks until a worker is available. | ||
func (qb *Batcher) allocateFlusher(batchToFlush batch) { | ||
// maxWorker = 0 means we don't limit the number of flushers. | ||
if qb.maxWorkers == 0 { | ||
qb.stopWG.Add(1) | ||
go func() { | ||
qb.flush(batchToFlush) | ||
qb.stopWG.Done() | ||
}() | ||
return | ||
} | ||
panic("not implemented") | ||
} | ||
|
||
// Start ensures that queue and all consumers are started. | ||
func (qb *Batcher) Start(ctx context.Context, host component.Host) error { | ||
if err := qb.queue.Start(ctx, host); err != nil { | ||
return err | ||
} | ||
|
||
if qb.batchCfg.Enabled { | ||
panic("not implemented") | ||
} | ||
|
||
// Timer doesn't do anything yet, but adding it so compiler won't complain about qb.timer.C | ||
qb.timer = time.NewTimer(math.MaxInt) | ||
qb.timer.Stop() | ||
|
||
if qb.maxWorkers != 0 { | ||
panic("not implemented") | ||
} | ||
|
||
// This goroutine keeps reading until flush is triggered because of request size. | ||
qb.stopWG.Add(1) | ||
go func() { | ||
defer qb.stopWG.Done() | ||
for { | ||
idx, _, req, ok := qb.queue.Read(context.Background()) | ||
|
||
if !ok { | ||
qb.shutdownCh <- true | ||
if qb.readingBatch != nil { | ||
panic("batching is supported yet so reading batch should always be nil") | ||
} | ||
|
||
return | ||
} | ||
if !qb.batchCfg.Enabled { | ||
qb.readingBatch = &batch{ | ||
req: req, | ||
ctx: context.Background(), | ||
idxList: []uint64{idx}} | ||
qb.allocateFlusher(*qb.readingBatch) | ||
qb.readingBatch = nil | ||
} else { | ||
panic("not implemented") | ||
} | ||
} | ||
}() | ||
|
||
qb.stopWG.Add(1) | ||
go func() { | ||
defer qb.stopWG.Done() | ||
for { | ||
select { | ||
case <-qb.shutdownCh: | ||
return | ||
case <-qb.timer.C: | ||
panic("batching is not yet implemented. Having timer here just so compiler won't complain") | ||
} | ||
|
||
} | ||
}() | ||
return nil | ||
} | ||
|
||
// Shutdown ensures that queue and all Batcher are stopped. | ||
func (qb *Batcher) Shutdown(ctx context.Context) error { | ||
if err := qb.queue.Shutdown(ctx); err != nil { | ||
return err | ||
} | ||
qb.stopWG.Wait() | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package queue | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"go.opentelemetry.io/collector/component/componenttest" | ||
"go.opentelemetry.io/collector/exporter/exporterbatcher" | ||
"go.opentelemetry.io/collector/exporter/internal" | ||
) | ||
|
||
func testExportFunc(ctx context.Context, req internal.Request) error { | ||
return req.Export(ctx) | ||
} | ||
|
||
func TestBatcher_BatchNotEnabled_InfiniteWorkerPool(t *testing.T) { | ||
cfg := exporterbatcher.NewDefaultConfig() | ||
cfg.Enabled = false | ||
|
||
q := NewBoundedMemoryQueue[internal.Request]( | ||
MemoryQueueSettings[internal.Request]{ | ||
Sizer: &RequestSizer[internal.Request]{}, | ||
Capacity: 10, | ||
}) | ||
|
||
maxWorkers := 0 | ||
ba := NewBatcher(cfg, q, maxWorkers, testExportFunc) | ||
|
||
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) | ||
t.Cleanup(func() { | ||
require.NoError(t, ba.Shutdown(context.Background())) | ||
}) | ||
|
||
sink := newFakeRequestSink() | ||
|
||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink})) | ||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink})) | ||
assert.Eventually(t, func() bool { | ||
return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 | ||
}, 20*time.Millisecond, 10*time.Millisecond) | ||
|
||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink})) | ||
assert.Eventually(t, func() bool { | ||
return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 25 | ||
}, 20*time.Millisecond, 10*time.Millisecond) | ||
|
||
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink})) | ||
|
||
assert.Eventually(t, func() bool { | ||
return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 38 | ||
}, 20*time.Millisecond, 10*time.Millisecond) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"sync/atomic" | ||
"time" | ||
|
||
"go.opentelemetry.io/collector/exporter/exporterbatcher" | ||
"go.opentelemetry.io/collector/exporter/internal" | ||
) | ||
|
||
type fakeRequestSink struct { | ||
requestsCount *atomic.Int64 | ||
itemsCount *atomic.Int64 | ||
} | ||
|
||
func newFakeRequestSink() *fakeRequestSink { | ||
return &fakeRequestSink{ | ||
requestsCount: new(atomic.Int64), | ||
itemsCount: new(atomic.Int64), | ||
} | ||
} | ||
|
||
type fakeRequest struct { | ||
items int | ||
exportErr error | ||
delay time.Duration | ||
sink *fakeRequestSink | ||
} | ||
|
||
func (r *fakeRequest) Export(ctx context.Context) error { | ||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
case <-time.After(r.delay): | ||
} | ||
if r.exportErr != nil { | ||
return r.exportErr | ||
} | ||
if r.sink != nil { | ||
r.sink.requestsCount.Add(1) | ||
r.sink.itemsCount.Add(int64(r.items)) | ||
} | ||
return nil | ||
} | ||
|
||
func (r *fakeRequest) ItemsCount() int { | ||
return r.items | ||
} | ||
|
||
func (r *fakeRequest) Merge(_ context.Context, | ||
_ internal.Request) (internal.Request, error) { | ||
return nil, errors.New("not implemented") | ||
} | ||
|
||
func (r *fakeRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, | ||
_ internal.Request) ([]internal.Request, error) { | ||
return nil, errors.New("not implemented") | ||
} |