Skip to content

Commit

Permalink
Merge branch 'main' into issue_6365
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv authored Dec 17, 2024
2 parents 938c959 + 590c017 commit 732745e
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 116 deletions.
2 changes: 2 additions & 0 deletions components/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ import (
"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter"
"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporterv2"
"github.com/SigNoz/signoz-otel-collector/exporter/clickhousetracesexporter"
"github.com/SigNoz/signoz-otel-collector/exporter/metadataexporter"
"github.com/SigNoz/signoz-otel-collector/exporter/signozkafkaexporter"
signozhealthcheckextension "github.com/SigNoz/signoz-otel-collector/extension/healthcheckextension"
_ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok"
Expand Down Expand Up @@ -325,6 +326,7 @@ func Components() (otelcol.Factories, error) {
googlecloudpubsubexporter.NewFactory(),
kafkaexporter.NewFactory(),
loadbalancingexporter.NewFactory(),
metadataexporter.NewFactory(),
opencensusexporter.NewFactory(),
prometheusexporter.NewFactory(),
prometheusremotewriteexporter.NewFactory(),
Expand Down
51 changes: 30 additions & 21 deletions exporter/clickhouselogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,34 +155,43 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro

func (e *clickhouseLogsExporter) Start(ctx context.Context, host component.Host) error {
e.fetchShouldSkipKeysTicker = time.NewTicker(e.fetchKeysInterval)
go e.fetchShouldSkipKeys()
go func() {
e.doFetchShouldSkipKeys() // Immediate first fetch
e.fetchShouldSkipKeys() // Start ticker routine
}()
return nil
}

func (e *clickhouseLogsExporter) fetchShouldSkipKeys() {
for range e.fetchShouldSkipKeysTicker.C {
query := fmt.Sprintf(`
SELECT tag_key, tag_type, tag_data_type, countDistinct(string_value) as string_count, countDistinct(number_value) as number_count
FROM %s.%s
GROUP BY tag_key, tag_type, tag_data_type
HAVING string_count > %d OR number_count > %d`, databaseName, DISTRIBUTED_TAG_ATTRIBUTES_V2, e.maxDistinctValues, e.maxDistinctValues)
func (e *clickhouseLogsExporter) doFetchShouldSkipKeys() {
query := fmt.Sprintf(`
SELECT tag_key, tag_type, tag_data_type, countDistinct(string_value) as string_count, countDistinct(number_value) as number_count
FROM %s.%s
WHERE unix_milli >= (toUnixTimestamp(now() - toIntervalHour(6)) * 1000)
GROUP BY tag_key, tag_type, tag_data_type
HAVING string_count > %d OR number_count > %d
SETTINGS max_threads = 2`, databaseName, DISTRIBUTED_TAG_ATTRIBUTES_V2, e.maxDistinctValues, e.maxDistinctValues)

e.logger.Info("fetching should skip keys", zap.String("query", query))
e.logger.Info("fetching should skip keys", zap.String("query", query))

keys := []shouldSkipKey{}
keys := []shouldSkipKey{}

err := e.db.Select(context.Background(), &keys, query)
if err != nil {
e.logger.Error("error while fetching should skip keys", zap.Error(err))
}
err := e.db.Select(context.Background(), &keys, query)
if err != nil {
e.logger.Error("error while fetching should skip keys", zap.Error(err))
}

shouldSkipKeys := make(map[string]shouldSkipKey)
for _, key := range keys {
mapKey := utils.MakeKeyForAttributeKeys(key.TagKey, utils.TagType(key.TagType), utils.TagDataType(key.TagDataType))
e.logger.Debug("adding to should skip keys", zap.String("key", mapKey), zap.Any("string_count", key.StringCount), zap.Any("number_count", key.NumberCount))
shouldSkipKeys[mapKey] = key
}
e.shouldSkipKeyValue.Store(shouldSkipKeys)
shouldSkipKeys := make(map[string]shouldSkipKey)
for _, key := range keys {
mapKey := utils.MakeKeyForAttributeKeys(key.TagKey, utils.TagType(key.TagType), utils.TagDataType(key.TagDataType))
e.logger.Debug("adding to should skip keys", zap.String("key", mapKey), zap.Any("string_count", key.StringCount), zap.Any("number_count", key.NumberCount))
shouldSkipKeys[mapKey] = key
}
e.shouldSkipKeyValue.Store(shouldSkipKeys)
}

// fetchShouldSkipKeys waits on the exporter's refresh ticker and calls
// doFetchShouldSkipKeys once for every tick received. It blocks for as long
// as the ticker channel stays open, which is why Start launches it from a
// goroutine.
//
// Note: time.Ticker.Stop does not close the ticker channel, so stopping the
// ticker alone does not make this loop return.
func (e *clickhouseLogsExporter) fetchShouldSkipKeys() {
	ticks := e.fetchShouldSkipKeysTicker.C
	for {
		// Mirror range-over-channel semantics: leave once the channel is closed.
		if _, open := <-ticks; !open {
			return
		}
		e.doFetchShouldSkipKeys()
	}
}

Expand Down
54 changes: 33 additions & 21 deletions exporter/clickhousetracesexporter/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,34 +151,46 @@ func NewSpanWriter(options WriterOptions) *SpanWriter {
fetchShouldSkipKeysTicker: time.NewTicker(options.fetchKeysInterval),
}

go writer.fetchShouldSkipKeys()
// Fetch keys immediately, then start the background ticker routine
go func() {
writer.doFetchShouldSkipKeys() // Immediate first fetch
writer.fetchShouldSkipKeys() // Start ticker routine
}()

return writer
}

func (e *SpanWriter) fetchShouldSkipKeys() {
for range e.fetchShouldSkipKeysTicker.C {
query := fmt.Sprintf(`
SELECT tag_key, tag_type, tag_data_type, countDistinct(string_value) as string_count, countDistinct(number_value) as number_count
FROM %s.%s
GROUP BY tag_key, tag_type, tag_data_type
HAVING string_count > %d OR number_count > %d`, e.traceDatabase, e.attributeTableV2, e.maxDistinctValues, e.maxDistinctValues)
// doFetchShouldSkipKeys contains the logic for fetching skip keys
func (e *SpanWriter) doFetchShouldSkipKeys() {
query := fmt.Sprintf(`
SELECT tag_key, tag_type, tag_data_type, countDistinct(string_value) as string_count, countDistinct(number_value) as number_count
FROM %s.%s
WHERE unix_milli >= (toUnixTimestamp(now() - toIntervalHour(6)) * 1000)
GROUP BY tag_key, tag_type, tag_data_type
HAVING string_count > %d OR number_count > %d
SETTINGS max_threads = 2`, e.traceDatabase, e.attributeTableV2, e.maxDistinctValues, e.maxDistinctValues)

e.logger.Info("fetching should skip keys", zap.String("query", query))
e.logger.Info("fetching should skip keys", zap.String("query", query))

keys := []shouldSkipKey{}
keys := []shouldSkipKey{}

err := e.db.Select(context.Background(), &keys, query)
if err != nil {
e.logger.Error("error while fetching should skip keys", zap.Error(err))
}
err := e.db.Select(context.Background(), &keys, query)
if err != nil {
e.logger.Error("error while fetching should skip keys", zap.Error(err))
}

shouldSkipKeys := make(map[string]shouldSkipKey)
for _, key := range keys {
mapKey := utils.MakeKeyForAttributeKeys(key.TagKey, utils.TagType(key.TagType), utils.TagDataType(key.TagDataType))
e.logger.Debug("adding to should skip keys", zap.String("key", mapKey), zap.Any("string_count", key.StringCount), zap.Any("number_count", key.NumberCount))
shouldSkipKeys[mapKey] = key
}
e.shouldSkipKeyValue.Store(shouldSkipKeys)
shouldSkipKeys := make(map[string]shouldSkipKey)
for _, key := range keys {
mapKey := utils.MakeKeyForAttributeKeys(key.TagKey, utils.TagType(key.TagType), utils.TagDataType(key.TagDataType))
e.logger.Debug("adding to should skip keys", zap.String("key", mapKey), zap.Any("string_count", key.StringCount), zap.Any("number_count", key.NumberCount))
shouldSkipKeys[mapKey] = key
}
e.shouldSkipKeyValue.Store(shouldSkipKeys)
}

// fetchShouldSkipKeys waits on the writer's refresh ticker and calls
// doFetchShouldSkipKeys once for every tick received. It blocks for as long
// as the ticker channel stays open, which is why NewSpanWriter launches it
// from a goroutine.
//
// Note: time.Ticker.Stop does not close the ticker channel, so stopping the
// ticker alone does not make this loop return.
func (e *SpanWriter) fetchShouldSkipKeys() {
	ticks := e.fetchShouldSkipKeysTicker.C
	for {
		// Mirror range-over-channel semantics: leave once the channel is closed.
		if _, open := <-ticks; !open {
			return
		}
		e.doFetchShouldSkipKeys()
	}
}

Expand Down
Loading

0 comments on commit 732745e

Please sign in to comment.