Skip to content

Commit 6a15a74

Browse files
Fix monitoring metrics for individual collectors (#389)
* fix monitoring metrics for individual collectors Signed-off-by: Ananya Kumar Mallik <[email protected]> * add collector cache TTL Signed-off-by: Ananya Kumar Mallik <[email protected]> * reuse Signed-off-by: Ananya Kumar Mallik <[email protected]> * Add collector cache test Signed-off-by: Ananya Kumar Mallik <[email protected]> * Fix caching logic and TTL for cache Signed-off-by: Ananya Kumar Mallik <[email protected]> --------- Signed-off-by: Ananya Kumar Mallik <[email protected]>
1 parent 231987c commit 6a15a74

File tree

3 files changed

+173
-12
lines changed

3 files changed

+173
-12
lines changed

collectors/cache.go

+76
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,79 @@ func (d *descriptorCache) Store(prefix string, data []*monitoring.MetricDescript
7272
defer d.lock.Unlock()
7373
d.cache[prefix] = &entry
7474
}
75+
76+
// collectorCache is a cache for MonitoringCollectors
77+
type CollectorCache struct {
78+
cache map[string]*collectorCacheEntry
79+
lock sync.RWMutex
80+
ttl time.Duration
81+
}
82+
83+
// collectorCacheEntry is a cache entry for a MonitoringCollector
84+
type collectorCacheEntry struct {
85+
collector *MonitoringCollector
86+
expiry time.Time
87+
}
88+
89+
// NewCollectorCache returns a new CollectorCache with the given TTL
90+
func NewCollectorCache(ttl time.Duration) *CollectorCache {
91+
c := &CollectorCache{
92+
cache: make(map[string]*collectorCacheEntry),
93+
ttl: ttl,
94+
}
95+
96+
go c.cleanup()
97+
return c
98+
}
99+
100+
// Get returns a MonitoringCollector if the key is found and not expired
101+
// If key is found it resets the TTL for the collector
102+
func (c *CollectorCache) Get(key string) (*MonitoringCollector, bool) {
103+
c.lock.RLock()
104+
defer c.lock.RUnlock()
105+
106+
entry, ok := c.cache[key]
107+
108+
if !ok {
109+
return nil, false
110+
}
111+
112+
if time.Now().After(entry.expiry) {
113+
delete(c.cache, key)
114+
return nil, false
115+
}
116+
117+
entry.expiry = time.Now().Add(c.ttl)
118+
return entry.collector, true
119+
}
120+
121+
func (c *CollectorCache) Store(key string, collector *MonitoringCollector) {
122+
entry := &collectorCacheEntry{
123+
collector: collector,
124+
expiry: time.Now().Add(c.ttl),
125+
}
126+
127+
c.lock.Lock()
128+
defer c.lock.Unlock()
129+
c.cache[key] = entry
130+
}
131+
132+
func (c *CollectorCache) cleanup() {
133+
ticker := time.NewTicker(5 * time.Minute)
134+
defer ticker.Stop()
135+
for range ticker.C {
136+
c.removeExpired()
137+
}
138+
}
139+
140+
func (c *CollectorCache) removeExpired() {
141+
c.lock.Lock()
142+
defer c.lock.Unlock()
143+
144+
now := time.Now()
145+
for key, entry := range c.cache {
146+
if now.After(entry.expiry) {
147+
delete(c.cache, key)
148+
}
149+
}
150+
}

collectors/cache_test.go

+53
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,56 @@ func TestDescriptorCache(t *testing.T) {
7474
t.Error("cache entries should have expired")
7575
}
7676
}
77+
78+
func TestCollectorCache(t *testing.T) {
79+
createCollector := func(id string) *MonitoringCollector {
80+
return &MonitoringCollector{
81+
projectID: id,
82+
}
83+
}
84+
85+
t.Run("basic cache Op", func(t *testing.T) {
86+
ttl := 1 * time.Second
87+
cache := NewCollectorCache(ttl)
88+
collector := createCollector("test-project")
89+
key := "test-key"
90+
91+
cache.Store(key, collector)
92+
93+
if _, found := cache.Get("test-key"); !found {
94+
t.Error("Collector should be available in cache before TTL")
95+
}
96+
97+
time.Sleep(2 * ttl)
98+
if _, found := cache.Get("test-key"); found {
99+
t.Error("Collector should have expired")
100+
}
101+
})
102+
103+
t.Run("multiple collectors", func(t *testing.T) {
104+
ttl := 1 * time.Second
105+
cache := NewCollectorCache(ttl)
106+
107+
collectors := map[string]*MonitoringCollector{
108+
"test-key-1": createCollector("test-project-1"),
109+
"test-key-2": createCollector("test-project-2"),
110+
"test-key-3": createCollector("test-project-3"),
111+
}
112+
113+
for k, v := range collectors {
114+
cache.Store(k, v)
115+
}
116+
117+
for k, original := range collectors {
118+
cached, found := cache.Get(k)
119+
if !found {
120+
t.Errorf("Collector %s not found in cache", k)
121+
continue
122+
}
123+
124+
if cached.projectID != original.projectID {
125+
t.Errorf("Wrong collector for key %s. Got projectId %s, want %s", k, cached.projectID, original.projectID)
126+
}
127+
}
128+
})
129+
}

