diff --git a/exporter/clickhouselogsexporter/exporter.go b/exporter/clickhouselogsexporter/exporter.go index 1e5be1ee..964ce378 100644 --- a/exporter/clickhouselogsexporter/exporter.go +++ b/exporter/clickhouselogsexporter/exporter.go @@ -38,9 +38,12 @@ import ( "go.opencensus.io/stats/view" "go.opencensus.io/tag" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" ) @@ -77,6 +80,8 @@ type clickhouseLogsExporter struct { wg *sync.WaitGroup closeChan chan struct{} + durationHistogram metric.Float64Histogram + useNewSchema bool keysCache *ttlcache.Cache[string, struct{}] @@ -88,7 +93,10 @@ type clickhouseLogsExporter struct { fetchShouldSkipKeysTicker *time.Ticker } -func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, error) { +func newExporter(set exporter.Settings, cfg *Config) (*clickhouseLogsExporter, error) { + logger := set.Logger + meter := set.MeterProvider.Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhouselogsexporter") + if err := cfg.Validate(); err != nil { return nil, err } @@ -118,6 +126,14 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro return nil, err } + durationHistogram, err := meter.Float64Histogram( + "exporter_db_write_latency", + metric.WithUnit("ms"), + metric.WithExplicitBucketBoundaries(250, 500, 750, 1000, 2000, 2500, 3000, 4000, 5000, 6000, 8000, 10000, 15000, 25000, 30000), + ) + if err != nil { + return nil, err + } // keys cache is used to avoid duplicate inserts for the same attribute key. keysCache := ttlcache.New[string, struct{}]( ttlcache.WithTTL[string, struct{}](240*time.Minute), @@ -136,7 +152,6 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro go rfCache.Start() return &clickhouseLogsExporter{ - id: id, db: client, insertLogsSQL: insertLogsSQL, insertLogsSQLV2: insertLogsSQLV2, @@ -146,6 +161,7 @@ func newExporter(logger *zap.Logger, cfg *Config) (*clickhouseLogsExporter, erro wg: new(sync.WaitGroup), closeChan: make(chan struct{}), useNewSchema: cfg.UseNewSchema, + durationHistogram: durationHistogram, keysCache: keysCache, rfCache: rfCache, maxDistinctValues: cfg.AttributesLimits.MaxDistinctValues, @@ -557,12 +573,13 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L // store the duration for send the data for i := 0; i < chLen; i++ { sendDuration := <-chDuration - stats.RecordWithTags(ctx, - []tag.Mutator{ - tag.Upsert(exporterKey, pipeline.SignalLogs.String()), - tag.Upsert(tableKey, sendDuration.Name), - }, - writeLatencyMillis.M(int64(sendDuration.duration.Milliseconds())), + e.durationHistogram.Record( + ctx, + float64(sendDuration.duration.Milliseconds()), + metric.WithAttributes( + attribute.String("table", sendDuration.Name), + attribute.String("exporter", pipeline.SignalLogs.String()), + ), ) } diff --git a/exporter/clickhouselogsexporter/factory.go b/exporter/clickhouselogsexporter/factory.go index 59809bee..fd1fe6a3 100644 --- a/exporter/clickhouselogsexporter/factory.go +++ b/exporter/clickhouselogsexporter/factory.go @@ -19,9 +19,6 @@ import ( "fmt" "time" - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configretry" 
"go.opentelemetry.io/collector/exporter" @@ -37,25 +34,8 @@ const ( tableName = "logs" ) -var ( - writeLatencyMillis = stats.Int64("exporter_db_write_latency", "Time taken (in millis) for exporter to write batch", "ms") - exporterKey = tag.MustNewKey("exporter") - tableKey = tag.MustNewKey("table") -) - // NewFactory creates a factory for Elastic exporter. func NewFactory() exporter.Factory { - writeLatencyDistribution := view.Distribution(100, 250, 500, 750, 1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000, 256000, 512000) - - writeLatencyView := &view.View{ - Name: "exporter_db_write_latency", - Measure: writeLatencyMillis, - Description: writeLatencyMillis.Description(), - TagKeys: []tag.Key{exporterKey, tableKey}, - Aggregation: writeLatencyDistribution, - } - - view.Register(writeLatencyView) return exporter.NewFactory( component.MustNewType(typeStr), @@ -84,7 +64,7 @@ func createLogsExporter( cfg component.Config, ) (exporter.Logs, error) { c := cfg.(*Config) - exporter, err := newExporter(set.Logger, c) + exporter, err := newExporter(set, c) if err != nil { return nil, fmt.Errorf("cannot configure clickhouse logs exporter: %w", err) } diff --git a/exporter/clickhousemetricsexporter/clickhouse.go b/exporter/clickhousemetricsexporter/clickhouse.go index 560080d7..fe5f3235 100644 --- a/exporter/clickhousemetricsexporter/clickhouse.go +++ b/exporter/clickhousemetricsexporter/clickhouse.go @@ -31,11 +31,14 @@ import ( "github.com/jellydator/ttlcache/v3" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - "github.com/sirupsen/logrus" "go.opencensus.io/stats" "go.opencensus.io/tag" + "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pipeline" semconv "go.opentelemetry.io/collector/semconv/v1.13.0" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.uber.org/zap" "github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/base" "github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/utils/timeseries" @@ -61,7 +64,7 @@ const ( // clickHouse implements storage interface for the ClickHouse. 
type clickHouse struct { conn clickhouse.Conn - l *logrus.Entry + l *zap.Logger database string maxTimeSeriesInQuery int @@ -80,6 +83,8 @@ type clickHouse struct { mWrittenTimeSeries prometheus.Counter exporterID uuid.UUID + + durationHistogram metric.Float64Histogram } type ClickHouseParams struct { @@ -92,10 +97,13 @@ type ClickHouseParams struct { WriteTSToV4 bool DisableV2 bool ExporterId uuid.UUID + Settings exporter.Settings } func NewClickHouse(params *ClickHouseParams) (base.Storage, error) { - l := logrus.WithField("component", "clickhouse") + + logger := params.Settings.Logger + meter := params.Settings.MeterProvider.Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter") options, err := clickhouse.ParseDSN(params.DSN) @@ -124,9 +132,19 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) { ) go cache.Start() + durationHistogram, err := meter.Float64Histogram( + "exporter_db_write_latency", + metric.WithDescription("Time taken to write data to ClickHouse"), + metric.WithUnit("ms"), + metric.WithExplicitBucketBoundaries(250, 500, 750, 1000, 2000, 2500, 3000, 4000, 5000, 6000, 8000, 10000, 15000, 25000, 30000), + ) + if err != nil { + return nil, err + } + ch := &clickHouse{ conn: conn, - l: l, + l: logger, database: options.Auth.Database, maxTimeSeriesInQuery: params.MaxTimeSeriesInQuery, cache: cache, @@ -139,10 +157,11 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) { Name: "written_time_series", Help: "Number of written time series.", }), - watcherInterval: params.WatcherInterval, - writeTSToV4: params.WriteTSToV4, - disableV2: params.DisableV2, - exporterID: params.ExporterId, + watcherInterval: params.WatcherInterval, + writeTSToV4: params.WriteTSToV4, + disableV2: params.DisableV2, + exporterID: params.ExporterId, + durationHistogram: durationHistogram, } go func() { @@ -176,7 +195,7 @@ func (ch *clickHouse) shardCountWatcher(ctx context.Context) { ch.timeSeriesRW.Lock() if ch.prevShardCount != shardCount { - ch.l.Infof("Shard count changed from %d to %d. Resetting time series map.", ch.prevShardCount, shardCount) + ch.l.Info("Shard count changed. 
Resetting time series map.", zap.Uint64("prev", ch.prevShardCount), zap.Uint64("current", shardCount)) ch.timeSeries = make(map[uint64]struct{}) } ch.prevShardCount = shardCount @@ -184,12 +203,12 @@ func (ch *clickHouse) shardCountWatcher(ctx context.Context) { return nil }() if err != nil { - ch.l.Error(err) + ch.l.Error("error getting shard count", zap.Error(err)) } select { case <-ctx.Done(): - ch.l.Warn(ctx.Err()) + ch.l.Warn("shard count watcher stopped", zap.Error(ctx.Err())) return case <-ticker.C: } @@ -254,7 +273,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr fingerprintToName[f][envLabel] = env } if len(fingerprints) != len(timeSeries) { - ch.l.Debugf("got %d fingerprints, but only %d of them were unique time series", len(fingerprints), len(timeSeries)) + ch.l.Debug("got fingerprints, but only unique time series", zap.Int("fingerprints", len(fingerprints)), zap.Int("time series", len(timeSeries))) } // find new time series @@ -299,11 +318,14 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr start := time.Now() err = statement.Send() - ctx, _ = tag.New(ctx, - tag.Upsert(exporterKey, pipeline.SignalMetrics.String()), - tag.Upsert(tableKey, DISTRIBUTED_TIME_SERIES_TABLE), + ch.durationHistogram.Record( + ctx, + float64(time.Since(start).Milliseconds()), + metric.WithAttributes( + attribute.String("exporter", pipeline.SignalMetrics.String()), + attribute.String("table", DISTRIBUTED_TIME_SERIES_TABLE), + ), ) - stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds()))) return err }() @@ -339,11 +361,14 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr } start := time.Now() err = statement.Send() - ctx, _ = tag.New(ctx, - tag.Upsert(exporterKey, pipeline.SignalMetrics.String()), - tag.Upsert(tableKey, DISTRIBUTED_SAMPLES_TABLE), + ch.durationHistogram.Record( + ctx, + float64(time.Since(start).Milliseconds()), + metric.WithAttributes( + attribute.String("exporter", pipeline.SignalMetrics.String()), + attribute.String("table", DISTRIBUTED_SAMPLES_TABLE), + ), ) - stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds()))) return err }() if err != nil { @@ -396,11 +421,14 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr start := time.Now() err = statement.Send() - ctx, _ = tag.New(ctx, - tag.Upsert(exporterKey, pipeline.SignalMetrics.String()), - tag.Upsert(tableKey, DISTRIBUTED_SAMPLES_TABLE_V4), + ch.durationHistogram.Record( + ctx, + float64(time.Since(start).Milliseconds()), + metric.WithAttributes( + attribute.String("exporter", pipeline.SignalMetrics.String()), + attribute.String("table", DISTRIBUTED_SAMPLES_TABLE_V4), + ), ) - stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds()))) return err }() @@ -451,11 +479,14 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr start := time.Now() err = statement.Send() - ctx, _ = tag.New(ctx, - tag.Upsert(exporterKey, pipeline.SignalMetrics.String()), - tag.Upsert(tableKey, DISTRIBUTED_TIME_SERIES_TABLE_V4), + ch.durationHistogram.Record( + ctx, + float64(time.Since(start).Milliseconds()), + metric.WithAttributes( + attribute.String("exporter", pipeline.SignalMetrics.String()), + attribute.String("table", DISTRIBUTED_TIME_SERIES_TABLE_V4), + ), ) - stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds()))) return err }() @@ -467,7 +498,7 @@ func (ch *clickHouse) Write(ctx 
context.Context, data *prompb.WriteRequest, metr n := len(newTimeSeries) if n != 0 { ch.mWrittenTimeSeries.Add(float64(n)) - ch.l.Debugf("Wrote %d new time series.", n) + ch.l.Debug("wrote new time series", zap.Int("count", n)) } err = func() error { @@ -540,11 +571,14 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr start := time.Now() err = statement.Send() - ctx, _ = tag.New(ctx, - tag.Upsert(exporterKey, pipeline.SignalMetrics.String()), - tag.Upsert(tableKey, DISTRIBUTED_EXP_HIST_TABLE), + ch.durationHistogram.Record( + ctx, + float64(time.Since(start).Milliseconds()), + metric.WithAttributes( + attribute.String("exporter", pipeline.SignalMetrics.String()), + attribute.String("table", DISTRIBUTED_EXP_HIST_TABLE), + ), ) - stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds()))) return err }() if err != nil { diff --git a/exporter/clickhousemetricsexporter/exporter.go b/exporter/clickhousemetricsexporter/exporter.go index d899a622..8704081a 100644 --- a/exporter/clickhousemetricsexporter/exporter.go +++ b/exporter/clickhousemetricsexporter/exporter.go @@ -95,6 +95,7 @@ func NewPrwExporter(cfg *Config, set exporter.Settings) (*PrwExporter, error) { WriteTSToV4: cfg.WriteTSToV4, DisableV2: cfg.DisableV2, ExporterId: id, + Settings: set, } ch, err := NewClickHouse(params) if err != nil { diff --git a/exporter/clickhousemetricsexporter/factory.go b/exporter/clickhousemetricsexporter/factory.go index 4fa867e7..d75f9ff2 100644 --- a/exporter/clickhousemetricsexporter/factory.go +++ b/exporter/clickhousemetricsexporter/factory.go @@ -20,9 +20,6 @@ import ( "time" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry" - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" @@ -36,26 +33,9 @@ const ( typeStr = "clickhousemetricswrite" ) -var ( - writeLatencyMillis = stats.Int64("exporter_db_write_latency", "Time taken (in millis) for exporter to write batch", "ms") - exporterKey = tag.MustNewKey("exporter") - tableKey = tag.MustNewKey("table") -) - // NewFactory creates a new Prometheus Remote Write exporter. 
func NewFactory() exporter.Factory { - writeLatencyDistribution := view.Distribution(100, 250, 500, 750, 1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000, 256000, 512000) - - writeLatencyView := &view.View{ - Name: "exporter_db_write_latency", - Measure: writeLatencyMillis, - Description: writeLatencyMillis.Description(), - TagKeys: []tag.Key{exporterKey, tableKey}, - Aggregation: writeLatencyDistribution, - } - - view.Register(writeLatencyView) return exporter.NewFactory( component.MustNewType(typeStr), createDefaultConfig, diff --git a/exporter/clickhousemetricsexporterv2/exporter.go b/exporter/clickhousemetricsexporterv2/exporter.go index cc6a379b..72baad88 100644 --- a/exporter/clickhousemetricsexporterv2/exporter.go +++ b/exporter/clickhousemetricsexporterv2/exporter.go @@ -15,7 +15,10 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pipeline" semconv "go.opentelemetry.io/collector/semconv/v1.5.0" + "go.opentelemetry.io/otel/attribute" + metricapi "go.opentelemetry.io/otel/metric" "go.uber.org/zap" ) @@ -38,6 +41,7 @@ var ( type clickhouseMetricsExporter struct { cfg *Config logger *zap.Logger + meter metricapi.Meter cache *ttlcache.Cache[string, bool] conn clickhouse.Conn wg sync.WaitGroup @@ -47,6 +51,9 @@ type clickhouseMetricsExporter struct { timeSeriesSQL string expHistSQL string metadataSQL string + + processMetricsDuration metricapi.Float64Histogram + exportMetricsDuration metricapi.Float64Histogram } // sample represents a single metric sample @@ -127,6 +134,13 @@ func WithLogger(logger *zap.Logger) ExporterOption { } } +func WithMeter(meter metricapi.Meter) ExporterOption { + return func(e *clickhouseMetricsExporter) error { + e.meter = meter + return nil + } +} + func WithEnableExpHist(enableExpHist bool) ExporterOption { return func(e *clickhouseMetricsExporter) error { e.enableExpHist = enableExpHist @@ -187,6 +201,25 @@ func NewClickHouseExporter(opts ...ExporterOption) (*clickhouseMetricsExporter, chExporter.expHistSQL = fmt.Sprintf(expHistSQLTmpl, chExporter.cfg.Database, chExporter.cfg.ExpHistTable) chExporter.metadataSQL = fmt.Sprintf(metadataSQLTmpl, chExporter.cfg.Database, chExporter.cfg.MetadataTable) + var err error + chExporter.processMetricsDuration, err = chExporter.meter.Float64Histogram( + "exporter_prepare_metrics_duration", + metricapi.WithDescription("Time taken (in millis) for exporter to prepare metrics"), + ) + if err != nil { + return nil, err + } + + chExporter.exportMetricsDuration, err = chExporter.meter.Float64Histogram( + "exporter_db_write_latency", + metricapi.WithDescription("Time taken to write data to ClickHouse"), + metricapi.WithUnit("ms"), + metricapi.WithExplicitBucketBoundaries(250, 500, 750, 1000, 2000, 2500, 3000, 4000, 5000, 6000, 8000, 10000, 15000, 25000, 30000), + ) + if err != nil { + return nil, err + } + return chExporter, nil } @@ -725,6 +758,7 @@ func (c *clickhouseMetricsExporter) processExponentialHistogram(batch *writeBatc func (c *clickhouseMetricsExporter) prepareBatch(md pmetric.Metrics) *writeBatch { batch := &writeBatch{metaSeen: make(map[string]struct{})} + start := time.Now() for i := 0; i < md.ResourceMetrics().Len(); i++ { rm := md.ResourceMetrics().At(i) resAttrs := pcommon.NewMap() @@ -758,6 +792,10 @@ func (c *clickhouseMetricsExporter) prepareBatch(md pmetric.Metrics) *writeBatch } } } + c.processMetricsDuration.Record( + context.Background(), + 
float64(time.Since(start).Milliseconds()), + ) return batch } @@ -770,6 +808,19 @@ func (c *clickhouseMetricsExporter) PushMetrics(ctx context.Context, md pmetric. func (c *clickhouseMetricsExporter) writeBatch(ctx context.Context, batch *writeBatch) error { writeTimeSeries := func(ctx context.Context, timeSeries []ts) error { + start := time.Now() + + defer func() { + c.exportMetricsDuration.Record( + ctx, + float64(time.Since(start).Milliseconds()), + metricapi.WithAttributes( + attribute.String("table", c.cfg.TimeSeriesTable), + attribute.String("exporter", pipeline.SignalMetrics.String()), + ), + ) + }() + if len(timeSeries) == 0 { return nil } @@ -813,6 +864,19 @@ func (c *clickhouseMetricsExporter) writeBatch(ctx context.Context, batch *write } writeSamples := func(ctx context.Context, samples []sample) error { + start := time.Now() + + defer func() { + c.exportMetricsDuration.Record( + ctx, + float64(time.Since(start).Milliseconds()), + metricapi.WithAttributes( + attribute.String("table", c.cfg.SamplesTable), + attribute.String("exporter", pipeline.SignalMetrics.String()), + ), + ) + }() + if len(samples) == 0 { return nil } @@ -841,6 +905,19 @@ func (c *clickhouseMetricsExporter) writeBatch(ctx context.Context, batch *write } writeExpHist := func(ctx context.Context, expHist []exponentialHistogramSample) error { + start := time.Now() + + defer func() { + c.exportMetricsDuration.Record( + ctx, + float64(time.Since(start).Milliseconds()), + metricapi.WithAttributes( + attribute.String("table", c.cfg.ExpHistTable), + attribute.String("exporter", pipeline.SignalMetrics.String()), + ), + ) + }() + if len(expHist) == 0 { return nil } @@ -873,6 +950,19 @@ func (c *clickhouseMetricsExporter) writeBatch(ctx context.Context, batch *write } writeMetadata := func(ctx context.Context, metadata []metadata) error { + start := time.Now() + + defer func() { + c.exportMetricsDuration.Record( + ctx, + float64(time.Since(start).Milliseconds()), + metricapi.WithAttributes( + attribute.String("table", c.cfg.MetadataTable), + attribute.String("exporter", pipeline.SignalMetrics.String()), + ), + ) + }() + if len(metadata) == 0 { return nil } diff --git a/exporter/clickhousemetricsexporterv2/exporter_test.go b/exporter/clickhousemetricsexporterv2/exporter_test.go index 895704b2..80ea8be8 100644 --- a/exporter/clickhousemetricsexporterv2/exporter_test.go +++ b/exporter/clickhousemetricsexporterv2/exporter_test.go @@ -7,14 +7,21 @@ import ( "testing" chproto "github.com/ClickHouse/ch-go/proto" + "github.com/stretchr/testify/require" "github.com/zeebo/assert" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/otel/metric/noop" "go.uber.org/zap" ) func Test_prepareBatchGauge(t *testing.T) { metrics := generateGaugeMetrics(1, 1, 1, 1, 1) - exp := &clickhouseMetricsExporter{} + exp, err := NewClickHouseExporter( + WithLogger(zap.NewNop()), + WithConfig(&Config{}), + WithMeter(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporterv2")), + ) + require.NoError(t, err) batch := exp.prepareBatch(metrics) assert.NotNil(t, batch) expectedSamples := []sample{ @@ -75,7 +82,12 @@ func Test_prepareBatchGauge(t *testing.T) { func Test_prepareBatchSum(t *testing.T) { metrics := generateSumMetrics(1, 1, 1, 1, 1) - exp := &clickhouseMetricsExporter{} + exp, err := NewClickHouseExporter( + WithLogger(zap.NewNop()), + WithConfig(&Config{}), + 
WithMeter(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporterv2")), + ) + require.NoError(t, err) batch := exp.prepareBatch(metrics) assert.NotNil(t, batch) expectedSamples := []sample{ @@ -136,7 +148,12 @@ func Test_prepareBatchSum(t *testing.T) { func Test_prepareBatchHistogram(t *testing.T) { metrics := generateHistogramMetrics(1, 1, 1, 1, 1) - exp := &clickhouseMetricsExporter{} + exp, err := NewClickHouseExporter( + WithLogger(zap.NewNop()), + WithConfig(&Config{}), + WithMeter(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporterv2")), + ) + require.NoError(t, err) batch := exp.prepareBatch(metrics) assert.NotNil(t, batch) // there should be 4 (count, sum, min, max) + 20 (for each bucket) + 1 (for the inf bucket) = 25 samples @@ -314,7 +331,13 @@ func Test_prepareBatchHistogram(t *testing.T) { func Test_prepareBatchExponentialHistogram(t *testing.T) { metrics := generateExponentialHistogramMetrics(2, 1, 1, 1, 1) - exp := &clickhouseMetricsExporter{enableExpHist: true, logger: zap.NewNop()} + exp, err := NewClickHouseExporter( + WithEnableExpHist(true), + WithLogger(zap.NewNop()), + WithConfig(&Config{}), + WithMeter(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporterv2")), + ) + require.NoError(t, err) batch := exp.prepareBatch(metrics) assert.NotNil(t, batch) @@ -391,7 +414,12 @@ func Test_prepareBatchExponentialHistogram(t *testing.T) { func Test_prepareBatchSummary(t *testing.T) { metrics := generateSummaryMetrics(1, 2, 1, 1, 1) - exp := &clickhouseMetricsExporter{} + exp, err := NewClickHouseExporter( + WithLogger(zap.NewNop()), + WithConfig(&Config{}), + WithMeter(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporterv2")), + ) + require.NoError(t, err) batch := exp.prepareBatch(metrics) assert.NotNil(t, batch) @@ -456,7 +484,12 @@ func Benchmark_prepareBatchGauge(b *testing.B) { metrics := generateGaugeMetrics(10000, 10, 10, 10, 10) b.ResetTimer() b.ReportAllocs() - exp := &clickhouseMetricsExporter{} + exp, err := NewClickHouseExporter( + WithLogger(zap.NewNop()), + WithConfig(&Config{}), + WithMeter(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporterv2")), + ) + require.NoError(b, err) for i := 0; i < b.N; i++ { exp.prepareBatch(metrics) } @@ -468,7 +501,12 @@ func Benchmark_prepareBatchSum(b *testing.B) { metrics := generateSumMetrics(10000, 10, 10, 10, 10) b.ResetTimer() b.ReportAllocs() - exp := &clickhouseMetricsExporter{} + exp, err := NewClickHouseExporter( + WithLogger(zap.NewNop()), + WithConfig(&Config{}), + WithMeter(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporterv2")), + ) + require.NoError(b, err) for i := 0; i < b.N; i++ { exp.prepareBatch(metrics) } @@ -480,7 +518,12 @@ func Benchmark_prepareBatchHistogram(b *testing.B) { metrics := generateHistogramMetrics(1000, 10, 10, 10, 10) b.ResetTimer() b.ReportAllocs() - exp := &clickhouseMetricsExporter{} + exp, err := NewClickHouseExporter( + WithLogger(zap.NewNop()), + WithConfig(&Config{}), + WithMeter(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporterv2")), + ) + require.NoError(b, err) for i := 0; i < b.N; i++ { exp.prepareBatch(metrics) } @@ -492,7 +535,13 @@ func Benchmark_prepareBatchExponentialHistogram(b *testing.B) { metrics := 
generateExponentialHistogramMetrics(10000, 10, 10, 10, 10) b.ResetTimer() b.ReportAllocs() - exp := &clickhouseMetricsExporter{enableExpHist: true, logger: zap.NewNop()} + exp, err := NewClickHouseExporter( + WithEnableExpHist(true), + WithLogger(zap.NewNop()), + WithConfig(&Config{}), + WithMeter(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporterv2")), + ) + require.NoError(b, err) for i := 0; i < b.N; i++ { exp.prepareBatch(metrics) } @@ -504,7 +553,12 @@ func Benchmark_prepareBatchSummary(b *testing.B) { metrics := generateSummaryMetrics(10000, 10, 10, 10, 10) b.ResetTimer() b.ReportAllocs() - exp := &clickhouseMetricsExporter{} + exp, err := NewClickHouseExporter( + WithLogger(zap.NewNop()), + WithConfig(&Config{}), + WithMeter(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporterv2")), + ) + require.NoError(b, err) for i := 0; i < b.N; i++ { exp.prepareBatch(metrics) } diff --git a/exporter/clickhousemetricsexporterv2/factory.go b/exporter/clickhousemetricsexporterv2/factory.go index dbc3306c..4b91e617 100644 --- a/exporter/clickhousemetricsexporterv2/factory.go +++ b/exporter/clickhousemetricsexporterv2/factory.go @@ -5,9 +5,6 @@ import ( "errors" "github.com/ClickHouse/clickhouse-go/v2" - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" @@ -19,25 +16,9 @@ const ( typeStr = "clickhousemetricswritev2" ) -var ( - writeLatencyMillis = stats.Int64("exporter_db_write_latency", "Time taken (in millis) for exporter to write batch", "ms") - exporterKey = tag.MustNewKey("exporter") - tableKey = tag.MustNewKey("table") -) - // NewFactory creates a new ClickHouse Metrics exporter. func NewFactory() exporter.Factory { - writeLatencyDistribution := view.Distribution(100, 500, 750, 1000, 1500, 2000, 2500, 3000, 4000, 8000, 16000, 32000, 64000) - writeLatencyView := &view.View{ - Name: "exporter_db_write_latency", - Measure: writeLatencyMillis, - Description: writeLatencyMillis.Description(), - TagKeys: []tag.Key{exporterKey, tableKey}, - Aggregation: writeLatencyDistribution, - } - - view.Register(writeLatencyView) return exporter.NewFactory( component.MustNewType(typeStr), createDefaultConfig, diff --git a/exporter/clickhousetracesexporter/clickhouse_exporter.go b/exporter/clickhousetracesexporter/clickhouse_exporter.go index 73a42dca..8fb35340 100644 --- a/exporter/clickhousetracesexporter/clickhouse_exporter.go +++ b/exporter/clickhousetracesexporter/clickhouse_exporter.go @@ -31,6 +31,7 @@ import ( "github.com/google/uuid" "go.opencensus.io/stats/view" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" conventions "go.opentelemetry.io/collector/semconv/v1.5.0" @@ -43,7 +44,7 @@ const ( ) // Crete new exporter. 
-func newExporter(cfg component.Config, logger *zap.Logger) (*storage, error) { +func newExporter(cfg component.Config, logger *zap.Logger, settings exporter.Settings) (*storage, error) { if err := component.ValidateConfig(cfg); err != nil { return nil, err @@ -53,7 +54,7 @@ func newExporter(cfg component.Config, logger *zap.Logger) (*storage, error) { id := uuid.New() - f := ClickHouseNewFactory(id, *configClickHouse) + f := ClickHouseNewFactory(id, *configClickHouse, settings) err := f.Initialize(logger) if err != nil { diff --git a/exporter/clickhousetracesexporter/clickhouse_factory.go b/exporter/clickhousetracesexporter/clickhouse_factory.go index 2d30a263..4b124c7c 100644 --- a/exporter/clickhousetracesexporter/clickhouse_factory.go +++ b/exporter/clickhousetracesexporter/clickhouse_factory.go @@ -21,15 +21,15 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/SigNoz/signoz-otel-collector/usage" "github.com/google/uuid" - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" + "go.opentelemetry.io/collector/exporter" + metricapi "go.opentelemetry.io/otel/metric" "go.uber.org/zap" ) // Factory implements storage.Factory for Clickhouse backend. type Factory struct { logger *zap.Logger + meter metricapi.Meter Options *Options db clickhouse.Conn archive clickhouse.Conn @@ -45,26 +45,11 @@ type Writer interface { type writerMaker func(WriterOptions) (Writer, error) -var ( - writeLatencyMillis = stats.Int64("exporter_db_write_latency", "Time taken (in millis) for exporter to write batch", "ms") - exporterKey = tag.MustNewKey("exporter") - tableKey = tag.MustNewKey("table") -) - // NewFactory creates a new Factory. -func ClickHouseNewFactory(exporterId uuid.UUID, config Config) *Factory { - writeLatencyDistribution := view.Distribution(100, 250, 500, 750, 1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000, 256000, 512000) - - writeLatencyView := &view.View{ - Name: "exporter_db_write_latency", - Measure: writeLatencyMillis, - Description: writeLatencyMillis.Description(), - TagKeys: []tag.Key{exporterKey, tableKey}, - Aggregation: writeLatencyDistribution, - } +func ClickHouseNewFactory(exporterId uuid.UUID, config Config, settings exporter.Settings) *Factory { - view.Register(writeLatencyView) return &Factory{ + meter: settings.MeterProvider.Meter("github.com/SigNoz/signoz-otel-collector/exporter/clickhousetracesexporter"), Options: NewOptions(exporterId, config, primaryNamespace, config.UseNewSchema, archiveNamespace), // makeReader: func(db *clickhouse.Conn, operationsTable, indexTable, spansTable string) (spanstore.Reader, error) { // return store.NewTraceReader(db, operationsTable, indexTable, spansTable), nil @@ -111,6 +96,7 @@ func (f *Factory) CreateSpanWriter() (Writer, error) { cfg := f.Options.getPrimary() return f.makeWriter(WriterOptions{ logger: f.logger, + meter: f.meter, db: f.db, traceDatabase: cfg.TraceDatabase, spansTable: cfg.SpansTable, diff --git a/exporter/clickhousetracesexporter/factory.go b/exporter/clickhousetracesexporter/factory.go index 14d9543e..0bfc15e7 100644 --- a/exporter/clickhousetracesexporter/factory.go +++ b/exporter/clickhousetracesexporter/factory.go @@ -59,7 +59,7 @@ func createTracesExporter( ) (exporter.Traces, error) { c := cfg.(*Config) - oce, err := newExporter(cfg, params.Logger) + oce, err := newExporter(cfg, params.Logger, params) if err != nil { return nil, err } diff --git a/exporter/clickhousetracesexporter/writer.go b/exporter/clickhousetracesexporter/writer.go index a545c73f..ca201fce 100644 
--- a/exporter/clickhousetracesexporter/writer.go +++ b/exporter/clickhousetracesexporter/writer.go @@ -32,6 +32,9 @@ import ( "go.opencensus.io/stats/view" "go.opencensus.io/tag" "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + metricapi "go.opentelemetry.io/otel/metric" "go.uber.org/zap" ) @@ -69,6 +72,7 @@ type SpanWriter struct { attributeKeyTable string encoding Encoding exporterId uuid.UUID + durationHistogram metricapi.Float64Histogram indexTableV3 string resourceTableV3 string @@ -86,6 +90,7 @@ type SpanWriter struct { type WriterOptions struct { logger *zap.Logger + meter metricapi.Meter db clickhouse.Conn traceDatabase string spansTable string @@ -110,6 +115,15 @@ func NewSpanWriter(options WriterOptions) *SpanWriter { return nil } + durationHistogram, err := options.meter.Float64Histogram( + "exporter_db_write_latency", + metric.WithDescription("Time taken to write data to ClickHouse"), + metric.WithUnit("ms"), + metric.WithExplicitBucketBoundaries(250, 500, 750, 1000, 2000, 2500, 3000, 4000, 5000, 6000, 8000, 10000, 15000, 25000, 30000), + ) + if err != nil { + return nil + } // keys cache is used to avoid duplicate inserts for the same attribute key. keysCache := ttlcache.New[string, struct{}]( ttlcache.WithTTL[string, struct{}](240*time.Minute), @@ -139,12 +153,12 @@ func NewSpanWriter(options WriterOptions) *SpanWriter { attributeKeyTable: options.attributeKeyTable, encoding: options.encoding, exporterId: options.exporterId, - - indexTableV3: options.indexTableV3, - resourceTableV3: options.resourceTableV3, - useNewSchema: options.useNewSchema, - keysCache: keysCache, - rfCache: rfCache, + durationHistogram: durationHistogram, + indexTableV3: options.indexTableV3, + resourceTableV3: options.resourceTableV3, + useNewSchema: options.useNewSchema, + keysCache: keysCache, + rfCache: rfCache, maxDistinctValues: options.maxDistinctValues, fetchKeysInterval: options.fetchKeysInterval, @@ -255,11 +269,14 @@ func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) er err = statement.Send() - ctx, _ = tag.New(ctx, - tag.Upsert(exporterKey, pipeline.SignalTraces.String()), - tag.Upsert(tableKey, w.indexTable), + w.durationHistogram.Record( + ctx, + float64(time.Since(start).Milliseconds()), + metricapi.WithAttributes( + attribute.String("table", w.indexTable), + attribute.String("exporter", pipeline.SignalTraces.String()), + ), ) - stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds()))) return err } @@ -310,11 +327,14 @@ func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) er start := time.Now() err = statement.Send() - ctx, _ = tag.New(ctx, - tag.Upsert(exporterKey, pipeline.SignalTraces.String()), - tag.Upsert(tableKey, w.spansTable), + w.durationHistogram.Record( + ctx, + float64(time.Since(start).Milliseconds()), + metricapi.WithAttributes( + attribute.String("table", w.spansTable), + attribute.String("exporter", pipeline.SignalTraces.String()), + ), ) - stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds()))) if err != nil { return fmt.Errorf("could not send batch to model table: %w", err) } diff --git a/exporter/clickhousetracesexporter/writerV3.go b/exporter/clickhousetracesexporter/writerV3.go index b85a31a4..7119b2cb 100644 --- a/exporter/clickhousetracesexporter/writerV3.go +++ b/exporter/clickhousetracesexporter/writerV3.go @@ -15,6 +15,8 @@ import ( "go.opencensus.io/tag" "go.opentelemetry.io/collector/pipeline" 
semconv "go.opentelemetry.io/collector/semconv/v1.13.0" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" ) @@ -76,11 +78,14 @@ func (w *SpanWriter) writeIndexBatchV3(ctx context.Context, batchSpans []*SpanV3 err = statement.Send() - ctx, _ = tag.New(ctx, - tag.Upsert(exporterKey, pipeline.SignalTraces.String()), - tag.Upsert(tableKey, w.indexTableV3), + w.durationHistogram.Record( + ctx, + float64(time.Since(start).Milliseconds()), + metric.WithAttributes( + attribute.String("exporter", pipeline.SignalTraces.String()), + attribute.String("table", w.indexTableV3), + ), ) - stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds()))) return err } @@ -126,11 +131,14 @@ func (w *SpanWriter) writeErrorBatchV3(ctx context.Context, batchSpans []*SpanV3 err = statement.Send() - ctx, _ = tag.New(ctx, - tag.Upsert(exporterKey, pipeline.SignalTraces.String()), - tag.Upsert(tableKey, w.errorTable), + w.durationHistogram.Record( + ctx, + float64(time.Since(start).Milliseconds()), + metric.WithAttributes( + attribute.String("exporter", pipeline.SignalTraces.String()), + attribute.String("table", w.errorTable), + ), ) - stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds()))) return err } @@ -250,12 +258,13 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) tagStart := time.Now() err = tagStatementV2.Send() - stats.RecordWithTags(ctx, - []tag.Mutator{ - tag.Upsert(exporterKey, pipeline.SignalTraces.String()), - tag.Upsert(tableKey, w.attributeTable), - }, - writeLatencyMillis.M(int64(time.Since(tagStart).Milliseconds())), + w.durationHistogram.Record( + ctx, + float64(time.Since(tagStart).Milliseconds()), + metric.WithAttributes( + attribute.String("exporter", pipeline.SignalTraces.String()), + attribute.String("table", w.attributeTable), + ), ) if err != nil { return fmt.Errorf("could not write to span attributes table due to error: %w", err) @@ -263,12 +272,13 @@ func (w *SpanWriter) writeTagBatchV3(ctx context.Context, batchSpans []*SpanV3) tagKeyStart := time.Now() err = tagKeyStatement.Send() - stats.RecordWithTags(ctx, - []tag.Mutator{ - tag.Upsert(exporterKey, pipeline.SignalTraces.String()), - tag.Upsert(tableKey, w.attributeKeyTable), - }, - writeLatencyMillis.M(int64(time.Since(tagKeyStart).Milliseconds())), + w.durationHistogram.Record( + ctx, + float64(time.Since(tagKeyStart).Milliseconds()), + metric.WithAttributes( + attribute.String("exporter", pipeline.SignalTraces.String()), + attribute.String("table", w.attributeKeyTable), + ), ) if err != nil { return fmt.Errorf("could not write to span attributes key table due to error: %w", err) @@ -371,11 +381,14 @@ func (w *SpanWriter) WriteResourcesV3(ctx context.Context, resourcesSeen map[int return fmt.Errorf("couldn't send resource fingerprints :%w", err) } - ctx, _ = tag.New(ctx, - tag.Upsert(exporterKey, pipeline.SignalTraces.String()), - tag.Upsert(tableKey, w.resourceTableV3), + w.durationHistogram.Record( + ctx, + float64(time.Since(start).Milliseconds()), + metric.WithAttributes( + attribute.String("exporter", pipeline.SignalTraces.String()), + attribute.String("table", w.resourceTableV3), + ), ) - stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds()))) return nil } diff --git a/go.mod b/go.mod index 3c99abfe..0b70c8c7 100644 --- a/go.mod +++ b/go.mod @@ -189,7 +189,6 @@ require ( github.com/prometheus/common v0.60.0 github.com/prometheus/prometheus v0.54.1 github.com/segmentio/ksuid v1.0.4 - 
github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.19.0 @@ -533,6 +532,7 @@ require ( github.com/signalfx/com_signalfx_metrics_protobuf v0.0.3 // indirect github.com/signalfx/sapm-proto v0.14.0 // indirect github.com/sijms/go-ora/v2 v2.8.19 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect github.com/snowflakedb/gosnowflake v1.11.1 // indirect github.com/soheilhy/cmux v0.1.5 // indirect github.com/sourcegraph/conc v0.3.0 // indirect @@ -695,9 +695,9 @@ require ( go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 // indirect go.opentelemetry.io/contrib/propagators/b3 v1.30.0 // indirect go.opentelemetry.io/contrib/zpages v0.55.0 // indirect - go.opentelemetry.io/otel v1.30.0 // indirect + go.opentelemetry.io/otel v1.30.0 go.opentelemetry.io/otel/exporters/prometheus v0.52.0 // indirect - go.opentelemetry.io/otel/metric v1.30.0 // indirect + go.opentelemetry.io/otel/metric v1.30.0 go.opentelemetry.io/otel/sdk v1.30.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect golang.org/x/crypto v0.31.0 // indirect @@ -714,7 +714,6 @@ require ( replace ( github.com/ClickHouse/ch-go v0.58.2 => github.com/SigNoz/ch-go v0.61.2-dd github.com/ClickHouse/clickhouse-go/v2 v2.15.0 => github.com/SigNoz/clickhouse-go/v2 v2.15.2 - github.com/golang-migrate/migrate/v4 => github.com/SigNoz/golang-migrate/v4 v4.16.4 github.com/vjeantet/grok => github.com/signoz/grok v1.0.3 // using 0.23.0 as there is an issue with 0.24.0 stats that results in diff --git a/receiver/signozkafkareceiver/factory.go b/receiver/signozkafkareceiver/factory.go index 9f8f483b..11db5bbe 100644 --- a/receiver/signozkafkareceiver/factory.go +++ b/receiver/signozkafkareceiver/factory.go @@ -7,7 +7,6 @@ import ( "context" "time" - "go.opencensus.io/stats/view" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" @@ -69,7 +68,6 @@ func WithLogsUnmarshalers(logsUnmarshalers ...LogsUnmarshaler) FactoryOption { // NewFactory creates Kafka receiver factory. func NewFactory(options ...FactoryOption) receiver.Factory { - _ = view.Register(MetricViews()...) 
f := &kafkaReceiverFactory{ tracesUnmarshalers: defaultTracesUnmarshalers(), diff --git a/receiver/signozkafkareceiver/kafka_receiver.go b/receiver/signozkafkareceiver/kafka_receiver.go index 66f250d9..9e061af8 100644 --- a/receiver/signozkafkareceiver/kafka_receiver.go +++ b/receiver/signozkafkareceiver/kafka_receiver.go @@ -12,13 +12,13 @@ import ( "time" "github.com/IBM/sarama" - "go.opencensus.io/stats" - "go.opencensus.io/tag" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "github.com/SigNoz/signoz-otel-collector/internal/kafka" @@ -139,6 +139,11 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro if err != nil { return err } + metrics, err := NewKafkaReceiverMetrics(c.settings.MeterProvider.Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + if err != nil { + return err + } + consumerGroup := &tracesConsumerGroupHandler{ id: c.settings.ID, logger: c.settings.Logger, @@ -154,6 +159,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro retryInterval: 1 * time.Second, pausePartition: make(chan struct{}), resumePartition: make(chan struct{}), + metrics: metrics, }, } go func() { @@ -245,6 +251,12 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err if err != nil { return err } + + metrics, err := NewKafkaReceiverMetrics(c.settings.MeterProvider.Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + if err != nil { + return err + } + metricsConsumerGroup := &metricsConsumerGroupHandler{ id: c.settings.ID, logger: c.settings.Logger, @@ -260,6 +272,7 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err retryInterval: 1 * time.Second, pausePartition: make(chan struct{}), resumePartition: make(chan struct{}), + metrics: metrics, }, } go func() { @@ -379,6 +392,11 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error return err } + metrics, err := NewKafkaReceiverMetrics(c.settings.MeterProvider.Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + if err != nil { + return err + } + logsConsumerGroup := &logsConsumerGroupHandler{ id: c.settings.ID, logger: c.settings.Logger, @@ -394,6 +412,7 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error retryInterval: 1 * time.Second, pausePartition: make(chan struct{}), resumePartition: make(chan struct{}), + metrics: metrics, }, } go func() { @@ -433,6 +452,8 @@ type baseConsumerGroupHandler struct { retryInterval time.Duration pausePartition chan struct{} resumePartition chan struct{} + + metrics *KafkaReceiverMetrics } // wrap is now a method of BaseConsumerGroupHandler @@ -529,14 +550,20 @@ func (c *tracesConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) c.readyCloser.Do(func() { close(c.ready) }) - statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.Name())} - _ = stats.RecordWithTags(session.Context(), statsTags, statPartitionStart.M(1)) + c.metrics.partitionStart.Add( + session.Context(), + 1, + metric.WithAttributes(attribute.String("name", c.id.String())), + ) return nil } func (c *tracesConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error { - statsTags := 
[]tag.Mutator{tag.Upsert(tagInstanceName, c.id.Name())} - _ = stats.RecordWithTags(session.Context(), statsTags, statPartitionClose.M(1)) + c.metrics.partitionClose.Add( + session.Context(), + 1, + metric.WithAttributes(attribute.String("name", c.id.String())), + ) return nil } @@ -564,11 +591,21 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe } ctx := c.obsrecv.StartTracesOp(session.Context()) - statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())} - _ = stats.RecordWithTags(ctx, statsTags, - statMessageCount.M(1), - statMessageOffset.M(message.Offset), - statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1)) + c.metrics.messagesCount.Add( + session.Context(), + 1, + metric.WithAttributes(attribute.String("name", c.id.String())), + ) + c.metrics.messageOffset.Record( + session.Context(), + int64(message.Offset), + metric.WithAttributes(attribute.String("name", c.id.String())), + ) + c.metrics.messageOffsetLag.Record( + session.Context(), + int64(claim.HighWaterMarkOffset()-message.Offset-1), + metric.WithAttributes(attribute.String("name", c.id.String())), + ) traces, err := c.unmarshaler.Unmarshal(message.Value) if err != nil { @@ -597,7 +634,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe if !c.autocommitEnabled { session.Commit() } - err = stats.RecordWithTags(ctx, statsTags, processingTime.M(time.Since(start).Milliseconds())) + c.metrics.processingTime.Record(session.Context(), float64(time.Since(start).Milliseconds())) if err != nil { c.logger.Error("failed to record processing time", zap.Error(err)) } @@ -615,14 +652,20 @@ func (c *metricsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) c.readyCloser.Do(func() { close(c.ready) }) - statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.Name())} - _ = stats.RecordWithTags(session.Context(), statsTags, statPartitionStart.M(1)) + c.metrics.partitionStart.Add( + session.Context(), + 1, + metric.WithAttributes(attribute.String("name", c.id.String())), + ) return nil } func (c *metricsConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error { - statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.Name())} - _ = stats.RecordWithTags(session.Context(), statsTags, statPartitionClose.M(1)) + c.metrics.partitionClose.Add( + session.Context(), + 1, + metric.WithAttributes(attribute.String("name", c.id.String())), + ) return nil } @@ -647,11 +690,21 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS } ctx := c.obsrecv.StartMetricsOp(session.Context()) - statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())} - _ = stats.RecordWithTags(ctx, statsTags, - statMessageCount.M(1), - statMessageOffset.M(message.Offset), - statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1)) + c.metrics.messagesCount.Add( + session.Context(), + 1, + metric.WithAttributes(attribute.String("name", c.id.String())), + ) + c.metrics.messageOffset.Record( + session.Context(), + int64(message.Offset), + metric.WithAttributes(attribute.String("name", c.id.String())), + ) + c.metrics.messageOffsetLag.Record( + session.Context(), + int64(claim.HighWaterMarkOffset()-message.Offset-1), + metric.WithAttributes(attribute.String("name", c.id.String())), + ) metrics, err := c.unmarshaler.Unmarshal(message.Value) if err != nil { @@ -678,7 +731,11 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS if !c.autocommitEnabled { session.Commit() } - 
err = stats.RecordWithTags(ctx, statsTags, processingTime.M(time.Since(start).Milliseconds())) + c.metrics.processingTime.Record( + session.Context(), + float64(time.Since(start).Milliseconds()), + metric.WithAttributes(attribute.String("name", c.id.String())), + ) if err != nil { c.logger.Error("failed to record processing time", zap.Error(err)) } @@ -696,18 +753,20 @@ func (c *logsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) er c.readyCloser.Do(func() { close(c.ready) }) - _ = stats.RecordWithTags( + c.metrics.partitionStart.Add( session.Context(), - []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}, - statPartitionStart.M(1)) + 1, + metric.WithAttributes(attribute.String("name", c.id.String())), + ) return nil } func (c *logsConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error { - _ = stats.RecordWithTags( + c.metrics.partitionClose.Add( session.Context(), - []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}, - statPartitionClose.M(1)) + 1, + metric.WithAttributes(attribute.String("name", c.id.String())), + ) return nil } @@ -735,13 +794,21 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess } ctx := c.obsrecv.StartLogsOp(session.Context()) - statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())} - _ = stats.RecordWithTags( - ctx, - statsTags, - statMessageCount.M(1), - statMessageOffset.M(message.Offset), - statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1)) + c.metrics.messagesCount.Add( + session.Context(), + 1, + metric.WithAttributes(attribute.String("name", c.id.String())), + ) + c.metrics.messageOffset.Record( + session.Context(), + int64(message.Offset), + metric.WithAttributes(attribute.String("name", c.id.String())), + ) + c.metrics.messageOffsetLag.Record( + session.Context(), + int64(claim.HighWaterMarkOffset()-message.Offset-1), + metric.WithAttributes(attribute.String("name", c.id.String())), + ) logs, err := c.unmarshaler.Unmarshal(message.Value) if err != nil { @@ -768,7 +835,11 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess if !c.autocommitEnabled { session.Commit() } - err = stats.RecordWithTags(ctx, statsTags, processingTime.M(time.Since(start).Milliseconds())) + c.metrics.processingTime.Record( + session.Context(), + float64(time.Since(start).Milliseconds()), + metric.WithAttributes(attribute.String("name", c.id.String())), + ) if err != nil { c.logger.Error("failed to record processing time", zap.Error(err)) } diff --git a/receiver/signozkafkareceiver/kafka_receiver_test.go b/receiver/signozkafkareceiver/kafka_receiver_test.go index 1ec2dce1..fd3e6e9b 100644 --- a/receiver/signozkafkareceiver/kafka_receiver_test.go +++ b/receiver/signozkafkareceiver/kafka_receiver_test.go @@ -14,7 +14,6 @@ import ( "github.com/IBM/sarama" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opencensus.io/stats/view" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer/consumertest" @@ -23,6 +22,7 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/metric/noop" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" @@ -105,8 +105,13 @@ func TestTracesReceiverStartConsume(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) 
c.cancelConsumeLoop = cancelFunc require.NoError(t, c.Shutdown(context.Background())) - err := c.consumeLoop(ctx, &tracesConsumerGroupHandler{ + metrics, err := NewKafkaReceiverMetrics(c.settings.MeterProvider.Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) + err = c.consumeLoop(ctx, &tracesConsumerGroupHandler{ ready: make(chan bool), + baseConsumerGroupHandler: baseConsumerGroupHandler{ + metrics: metrics, + }, }) assert.EqualError(t, err, context.Canceled.Error()) } @@ -132,37 +137,28 @@ func TestTracesReceiver_error(t *testing.T) { } func TestTracesConsumerGroupHandler(t *testing.T) { - view.Unregister(MetricViews()...) - views := MetricViews() - require.NoError(t, view.Register(views...)) - defer view.Unregister(views...) obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) + metrics, err := NewKafkaReceiverMetrics(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) c := tracesConsumerGroupHandler{ unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewNop(), obsrecv: obsrecv, + baseConsumerGroupHandler: baseConsumerGroupHandler{ + metrics: metrics, + }, } testSession := testConsumerGroupSession{ctx: context.Background()} require.NoError(t, c.Setup(testSession)) _, ok := <-c.ready assert.False(t, ok) - viewData, err := view.RetrieveData(statPartitionStart.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData := viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) require.NoError(t, c.Cleanup(testSession)) - viewData, err = view.RetrieveData(statPartitionClose.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData = viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) groupClaim := testConsumerGroupClaim{ messageChan: make(chan *sarama.ConsumerMessage), @@ -185,6 +181,8 @@ func TestTracesConsumerGroupHandlerWithMemoryLimiter(t *testing.T) { require.NoError(t, err) group := &testConsumerGroup{} + metrics, err := NewKafkaReceiverMetrics(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) c := tracesConsumerGroupHandler{ unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), @@ -197,6 +195,7 @@ func TestTracesConsumerGroupHandlerWithMemoryLimiter(t *testing.T) { retryInterval: 1 * time.Second, pausePartition: make(chan struct{}), resumePartition: make(chan struct{}), + metrics: metrics, }, } @@ -225,19 +224,20 @@ func TestTracesConsumerGroupHandlerWithMemoryLimiter(t *testing.T) { } func TestTracesConsumerGroupHandler_session_done(t *testing.T) { - view.Unregister(MetricViews()...) - views := MetricViews() - require.NoError(t, view.Register(views...)) - defer view.Unregister(views...) 
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) + metrics, err := NewKafkaReceiverMetrics(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) c := tracesConsumerGroupHandler{ unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewNop(), obsrecv: obsrecv, + baseConsumerGroupHandler: baseConsumerGroupHandler{ + metrics: metrics, + }, } ctx, cancelFunc := context.WithCancel(context.Background()) @@ -245,18 +245,8 @@ func TestTracesConsumerGroupHandler_session_done(t *testing.T) { require.NoError(t, c.Setup(testSession)) _, ok := <-c.ready assert.False(t, ok) - viewData, err := view.RetrieveData(statPartitionStart.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData := viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) require.NoError(t, c.Cleanup(testSession)) - viewData, err = view.RetrieveData(statPartitionClose.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData = viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) groupClaim := testConsumerGroupClaim{ messageChan: make(chan *sarama.ConsumerMessage), @@ -278,12 +268,17 @@ func TestTracesConsumerGroupHandler_session_done(t *testing.T) { func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) + metrics, err := NewKafkaReceiverMetrics(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) c := tracesConsumerGroupHandler{ unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewNop(), obsrecv: obsrecv, + baseConsumerGroupHandler: baseConsumerGroupHandler{ + metrics: metrics, + }, } wg := sync.WaitGroup{} @@ -305,12 +300,17 @@ func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) { consumerError := errors.New("failed to consume") obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) + metrics, err := NewKafkaReceiverMetrics(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) c := tracesConsumerGroupHandler{ unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewErr(consumerError), obsrecv: obsrecv, + baseConsumerGroupHandler: baseConsumerGroupHandler{ + metrics: metrics, + }, } wg := sync.WaitGroup{} @@ -406,8 +406,13 @@ func TestMetricsReceiverStartConsume(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) c.cancelConsumeLoop = cancelFunc require.NoError(t, c.Shutdown(context.Background())) - err := c.consumeLoop(ctx, &logsConsumerGroupHandler{ + metrics, err := NewKafkaReceiverMetrics(c.settings.MeterProvider.Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) + err = c.consumeLoop(ctx, &metricsConsumerGroupHandler{ ready: make(chan 
bool), + baseConsumerGroupHandler: baseConsumerGroupHandler{ + metrics: metrics, + }, }) assert.EqualError(t, err, context.Canceled.Error()) } @@ -433,37 +438,28 @@ func TestMetricsReceiver_error(t *testing.T) { } func TestMetricsConsumerGroupHandler(t *testing.T) { - view.Unregister(MetricViews()...) - views := MetricViews() - require.NoError(t, view.Register(views...)) - defer view.Unregister(views...) obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) + metrics, err := NewKafkaReceiverMetrics(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) c := metricsConsumerGroupHandler{ unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewNop(), obsrecv: obsrecv, + baseConsumerGroupHandler: baseConsumerGroupHandler{ + metrics: metrics, + }, } testSession := testConsumerGroupSession{ctx: context.Background()} require.NoError(t, c.Setup(testSession)) _, ok := <-c.ready assert.False(t, ok) - viewData, err := view.RetrieveData(statPartitionStart.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData := viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) require.NoError(t, c.Cleanup(testSession)) - viewData, err = view.RetrieveData(statPartitionClose.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData = viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) groupClaim := testConsumerGroupClaim{ messageChan: make(chan *sarama.ConsumerMessage), @@ -486,6 +482,8 @@ func TestMetricsConsumerGroupHandlerWithMemoryLimiter(t *testing.T) { require.NoError(t, err) group := &testConsumerGroup{} + metrics, err := NewKafkaReceiverMetrics(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) c := metricsConsumerGroupHandler{ unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), @@ -498,6 +496,7 @@ func TestMetricsConsumerGroupHandlerWithMemoryLimiter(t *testing.T) { retryInterval: 1 * time.Second, pausePartition: make(chan struct{}), resumePartition: make(chan struct{}), + metrics: metrics, }, } @@ -526,19 +525,20 @@ func TestMetricsConsumerGroupHandlerWithMemoryLimiter(t *testing.T) { } func TestMetricsConsumerGroupHandler_session_done(t *testing.T) { - view.Unregister(MetricViews()...) - views := MetricViews() - require.NoError(t, view.Register(views...)) - defer view.Unregister(views...) 
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) + metrics, err := NewKafkaReceiverMetrics(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) c := metricsConsumerGroupHandler{ unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewNop(), obsrecv: obsrecv, + baseConsumerGroupHandler: baseConsumerGroupHandler{ + metrics: metrics, + }, } ctx, cancelFunc := context.WithCancel(context.Background()) @@ -546,18 +546,8 @@ func TestMetricsConsumerGroupHandler_session_done(t *testing.T) { require.NoError(t, c.Setup(testSession)) _, ok := <-c.ready assert.False(t, ok) - viewData, err := view.RetrieveData(statPartitionStart.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData := viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) require.NoError(t, c.Cleanup(testSession)) - viewData, err = view.RetrieveData(statPartitionClose.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData = viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) groupClaim := testConsumerGroupClaim{ messageChan: make(chan *sarama.ConsumerMessage), @@ -578,12 +568,17 @@ func TestMetricsConsumerGroupHandler_session_done(t *testing.T) { func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) + metrics, err := NewKafkaReceiverMetrics(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) c := metricsConsumerGroupHandler{ unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewNop(), obsrecv: obsrecv, + baseConsumerGroupHandler: baseConsumerGroupHandler{ + metrics: metrics, + }, } wg := sync.WaitGroup{} @@ -605,12 +600,17 @@ func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) { consumerError := errors.New("failed to consume") obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) + metrics, err := NewKafkaReceiverMetrics(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) c := metricsConsumerGroupHandler{ unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewErr(consumerError), obsrecv: obsrecv, + baseConsumerGroupHandler: baseConsumerGroupHandler{ + metrics: metrics, + }, } wg := sync.WaitGroup{} @@ -705,8 +705,13 @@ func TestLogsReceiverStartConsume(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) c.cancelConsumeLoop = cancelFunc require.NoError(t, c.Shutdown(context.Background())) - err := c.consumeLoop(ctx, &logsConsumerGroupHandler{ + metrics, err := NewKafkaReceiverMetrics(c.settings.MeterProvider.Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) + err = c.consumeLoop(ctx, &logsConsumerGroupHandler{ ready: 
make(chan bool), + baseConsumerGroupHandler: baseConsumerGroupHandler{ + metrics: metrics, + }, }) assert.EqualError(t, err, context.Canceled.Error()) } @@ -732,37 +737,28 @@ func TestLogsReceiver_error(t *testing.T) { } func TestLogsConsumerGroupHandler(t *testing.T) { - view.Unregister(MetricViews()...) - views := MetricViews() - require.NoError(t, view.Register(views...)) - defer view.Unregister(views...) obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) + metrics, err := NewKafkaReceiverMetrics(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) c := logsConsumerGroupHandler{ unmarshaler: NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewNop(), obsrecv: obsrecv, + baseConsumerGroupHandler: baseConsumerGroupHandler{ + metrics: metrics, + }, } testSession := testConsumerGroupSession{ctx: context.Background()} require.NoError(t, c.Setup(testSession)) _, ok := <-c.ready assert.False(t, ok) - viewData, err := view.RetrieveData(statPartitionStart.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData := viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) require.NoError(t, c.Cleanup(testSession)) - viewData, err = view.RetrieveData(statPartitionClose.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData = viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) groupClaim := testConsumerGroupClaim{ messageChan: make(chan *sarama.ConsumerMessage), @@ -785,6 +781,8 @@ func TestLogsConsumerGroupHandlerWithMemoryLimiter(t *testing.T) { require.NoError(t, err) group := &testConsumerGroup{} + metrics, err := NewKafkaReceiverMetrics(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) c := logsConsumerGroupHandler{ unmarshaler: NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), @@ -797,6 +795,7 @@ func TestLogsConsumerGroupHandlerWithMemoryLimiter(t *testing.T) { retryInterval: 1 * time.Second, pausePartition: make(chan struct{}), resumePartition: make(chan struct{}), + metrics: metrics, }, } @@ -825,19 +824,20 @@ func TestLogsConsumerGroupHandlerWithMemoryLimiter(t *testing.T) { } func TestLogsConsumerGroupHandler_session_done(t *testing.T) { - view.Unregister(MetricViews()...) - views := MetricViews() - require.NoError(t, view.Register(views...)) - defer view.Unregister(views...) 
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) + metrics, err := NewKafkaReceiverMetrics(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) c := logsConsumerGroupHandler{ unmarshaler: NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewNop(), obsrecv: obsrecv, + baseConsumerGroupHandler: baseConsumerGroupHandler{ + metrics: metrics, + }, } ctx, cancelFunc := context.WithCancel(context.Background()) @@ -845,18 +845,8 @@ func TestLogsConsumerGroupHandler_session_done(t *testing.T) { require.NoError(t, c.Setup(testSession)) _, ok := <-c.ready assert.False(t, ok) - viewData, err := view.RetrieveData(statPartitionStart.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData := viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) require.NoError(t, c.Cleanup(testSession)) - viewData, err = view.RetrieveData(statPartitionClose.Name()) - require.NoError(t, err) - assert.Equal(t, 1, len(viewData)) - distData = viewData[0].Data.(*view.SumData) - assert.Equal(t, float64(1), distData.Value) groupClaim := testConsumerGroupClaim{ messageChan: make(chan *sarama.ConsumerMessage), @@ -877,12 +867,17 @@ func TestLogsConsumerGroupHandler_session_done(t *testing.T) { func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) + metrics, err := NewKafkaReceiverMetrics(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) c := logsConsumerGroupHandler{ unmarshaler: NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewNop(), obsrecv: obsrecv, + baseConsumerGroupHandler: baseConsumerGroupHandler{ + metrics: metrics, + }, } wg := sync.WaitGroup{} @@ -904,12 +899,17 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) { consumerError := errors.New("failed to consume") obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) + metrics, err := NewKafkaReceiverMetrics(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) c := logsConsumerGroupHandler{ unmarshaler: NewPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), logger: zap.NewNop(), ready: make(chan bool), nextConsumer: consumertest.NewErr(consumerError), obsrecv: obsrecv, + baseConsumerGroupHandler: baseConsumerGroupHandler{ + metrics: metrics, + }, } wg := sync.WaitGroup{} @@ -973,6 +973,8 @@ func TestLogsConsumerGroupHandler_unmarshal_text(t *testing.T) { unmarshaler := newTextLogsUnmarshaler() unmarshaler, err = unmarshaler.WithEnc(test.enc) require.NoError(t, err) + metrics, err := NewKafkaReceiverMetrics(noop.NewMeterProvider().Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")) + require.NoError(t, err) sink := &consumertest.LogsSink{} c := logsConsumerGroupHandler{ unmarshaler: unmarshaler, @@ -980,6 +982,9 @@ func TestLogsConsumerGroupHandler_unmarshal_text(t *testing.T) { 
ready: make(chan bool), nextConsumer: sink, obsrecv: obsrecv, + baseConsumerGroupHandler: baseConsumerGroupHandler{ + metrics: metrics, + }, } wg := sync.WaitGroup{} diff --git a/receiver/signozkafkareceiver/metrics.go b/receiver/signozkafkareceiver/metrics.go index 62e38c83..7856cdb6 100644 --- a/receiver/signozkafkareceiver/metrics.go +++ b/receiver/signozkafkareceiver/metrics.go @@ -4,82 +4,73 @@ package signozkafkareceiver // import "github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver" import ( - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" + "go.opentelemetry.io/otel/metric" ) -var ( - tagInstanceName, _ = tag.NewKey("name") +type KafkaReceiverMetrics struct { + messagesCount metric.Int64Counter + messageOffset metric.Int64Gauge + messageOffsetLag metric.Int64Gauge - statMessageCount = stats.Int64("kafka_receiver_messages", "Number of received messages", stats.UnitDimensionless) - statMessageOffset = stats.Int64("kafka_receiver_current_offset", "Current message offset", stats.UnitDimensionless) - statMessageOffsetLag = stats.Int64("kafka_receiver_offset_lag", "Current offset lag", stats.UnitDimensionless) + partitionStart metric.Int64Counter + partitionClose metric.Int64Counter - statPartitionStart = stats.Int64("kafka_receiver_partition_start", "Number of started partitions", stats.UnitDimensionless) - statPartitionClose = stats.Int64("kafka_receiver_partition_close", "Number of finished partitions", stats.UnitDimensionless) - - processingTime = stats.Int64("kafka_receiver_processing_time_milliseconds", "Time taken to process a kafka message in ms", stats.UnitMilliseconds) -) - -// MetricViews return metric views for Kafka receiver. -func MetricViews() []*view.View { - tagKeys := []tag.Key{tagInstanceName} + processingTime metric.Float64Histogram +} - countMessages := &view.View{ - Name: statMessageCount.Name(), - Measure: statMessageCount, - Description: statMessageCount.Description(), - TagKeys: tagKeys, - Aggregation: view.Sum(), +func NewKafkaReceiverMetrics(meter metric.Meter) (*KafkaReceiverMetrics, error) { + messagesCount, err := meter.Int64Counter( + "kafka_receiver_messages", + metric.WithDescription("Number of received messages"), + ) + if err != nil { + return nil, err } - - lastValueOffset := &view.View{ - Name: statMessageOffset.Name(), - Measure: statMessageOffset, - Description: statMessageOffset.Description(), - TagKeys: tagKeys, - Aggregation: view.LastValue(), + messageOffset, err := meter.Int64Gauge( + "kafka_receiver_current_offset", + metric.WithDescription("Current message offset"), + ) + if err != nil { + return nil, err } - - lastValueOffsetLag := &view.View{ - Name: statMessageOffsetLag.Name(), - Measure: statMessageOffsetLag, - Description: statMessageOffsetLag.Description(), - TagKeys: tagKeys, - Aggregation: view.LastValue(), + messageOffsetLag, err := meter.Int64Gauge( + "kafka_receiver_offset_lag", + metric.WithDescription("Current offset lag"), + ) + if err != nil { + return nil, err } - - countPartitionStart := &view.View{ - Name: statPartitionStart.Name(), - Measure: statPartitionStart, - Description: statPartitionStart.Description(), - TagKeys: tagKeys, - Aggregation: view.Sum(), + partitionStart, err := meter.Int64Counter( + "kafka_receiver_partition_start", + metric.WithDescription("Number of started partitions"), + ) + if err != nil { + return nil, err } - - countPartitionClose := &view.View{ - Name: statPartitionClose.Name(), - Measure: statPartitionClose, - Description: 
statPartitionClose.Description(), - TagKeys: tagKeys, - Aggregation: view.Sum(), + partitionClose, err := meter.Int64Counter( + "kafka_receiver_partition_close", + metric.WithDescription("Number of finished partitions"), + ) + if err != nil { + return nil, err } - processingTimeView := &view.View{ - Name: processingTime.Name(), - Measure: processingTime, - Description: processingTime.Description(), - TagKeys: tagKeys, - Aggregation: view.Distribution(100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500, 1750, 2000, 4000, 8000, 16000, 32000, 64000, 128000, 256000, 512000), + processingTime, err := meter.Float64Histogram( + "kafka_receiver_processing_time_milliseconds", + metric.WithDescription("Time taken to process a kafka message in ms"), + metric.WithUnit("ms"), + metric.WithExplicitBucketBoundaries(250, 500, 750, 1000, 2000, 2500, 3000, 4000, 5000, 6000, 8000, 10000, 15000, 25000, 30000), + ) + if err != nil { + return nil, err } - return []*view.View{ - countMessages, - lastValueOffset, - lastValueOffsetLag, - countPartitionStart, - countPartitionClose, - processingTimeView, - } + return &KafkaReceiverMetrics{ + messagesCount: messagesCount, + messageOffset: messageOffset, + messageOffsetLag: messageOffsetLag, + partitionStart: partitionStart, + partitionClose: partitionClose, + processingTime: processingTime, + }, nil } diff --git a/receiver/signozkafkareceiver/metrics_test.go b/receiver/signozkafkareceiver/metrics_test.go deleted file mode 100644 index 7a7978e8..00000000 --- a/receiver/signozkafkareceiver/metrics_test.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package signozkafkareceiver - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestMetrics(t *testing.T) { - metricViews := MetricViews() - viewNames := []string{ - "kafka_receiver_messages", - "kafka_receiver_current_offset", - "kafka_receiver_offset_lag", - "kafka_receiver_partition_start", - "kafka_receiver_partition_close", - } - for i, viewName := range viewNames { - assert.Equal(t, viewName, metricViews[i].Name) - } -}
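The hunks above thread a KafkaReceiverMetrics value into every consumer group handler, but the recording call sites themselves are outside this diff. As a rough sketch of how the removed OpenCensus stats.RecordWithTags calls could map onto the new instruments, assuming the old per-instance "name" dimension (the removed tagInstanceName key) is kept as an attribute; helper names and call sites here are illustrative, not part of this patch:

// Sketch only, not part of this patch: how the removed OpenCensus call sites
// could record through the new instruments. The "name" attribute and the
// helper names are assumptions; the real call sites live in handler code not
// shown in this diff.
package signozkafkareceiver

import (
	"context"
	"time"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

func recordPartitionStart(ctx context.Context, m *KafkaReceiverMetrics, instanceName string) {
	// was: stats.RecordWithTags(..., statPartitionStart.M(1))
	m.partitionStart.Add(ctx, 1, metric.WithAttributes(attribute.String("name", instanceName)))
}

func recordPartitionClose(ctx context.Context, m *KafkaReceiverMetrics, instanceName string) {
	// was: stats.RecordWithTags(..., statPartitionClose.M(1))
	m.partitionClose.Add(ctx, 1, metric.WithAttributes(attribute.String("name", instanceName)))
}

func recordMessage(ctx context.Context, m *KafkaReceiverMetrics, instanceName string, offset, lag int64, took time.Duration) {
	attrs := metric.WithAttributes(attribute.String("name", instanceName))
	m.messagesCount.Add(ctx, 1, attrs)
	m.messageOffset.Record(ctx, offset, attrs)
	m.messageOffsetLag.Record(ctx, lag, attrs)
	m.processingTime.Record(ctx, float64(took.Milliseconds()), attrs)
}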
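With metrics_test.go deleted, nothing in the patch covers the instrument set itself. If a replacement is wanted, one option (not part of this diff, names illustrative) is to drive NewKafkaReceiverMetrics through the OTel SDK's ManualReader and assert on the exported metric names:

// Sketch only, not part of this patch: a possible successor to the removed
// view-based TestMetrics, collecting from an in-memory reader instead of
// OpenCensus views. Only instruments that have recorded a measurement are
// expected to appear in the collected data.
package signozkafkareceiver

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func TestKafkaReceiverMetricsExportNames(t *testing.T) {
	reader := sdkmetric.NewManualReader()
	meter := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)).
		Meter("github.com/SigNoz/signoz-otel-collector/receiver/signozkafkareceiver")

	m, err := NewKafkaReceiverMetrics(meter)
	require.NoError(t, err)

	ctx := context.Background()
	m.partitionStart.Add(ctx, 1)
	m.partitionClose.Add(ctx, 1)
	m.processingTime.Record(ctx, 42)

	var rm metricdata.ResourceMetrics
	require.NoError(t, reader.Collect(ctx, &rm))
	require.Len(t, rm.ScopeMetrics, 1)

	got := map[string]bool{}
	for _, md := range rm.ScopeMetrics[0].Metrics {
		got[md.Name] = true
	}
	for _, name := range []string{
		"kafka_receiver_partition_start",
		"kafka_receiver_partition_close",
		"kafka_receiver_processing_time_milliseconds",
	} {
		assert.True(t, got[name], name)
	}
}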