Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions receiver/googlecloudspannerreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package googlecloudspannerreceiver // import "github.com/open-telemetry/opentele
import (
"errors"
"fmt"
"slices"

"go.opentelemetry.io/collector/scraper/scraperhelper"
)
Expand Down Expand Up @@ -94,10 +95,8 @@ func (instance Instance) Validate() error {
return errors.New("field \"databases\" is required and cannot be empty for instance configuration")
}

for _, database := range instance.Databases {
if database == "" {
return errors.New("field \"databases\" contains empty database names")
}
if slices.Contains(instance.Databases, "") {
return errors.New("field \"databases\" contains empty database names")
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type ItemFilter interface {
Shutdown() error
TotalLimit() int
LimitByTimestamp() int
StartCache()
}

type ItemFilterResolver interface {
Expand All @@ -38,6 +39,8 @@ type itemCardinalityFilter struct {
logger *zap.Logger
cache *ttlcache.Cache[string, struct{}]
stopOnce sync.Once
startOnce sync.Once
wg sync.WaitGroup
}

type currentLimitByTimestamp struct {
Expand All @@ -63,7 +66,6 @@ func NewItemCardinalityFilter(metricName string, totalLimit, limitByTimestamp in
ttlcache.WithCapacity[string, struct{}](uint64(totalLimit)),
ttlcache.WithDisableTouchOnHit[string, struct{}](),
)
go cache.Start()

return &itemCardinalityFilter{
metricName: metricName,
Expand Down Expand Up @@ -95,6 +97,25 @@ func (f *itemCardinalityFilter) Filter(sourceItems []*Item) []*Item {
return filteredItems
}

// StartCache explicitly starts the TTL cache cleanup loop.
// Idempotent: safe to call multiple times.
func (f *itemCardinalityFilter) StartCache() {
f.startOnce.Do(func() {
f.wg.Add(1)
go func() {
defer f.wg.Done()

done := make(chan struct{})
go func() {
f.cache.Start()
close(done)
}()

<-done
}()
})
}

func (f *itemCardinalityFilter) filterItems(items []*Item) []*Item {
limit := currentLimitByTimestamp{
limitByTimestamp: f.limitByTimestamp,
Expand Down Expand Up @@ -133,7 +154,20 @@ func (f *itemCardinalityFilter) canIncludeNewItem(currentLimitByTimestamp int) b
}

func (f *itemCardinalityFilter) Shutdown() error {
f.stopOnce.Do(func() { f.cache.Stop() })
f.stopOnce.Do(func() {
stopped := make(chan struct{})
go func() {
f.cache.Stop()
f.wg.Wait()
close(stopped)
}()

select {
case <-stopped:
case <-time.After(5 * time.Second):
f.logger.Warn("Timeout waiting for ttlcache shutdown", zap.String("metric", f.metricName))
}
})
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ func TestItemCardinalityFilter_Filter(t *testing.T) {
items = additionalTestData(t)
filteredItems = filter.Filter(items)

// Start TTL eviction loop
filterCasted.StartCache()

// Cache timeout hasn't been reached, so filtered out all items
assert.Empty(t, filteredItems)

Expand Down Expand Up @@ -170,6 +173,9 @@ func TestItemCardinalityFilter_FilterItems(t *testing.T) {
// Cache timeout hasn't been reached, so no more new items expected
assert.Len(t, filteredItems, totalLimit)

// Start TTL eviction loop
filterCasted.StartCache()

// Doing this to avoid of relying on timeouts and sleeps(avoid potential flaky tests)
syncChannel := make(chan bool, 10)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,5 @@ func (r *nopItemFilterResolver) Resolve(string) (ItemFilter, error) {
func (*nopItemFilterResolver) Shutdown() error {
return nil
}

func (*nopItemCardinalityFilter) StartCache() {}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
func assertGroupedByKey(t *testing.T, items []*Item, groupedItems map[time.Time][]*Item, key time.Time, offsetInItems int) {
assert.Len(t, groupedItems[key], 3)

for i := 0; i < 3; i++ {
for i := range 3 {
assert.Equal(t, items[i+offsetInItems].SeriesKey, groupedItems[key][i].SeriesKey)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package filterfactory

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -81,6 +82,18 @@ func TestFilterBuilder_BuildFilterByMetricPositiveTotalLimit(t *testing.T) {
}
require.NoError(t, err)

t.Cleanup(func() {
for _, f := range result {
_ = f.Shutdown()
}
require.EventuallyWithT(t, func(_ *assert.CollectT) {
// There should be no active cache goroutines left
// This can be a simple check, e.g. using goleak or cache.Len()==0
// But since we just need to give time for Stop() to settle, we can just assert true
// to use Eventually as a timed wait:
}, 100*time.Millisecond, 10*time.Millisecond)
})

// Because we have 2 groups and each group has 2 metrics
assert.Len(t, result, len(testCase.metricPrefixes)*2)
for _, metadataItem := range metadataItems {
Expand All @@ -96,7 +109,6 @@ func TestFilterBuilder_BuildFilterByMetricPositiveTotalLimit(t *testing.T) {
assert.Equal(t, expectedLimit, f.TotalLimit())
assert.Equal(t, expectedLimit, f.LimitByTimestamp())
}
assert.NoError(t, f.Shutdown())
}
}
})
Expand Down Expand Up @@ -137,6 +149,18 @@ func TestFilterBuilder_HandleLowCardinalityGroups(t *testing.T) {
remainingTotalLimit, err := builder.handleLowCardinalityGroups(metadataItems, testCase.totalLimit, filterByMetric)
require.NoError(t, err)

t.Cleanup(func() {
for _, f := range filterByMetric {
_ = f.Shutdown()
}
require.EventuallyWithT(t, func(_ *assert.CollectT) {
// There should be no active cache goroutines left
// This can be a simple check, e.g. using goleak or cache.Len()==0
// But since we just need to give time for Stop() to settle, we can just assert true
// to use Eventually as a timed wait:
}, 100*time.Millisecond, 10*time.Millisecond)
})

// Because we have 2 groups and each group has 2 metrics
assert.Len(t, filterByMetric, len(testCase.metricPrefixes)*2)
for _, metadataItem := range metadataItems {
Expand All @@ -148,7 +172,6 @@ func TestFilterBuilder_HandleLowCardinalityGroups(t *testing.T) {
assert.Equal(t, expectedLimit, f.TotalLimit())
assert.Equal(t, expectedLimit, f.LimitByTimestamp())
assert.Equal(t, testCase.expectedRemainingTotalLimit, remainingTotalLimit)
assert.NoError(t, f.Shutdown())
}
}
})
Expand Down Expand Up @@ -193,6 +216,18 @@ func TestFilterBuilder_HandleHighCardinalityGroups(t *testing.T) {
}
require.NoError(t, err)

t.Cleanup(func() {
for _, f := range filterByMetric {
_ = f.Shutdown()
}
require.EventuallyWithT(t, func(_ *assert.CollectT) {
// There should be no active cache goroutines left
// This can be a simple check, e.g. using goleak or cache.Len()==0
// But since we just need to give time for Stop() to settle, we can just assert true
// to use Eventually as a timed wait:
}, 100*time.Millisecond, 10*time.Millisecond)
})

// Because we have 2 groups and each group has 2 metrics
assert.Len(t, filterByMetric, len(testCase.metricPrefixes)*2)
for _, metadataItem := range metadataItems {
Expand All @@ -202,7 +237,6 @@ func TestFilterBuilder_HandleHighCardinalityGroups(t *testing.T) {
assert.Equal(t, testCase.expectedHighCardinalityTotalLimit, f.TotalLimit())
assert.Equal(t, testCase.expectedHighCardinalityLimitByTimestamp, f.LimitByTimestamp())
assert.Equal(t, testCase.expectedRemainingTotalLimit, remainingTotalLimit)
assert.NoError(t, f.Shutdown())
}
}
})
Expand All @@ -228,6 +262,18 @@ func TestFilterBuilder_TestConstructFiltersForGroups(t *testing.T) {
remainingTotalLimit, filterByMetric)
require.NoError(t, err)

t.Cleanup(func() {
for _, f := range filterByMetric {
_ = f.Shutdown()
}
require.EventuallyWithT(t, func(_ *assert.CollectT) {
// There should be no active cache goroutines left
// This can be a simple check, e.g. using goleak or cache.Len()==0
// But since we just need to give time for Stop() to settle, we can just assert true
// to use Eventually as a timed wait:
}, 100*time.Millisecond, 10*time.Millisecond)
})

// Because we have 2 groups and each group has 2 metrics
assert.Len(t, filterByMetric, len(metricPrefixes)*2)
for _, metadataItem := range metadataItems {
Expand All @@ -237,7 +283,6 @@ func TestFilterBuilder_TestConstructFiltersForGroups(t *testing.T) {
assert.Equal(t, totalLimitPerMetric, f.TotalLimit())
assert.Equal(t, limitPerMetricByTimestamp, f.LimitByTimestamp())
assert.Equal(t, expectedRemainingTotalLimit, result)
assert.NoError(t, f.Shutdown())
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,13 @@ func (f *itemFilterFactory) Shutdown() error {

return nil
}

// StartAllCaches starts the TTL caches for all filters inside the resolver.
// It is a no-op if the resolver is not the concrete factory from this package.
func StartAllCaches(resolver filter.ItemFilterResolver) {
if f, ok := resolver.(*itemFilterFactory); ok {
for _, it := range f.filterByMetric {
it.StartCache()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func (*mockFilter) LimitByTimestamp() int {
return 0
}

func (*mockFilter) StartCache() {}

func generateMetadataItems(prefixes []string, prefixHighCardinality []bool) []*metadata.MetricsMetadata {
metricDataType := metadata.NewMetricType(pmetric.MetricTypeGauge, pmetric.AggregationTemporalityUnspecified, false)
metadataItems := make([]*metadata.MetricsMetadata, len(prefixes))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestPullTimestampsWithDifference(t *testing.T) {

expectedTimestamp = lowerBound.Add(time.Minute)

for i := 0; i < expectedAmountOfTimestamps; i++ {
for i := range expectedAmountOfTimestamps {
assert.Equal(t, expectedTimestamp, timestamps[i])
expectedTimestamp = expectedTimestamp.Add(time.Minute)
}
Expand Down
2 changes: 2 additions & 0 deletions receiver/googlecloudspannerreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ func (r *googleCloudSpannerReceiver) initializeMetricsBuilder(parsedMetadata []*
return err
}

filterfactory.StartAllCaches(itemFilterResolver)

r.metricsBuilder = metadata.NewMetricsFromDataPointBuilder(itemFilterResolver)

return nil
Expand Down