stackdriver_exporter.go

+44-12
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"os"
2121
"slices"
2222
"strings"
23+
"time"
2324

2425
"github.com/PuerkitoBio/rehttp"
2526
"github.com/alecthomas/kingpin/v2"
@@ -185,6 +186,7 @@ type handler struct {
185186
metricsExtraFilters []collectors.MetricFilter
186187
additionalGatherer prometheus.Gatherer
187188
m *monitoring.Service
189+
collectors *collectors.CollectorCache
188190
}
189191

190192
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -203,35 +205,65 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
203205
}
204206

205207
func newHandler(projectIDs []string, metricPrefixes []string, metricExtraFilters []collectors.MetricFilter, m *monitoring.Service, logger *slog.Logger, additionalGatherer prometheus.Gatherer) *handler {
208+
var ttl time.Duration
209+
// Add collector caching TTL as max of deltas aggregation or descriptor caching
210+
if *monitoringMetricsAggregateDeltas || *monitoringDescriptorCacheTTL > 0 {
211+
ttl = *monitoringMetricsDeltasTTL
212+
if *monitoringDescriptorCacheTTL > ttl {
213+
ttl = *monitoringDescriptorCacheTTL
214+
}
215+
} else {
216+
ttl = 2 * time.Hour
217+
}
218+
219+
logger.Info("Creating collector cache", "ttl", ttl)
220+
206221
h := &handler{
207222
logger: logger,
208223
projectIDs: projectIDs,
209224
metricsPrefixes: metricPrefixes,
210225
metricsExtraFilters: metricExtraFilters,
211226
additionalGatherer: additionalGatherer,
212227
m: m,
228+
collectors: collectors.NewCollectorCache(ttl),
213229
}
214230

215231
h.handler = h.innerHandler(nil)
216232
return h
217233
}
218234

235+
func (h *handler) getCollector(project string, filters map[string]bool) (*collectors.MonitoringCollector, error) {
236+
filterdPrefixes := h.filterMetricTypePrefixes(filters)
237+
collectorKey := fmt.Sprintf("%s-%v", project, filterdPrefixes)
238+
239+
if collector, found := h.collectors.Get(collectorKey); found {
240+
return collector, nil
241+
}
242+
243+
collector, err := collectors.NewMonitoringCollector(project, h.m, collectors.MonitoringCollectorOptions{
244+
MetricTypePrefixes: filterdPrefixes,
245+
ExtraFilters: h.metricsExtraFilters,
246+
RequestInterval: *monitoringMetricsInterval,
247+
RequestOffset: *monitoringMetricsOffset,
248+
IngestDelay: *monitoringMetricsIngestDelay,
249+
FillMissingLabels: *collectorFillMissingLabels,
250+
DropDelegatedProjects: *monitoringDropDelegatedProjects,
251+
AggregateDeltas: *monitoringMetricsAggregateDeltas,
252+
DescriptorCacheTTL: *monitoringDescriptorCacheTTL,
253+
DescriptorCacheOnlyGoogle: *monitoringDescriptorCacheOnlyGoogle,
254+
}, h.logger, delta.NewInMemoryCounterStore(h.logger, *monitoringMetricsDeltasTTL), delta.NewInMemoryHistogramStore(h.logger, *monitoringMetricsDeltasTTL))
255+
if err != nil {
256+
return nil, err
257+
}
258+
h.collectors.Store(collectorKey, collector)
259+
return collector, nil
260+
}
261+
219262
func (h *handler) innerHandler(filters map[string]bool) http.Handler {
220263
registry := prometheus.NewRegistry()
221264

222265
for _, project := range h.projectIDs {
223-
monitoringCollector, err := collectors.NewMonitoringCollector(project, h.m, collectors.MonitoringCollectorOptions{
224-
MetricTypePrefixes: h.filterMetricTypePrefixes(filters),
225-
ExtraFilters: h.metricsExtraFilters,
226-
RequestInterval: *monitoringMetricsInterval,
227-
RequestOffset: *monitoringMetricsOffset,
228-
IngestDelay: *monitoringMetricsIngestDelay,
229-
FillMissingLabels: *collectorFillMissingLabels,
230-
DropDelegatedProjects: *monitoringDropDelegatedProjects,
231-
AggregateDeltas: *monitoringMetricsAggregateDeltas,
232-
DescriptorCacheTTL: *monitoringDescriptorCacheTTL,
233-
DescriptorCacheOnlyGoogle: *monitoringDescriptorCacheOnlyGoogle,
234-
}, h.logger, delta.NewInMemoryCounterStore(h.logger, *monitoringMetricsDeltasTTL), delta.NewInMemoryHistogramStore(h.logger, *monitoringMetricsDeltasTTL))
266+
monitoringCollector, err := h.getCollector(project, filters)
235267
if err != nil {
236268
h.logger.Error("error creating monitoring collector", "err", err)
237269
os.Exit(1)

0 commit comments

Comments
 (0)