Skip to content

Commit

Permalink
Stop wrting to v1 tag attributes table (#487)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv authored Dec 16, 2024
1 parent 3856ccb commit 2854986
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 124 deletions.
92 changes: 8 additions & 84 deletions exporter/clickhouselogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
var insertLogsStmtV2 driver.Batch
var insertResourcesStmtV2 driver.Batch
var statement driver.Batch
var tagStatement driver.Batch
var tagStatementV2 driver.Batch
var attributeKeysStmt driver.Batch
var resourceKeysStmt driver.Batch
Expand All @@ -311,9 +310,6 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
if statement != nil {
_ = statement.Abort()
}
if tagStatement != nil {
_ = tagStatement.Abort()
}
if insertLogsStmtV2 != nil {
_ = insertLogsStmtV2.Abort()
}
Expand Down Expand Up @@ -342,11 +338,6 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
}
}

tagStatement, err = e.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", databaseName, DISTRIBUTED_TAG_ATTRIBUTES), driver.WithReleaseConnection())
if err != nil {
return fmt.Errorf("PrepareTagBatch:%w", err)
}

tagStatementV2, err = e.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", databaseName, DISTRIBUTED_TAG_ATTRIBUTES_V2), driver.WithReleaseConnection())
if err != nil {
return fmt.Errorf("PrepareTagBatchV2:%w", err)
Expand Down Expand Up @@ -385,7 +376,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
}
resourceJson := string(serializedRes)

err = e.addAttrsToTagStatement(tagStatement, tagStatementV2, attributeKeysStmt, resourceKeysStmt, utils.TagTypeResource, resources, e.useNewSchema, shouldSkipKeys)
err = e.addAttrsToTagStatement(tagStatementV2, attributeKeysStmt, resourceKeysStmt, utils.TagTypeResource, resources, e.useNewSchema, shouldSkipKeys)
if err != nil {
return err
}
Expand All @@ -401,7 +392,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
scopeAttributes := attributesToSlice(scope.Attributes(), true)
scopeMap := attributesToMap(scope.Attributes(), true)

err := e.addAttrsToTagStatement(tagStatement, tagStatementV2, attributeKeysStmt, resourceKeysStmt, utils.TagTypeScope, scopeAttributes, e.useNewSchema, shouldSkipKeys)
err := e.addAttrsToTagStatement(tagStatementV2, attributeKeysStmt, resourceKeysStmt, utils.TagTypeScope, scopeAttributes, e.useNewSchema, shouldSkipKeys)
if err != nil {
return err
}
Expand Down Expand Up @@ -445,7 +436,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
attributes := attributesToSlice(r.Attributes(), false)
attrsMap := attributesToMap(r.Attributes(), false)

err = e.addAttrsToTagStatement(tagStatement, tagStatementV2, attributeKeysStmt, resourceKeysStmt, utils.TagTypeAttribute, attributes, e.useNewSchema, shouldSkipKeys)
err = e.addAttrsToTagStatement(tagStatementV2, attributeKeysStmt, resourceKeysStmt, utils.TagTypeAttribute, attributes, e.useNewSchema, shouldSkipKeys)
if err != nil {
return err
}
Expand Down Expand Up @@ -581,20 +572,6 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(usage.TagTenantKey, k), tag.Upsert(usage.TagExporterIdKey, e.id.String())}, ExporterSigNozSentLogRecords.M(int64(v.Count)), ExporterSigNozSentLogRecordsBytes.M(int64(v.Size)))
}

// push tag attributes
tagWriteStart := time.Now()
err = tagStatement.Send()
stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(exporterKey, pipeline.SignalLogs.String()),
tag.Upsert(tableKey, DISTRIBUTED_TAG_ATTRIBUTES),
},
writeLatencyMillis.M(int64(time.Since(tagWriteStart).Milliseconds())),
)
if err != nil {
return err
}

return err
}
}
Expand Down Expand Up @@ -667,7 +644,6 @@ func (e *clickhouseLogsExporter) addAttrsToAttributeKeysStatement(
}

