
Commit

more guard rails
srikanthccv committed Dec 16, 2024
1 parent 605ccd0 commit 86d0fda
Showing 3 changed files with 53 additions and 19 deletions.
exporter/clickhouselogsexporter/exporter.go: 3 changes (2 additions & 1 deletion)

@@ -165,7 +165,8 @@ func (e *clickhouseLogsExporter) fetchShouldSkipKeys() {
SELECT tag_key, tag_type, tag_data_type, countDistinct(string_value) as string_count, countDistinct(number_value) as number_count
FROM %s.%s
GROUP BY tag_key, tag_type, tag_data_type
- HAVING string_count > %d OR number_count > %d`, databaseName, DISTRIBUTED_TAG_ATTRIBUTES_V2, e.maxDistinctValues, e.maxDistinctValues)
+ HAVING string_count > %d OR number_count > %d
+ SETTINGS max_threads = 2`, databaseName, DISTRIBUTED_TAG_ATTRIBUTES_V2, e.maxDistinctValues, e.maxDistinctValues)

e.logger.Info("fetching should skip keys", zap.String("query", query))

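The new SETTINGS clause caps how many CPU threads ClickHouse may spend on this background aggregation. For illustration, a minimal sketch of running the guarded query with the clickhouse-go driver and collecting the over-limit keys; the function name, key format, and return shape are assumptions, not the exporter's actual fetchShouldSkipKeys:

package main

import (
	"context"
	"fmt"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// fetchHighCardinalityKeys is an illustrative sketch (not the exporter's code):
// it runs the same guarded query and returns the attribute keys whose distinct
// value count exceeds maxDistinct. SETTINGS max_threads = 2 limits how many
// threads ClickHouse uses for this aggregation.
func fetchHighCardinalityKeys(ctx context.Context, conn driver.Conn, db, table string, maxDistinct int) (map[string]struct{}, error) {
	query := fmt.Sprintf(`SELECT tag_key, tag_type, tag_data_type, countDistinct(string_value) as string_count, countDistinct(number_value) as number_count
FROM %s.%s
GROUP BY tag_key, tag_type, tag_data_type
HAVING string_count > %d OR number_count > %d
SETTINGS max_threads = 2`, db, table, maxDistinct, maxDistinct)

	rows, err := conn.Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	skip := make(map[string]struct{})
	for rows.Next() {
		var tagKey, tagType, tagDataType string
		var stringCount, numberCount uint64
		if err := rows.Scan(&tagKey, &tagType, &tagDataType, &stringCount, &numberCount); err != nil {
			return nil, err
		}
		skip[tagKey+":"+tagType+":"+tagDataType] = struct{}{} // key format is assumed
	}
	return skip, rows.Err()
}
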
exporter/clickhousetracesexporter/writer.go: 3 changes (2 additions & 1 deletion)

@@ -161,7 +161,8 @@ func (e *SpanWriter) fetchShouldSkipKeys() {
SELECT tag_key, tag_type, tag_data_type, countDistinct(string_value) as string_count, countDistinct(number_value) as number_count
FROM %s.%s
GROUP BY tag_key, tag_type, tag_data_type
- HAVING string_count > %d OR number_count > %d`, e.traceDatabase, e.attributeTableV2, e.maxDistinctValues, e.maxDistinctValues)
+ HAVING string_count > %d OR number_count > %d
+ SETTINGS max_threads = 2`, e.traceDatabase, e.attributeTableV2, e.maxDistinctValues, e.maxDistinctValues)

e.logger.Info("fetching should skip keys", zap.String("query", query))

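The trace writer's query gets the identical guard rail. As a design note, the same cap could instead be set once per connection; the sketch below (clickhouse-go v2, placeholder address) shows that variant. Keeping the limit in the per-query SETTINGS clause, as this commit does, scopes it to these background metadata queries and leaves the exporter's regular insert traffic untouched.

package main

import (
	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// openCappedConn is a sketch of the connection-level alternative: every query
// issued on this connection runs with max_threads = 2, not only the
// distinct-count guard queries.
func openCappedConn() (driver.Conn, error) {
	return clickhouse.Open(&clickhouse.Options{
		Addr:     []string{"localhost:9000"}, // placeholder address
		Settings: clickhouse.Settings{"max_threads": 2},
	})
}
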
exporter/metadataexporter/exporter.go: 66 changes (49 additions & 17 deletions)
@@ -24,12 +24,13 @@ import (
)

const (
- sixHours = 6 * time.Hour // window size for attributes aggregation
- sixHoursInMs = int64(sixHours / time.Millisecond) // window size in ms
- maxValuesInCache = 3_000_000 // max number of values for fingerprint cache
- valuTrackerKeysTTL = 45 * time.Minute // ttl for keys in value tracker
- fetchKeysDistinctCountInterval = 15 * time.Minute
- insertStmtQuery = "INSERT INTO signoz_metadata.distributed_attributes_metadata"
+ sixHours = 6 * time.Hour // window size for attributes aggregation
+ sixHoursInMs = int64(sixHours / time.Millisecond) // window size in ms
+ maxValuesInTracesCache = 500_000 // max number of values for traces fingerprint cache
+ maxValuesInMetricsCache = 2_000_000 // max number of values for metrics fingerprint cache
+ maxValuesInLogsCache = 500_000 // max number of values for logs fingerprint cache
+ valuTrackerKeysTTL = 45 * time.Minute // ttl for keys in value tracker
+ insertStmtQuery = "INSERT INTO signoz_metadata.distributed_attributes_metadata"
)

type tagValueCountFromDB struct {
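The six-hour constants feed the window bucketing that writeToStmt receives as roundedSixHrsUnixMilli. The rounding helper is outside this diff, so the following is only a sketch of the usual bucket computation, assuming integer division on epoch milliseconds:

// Sketch (assumption, not from this commit): round an epoch-millisecond
// timestamp down to the start of its 6-hour window; every fingerprint seen
// within the same window then maps to the same cache key.
func roundDownToSixHours(unixMilli int64) int64 {
	const sixHoursInMs = 6 * 60 * 60 * 1000 // same value as the sixHoursInMs constant above
	return (unixMilli / sixHoursInMs) * sixHoursInMs
}
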
@@ -42,8 +42,10 @@ type metadataExporter struct {
cfg Config
set exporter.Settings

- conn driver.Conn
- fingerprintCache *ttlcache.Cache[string, bool]
+ conn driver.Conn
+ tracesFingerprintCache *ttlcache.Cache[string, bool]
+ metricsFingerprintCache *ttlcache.Cache[string, bool]
+ logsFingerprintCache *ttlcache.Cache[string, bool]

tracesTracker *ValueTracker
metricsTracker *ValueTracker
@@ -87,12 +90,26 @@ func newMetadataExporter(cfg Config, set exporter.Settings) (*metadataExporter,
return nil, err
}

- fingerprintCache := ttlcache.New[string, bool](
+ tracesFingerprintCache := ttlcache.New[string, bool](
ttlcache.WithTTL[string, bool](sixHours),
- ttlcache.WithDisableTouchOnHit[string, bool](), // don't update the ttl when the item is accessed
- ttlcache.WithCapacity[string, bool](maxValuesInCache), // max 3M items in the cache
+ ttlcache.WithDisableTouchOnHit[string, bool](), // don't update the ttl when the item is accessed
+ ttlcache.WithCapacity[string, bool](maxValuesInTracesCache), // max 500k items in the cache
)
- go fingerprintCache.Start()
+ go tracesFingerprintCache.Start()
+
+ metricsFingerprintCache := ttlcache.New[string, bool](
+ ttlcache.WithTTL[string, bool](sixHours),
+ ttlcache.WithDisableTouchOnHit[string, bool](),
+ ttlcache.WithCapacity[string, bool](maxValuesInMetricsCache),
+ )
+ go metricsFingerprintCache.Start()
+
+ logsFingerprintCache := ttlcache.New[string, bool](
+ ttlcache.WithTTL[string, bool](sixHours),
+ ttlcache.WithDisableTouchOnHit[string, bool](),
+ ttlcache.WithCapacity[string, bool](maxValuesInLogsCache),
+ )
+ go logsFingerprintCache.Start()

tracesTracker := NewValueTracker(
int(cfg.MaxDistinctValues.Traces.MaxKeys),
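Splitting the single 3M-entry fingerprint cache into per-signal caches means a burst of high-cardinality log or trace fingerprints can only evict entries from its own cache rather than starving the other signals. A small standalone illustration of how these ttlcache options behave (jellydator/ttlcache v3; the tiny capacity and keys are demo values only):

package main

import (
	"fmt"
	"time"

	"github.com/jellydator/ttlcache/v3"
)

func main() {
	c := ttlcache.New[string, bool](
		ttlcache.WithTTL[string, bool](6*time.Hour),
		ttlcache.WithDisableTouchOnHit[string, bool](), // reads do not extend an entry's TTL
		ttlcache.WithCapacity[string, bool](2),         // demo capacity; the exporter uses 500k/2M/500k
	)
	go c.Start() // background goroutine that removes expired entries
	defer c.Stop()

	c.Set("fingerprint-a", true, ttlcache.DefaultTTL)
	c.Set("fingerprint-b", true, ttlcache.DefaultTTL)
	c.Set("fingerprint-c", true, ttlcache.DefaultTTL) // exceeds capacity, so an older entry is evicted

	fmt.Println(c.Len()) // 2: the capacity bound holds regardless of TTL
}
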
@@ -139,7 +156,9 @@ func newMetadataExporter(cfg Config, set exporter.Settings) (*metadataExporter,
cfg: cfg,
set: set,
conn: conn,
- fingerprintCache: fingerprintCache,
+ tracesFingerprintCache: tracesFingerprintCache,
+ metricsFingerprintCache: metricsFingerprintCache,
+ logsFingerprintCache: logsFingerprintCache,
tracesTracker: tracesTracker,
metricsTracker: metricsTracker,
logsTracker: logsTracker,
@@ -171,9 +190,11 @@ func (e *metadataExporter) Start(_ context.Context, host component.Host) error {
conn: e.conn,
query: `SELECT tag_key, tag_data_type, countDistinct(string_value) as string_value_count, countDistinct(number_value) as number_value_count
FROM signoz_logs.distributed_tag_attributes_v2
+ WHERE unix_milli >= toUnixTimestamp(now() - INTERVAL 6 HOUR) * 1000
GROUP BY tag_key, tag_data_type
ORDER BY number_value_count DESC, string_value_count DESC, tag_key
- LIMIT 1 BY tag_key, tag_data_type`,
+ LIMIT 1 BY tag_key, tag_data_type
+ SETTINGS max_threads = 2`,
storeFunc: e.storeLogTagValues,
signalName: pipeline.SignalLogs.String(),
interval: e.cfg.MaxDistinctValues.Logs.FetchInterval,
@@ -187,9 +208,11 @@ func (e *metadataExporter) Start(_ context.Context, host component.Host) error {
conn: e.conn,
query: `SELECT tag_key, tag_data_type, countDistinct(string_value) as string_value_count, countDistinct(number_value) as number_value_count
FROM signoz_traces.distributed_tag_attributes_v2
+ WHERE unix_milli >= toUnixTimestamp(now() - INTERVAL 6 HOUR) * 1000
GROUP BY tag_key, tag_data_type
ORDER BY number_value_count DESC, string_value_count DESC, tag_key
- LIMIT 1 BY tag_key, tag_data_type`,
+ LIMIT 1 BY tag_key, tag_data_type
+ SETTINGS max_threads = 2`,
storeFunc: e.storeTracesTagValues,
signalName: pipeline.SignalTraces.String(),
interval: e.cfg.MaxDistinctValues.Traces.FetchInterval,
@@ -451,8 +474,17 @@ func makeFingerprintCacheKey(a, b uint64, datasource string) string {

// writeToStmt writes the attributes to the statement
func (e *metadataExporter) writeToStmt(_ context.Context, stmt driver.Batch, ds pipeline.Signal, resourceFingerprint, fprint uint64, rAttrs, filtered map[string]any, roundedSixHrsUnixMilli int64) (bool, error) {
+ var cache *ttlcache.Cache[string, bool]
+ switch ds {
+ case pipeline.SignalTraces:
+ cache = e.tracesFingerprintCache
+ case pipeline.SignalMetrics:
+ cache = e.metricsFingerprintCache
+ case pipeline.SignalLogs:
+ cache = e.logsFingerprintCache
+ }
cacheKey := makeFingerprintCacheKey(fprint, uint64(roundedSixHrsUnixMilli), ds.String())
- if item := e.fingerprintCache.Get(cacheKey); item != nil && item.Value() {
+ if item := cache.Get(cacheKey); item != nil && item.Value() {
return true, nil
}

@@ -467,7 +499,7 @@ func (e *metadataExporter) writeToStmt(_ context.Context, stmt driver.Batch, ds
return false, err
}

- e.fingerprintCache.Set(cacheKey, true, ttlcache.DefaultTTL)
+ cache.Set(cacheKey, true, ttlcache.DefaultTTL)
return false, nil
}

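The dedupe key built in writeToStmt combines the attribute fingerprint, the rounded window start, and the signal name, so each fingerprint is written at most once per signal per six-hour window. makeFingerprintCacheKey itself is outside this diff; a plausible sketch, under the assumption that any string unique per (fingerprint, window, signal) triple would do:

package main

import "strconv"

// Sketch (assumption): the commit shows only the signature of
// makeFingerprintCacheKey; any encoding that is unique per
// (fingerprint, window start, signal) triple would work as the cache key.
func makeFingerprintCacheKey(a, b uint64, datasource string) string {
	return strconv.FormatUint(a, 10) + ":" + strconv.FormatUint(b, 10) + ":" + datasource
}

Together with the per-signal caches and the six-hour TTL, this key shape bounds both cache memory and the number of duplicate metadata rows inserted per window.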