func (e *clickhouseLogsExporter) addAttrsToTagStatement(
statement driver.Batch,
tagStatementV2 driver.Batch,
attributeKeysStmt driver.Batch,
resourceKeysStmt driver.Batch,
Expand All @@ -683,20 +659,8 @@ func (e *clickhouseLogsExporter) addAttrsToTagStatement(
e.logger.Debug("key has been skipped", zap.String("key", key))
continue
}
err := statement.Append(
time.Now(),
v,
tagType,
"string",
attrs.StringValues[i],
nil,
nil,
)
if err != nil {
return fmt.Errorf("could not append string attribute to batch, err: %w", err)
}
e.addAttrsToAttributeKeysStatement(attributeKeysStmt, resourceKeysStmt, v, tagType, utils.TagDataTypeString)
err = tagStatementV2.Append(
err := tagStatementV2.Append(
unixMilli,
v,
tagType,
Expand All @@ -709,31 +673,14 @@ func (e *clickhouseLogsExporter) addAttrsToTagStatement(
}
}

intTypeName := "int64"
if useNewSchema {
intTypeName = "float64"
}
for i, v := range attrs.IntKeys {
key := utils.MakeKeyForAttributeKeys(v, tagType, utils.TagDataTypeNumber)
if _, ok := shouldSkipKeys[key]; ok {
e.logger.Debug("key has been skipped", zap.String("key", key))
continue
}

err := statement.Append(
time.Now(),
v,
tagType,
intTypeName,
nil,
nil,
attrs.IntValues[i],
)
if err != nil {
return fmt.Errorf("could not append number attribute to batch, err: %w", err)
}
e.addAttrsToAttributeKeysStatement(attributeKeysStmt, resourceKeysStmt, v, tagType, utils.TagDataTypeNumber)
err = tagStatementV2.Append(
err := tagStatementV2.Append(
unixMilli,
v,
tagType,
Expand All @@ -751,20 +698,8 @@ func (e *clickhouseLogsExporter) addAttrsToTagStatement(
e.logger.Debug("key has been skipped", zap.String("key", key))
continue
}
err := statement.Append(
time.Now(),
v,
tagType,
"float64",
nil,
nil,
attrs.FloatValues[i],
)
if err != nil {
return fmt.Errorf("could not append number attribute to batch, err: %w", err)
}
e.addAttrsToAttributeKeysStatement(attributeKeysStmt, resourceKeysStmt, v, tagType, utils.TagDataTypeNumber)
err = tagStatementV2.Append(
err := tagStatementV2.Append(
unixMilli,
v,
tagType,
Expand All @@ -782,20 +717,9 @@ func (e *clickhouseLogsExporter) addAttrsToTagStatement(
e.logger.Debug("key has been skipped", zap.String("key", key))
continue
}
err := statement.Append(
time.Now(),
v,
tagType,
"bool",
nil,
nil,
nil,
)
if err != nil {
return fmt.Errorf("could not append bool attribute to batch, err: %w", err)
}

e.addAttrsToAttributeKeysStatement(attributeKeysStmt, resourceKeysStmt, v, tagType, utils.TagDataTypeBool)
err = tagStatementV2.Append(
err := tagStatementV2.Append(
unixMilli,
v,
tagType,
Expand Down
43 changes: 3 additions & 40 deletions exporter/clickhousetracesexporter/writerV3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package clickhousetracesexporter

import (
"context"
"errors"
"fmt"
"strconv"
"sync"
Expand Down Expand Up @@ -137,7 +136,6 @@ func (w *SpanWriter) writeErrorBatchV3(ctx context.Context, batchSpans []*SpanV3

func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) error {
var tagKeyStatement driver.Batch
var tagStatement driver.Batch
var tagStatementV2 driver.Batch
var err error
var shouldSkipKeys map[string]shouldSkipKey
Expand All @@ -150,17 +148,10 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3)
if tagKeyStatement != nil {
_ = tagKeyStatement.Abort()
}
if tagStatement != nil {
_ = tagStatement.Abort()
}
if tagStatementV2 != nil {
_ = tagStatementV2.Abort()
}
}()
tagStatement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.attributeTable), driver.WithReleaseConnection())
if err != nil {
return fmt.Errorf("could not prepare batch for span attributes table due to error: %w", err)
}
tagKeyStatement, err = w.db.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s", w.traceDatabase, w.attributeKeyTable), driver.WithReleaseConnection())
if err != nil {
return fmt.Errorf("could not prepare batch for span attributes key table due to error: %w", err)
Expand Down Expand Up @@ -216,15 +207,6 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3)
unixMilli := (int64(span.StartTimeUnixNano/1e6) / 3600000) * 3600000

if spanAttribute.DataType == "string" {
err = tagStatement.Append(
time.Unix(0, int64(span.StartTimeUnixNano)),
spanAttribute.Key,
spanAttribute.TagType,
spanAttribute.DataType,
spanAttribute.StringValue,
nil,
spanAttribute.IsColumn,
)

if _, ok := shouldSkipKeys[v2Key]; !ok {
err = tagStatementV2.Append(
Expand All @@ -238,15 +220,6 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3)
}

} else if spanAttribute.DataType == "float64" {
err = tagStatement.Append(
time.Unix(0, int64(span.StartTimeUnixNano)),
spanAttribute.Key,
spanAttribute.TagType,
spanAttribute.DataType,
nil,
spanAttribute.NumberValue,
spanAttribute.IsColumn,
)
if _, ok = shouldSkipKeys[v2Key]; !ok {
err = tagStatementV2.Append(
unixMilli,
Expand All @@ -258,15 +231,6 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3)
)
}
} else if spanAttribute.DataType == "bool" {
err = tagStatement.Append(
time.Unix(0, int64(span.StartTimeUnixNano)),
spanAttribute.Key,
spanAttribute.TagType,
spanAttribute.DataType,
nil,
nil,
spanAttribute.IsColumn,
)
if _, ok = shouldSkipKeys[v2Key]; !ok {
err = tagStatementV2.Append(
unixMilli,
Expand All @@ -285,17 +249,16 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3)
}

tagStart := time.Now()
err1 := tagStatement.Send()
err2 := tagStatementV2.Send()
err = tagStatementV2.Send()
stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(exporterKey, pipeline.SignalTraces.String()),
tag.Upsert(tableKey, w.attributeTable),
},
writeLatencyMillis.M(int64(time.Since(tagStart).Milliseconds())),
)
if err1 != nil || err2 != nil {
return fmt.Errorf("could not write to span attributes table due to error: %w", errors.Join(err1, err2))
if err != nil {
return fmt.Errorf("could not write to span attributes table due to error: %w", err)
}

tagKeyStart := time.Now()
Expand Down

0 comments on commit 2854986

Please sign in to comment.