diff --git a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go index 0bc6703..ae52e44 100644 --- a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go +++ b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go @@ -13,6 +13,20 @@ import ( ) // schema reference: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js +const ( + columnTimestampNs = "timestamp_ns" + columnType = "type" + columnServiceName = "service_name" + columnSampleTypesUnits = "sample_types_units" + columnPeriodType = "period_type" + columnPeriodUnit = "period_unit" + columnTags = "tags" + columnDurationNs = "duration_ns" + columnPayloadType = "payload_type" + columnPayloaf = "payload" + columnValuesAgg = "values_agg" +) + type clickhouseAccessNativeColumnar struct { conn driver.Conn @@ -92,10 +106,10 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { m = r.Attributes() timestamp_ns[i] = uint64(r.Timestamp()) - tmp, _ = m.Get("type") + tmp, _ = m.Get(columnType) typ[i] = tmp.AsString() - tmp, _ = m.Get("service_name") + tmp, _ = m.Get(columnServiceName) service_name[i] = tmp.AsString() sample_types, _ := m.Get("sample_types") @@ -112,7 +126,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { return err } - values_agg_raw, ok := m.Get("values_agg") + values_agg_raw, ok := m.Get(columnValuesAgg) if ok { values_agg_tuple, err := valueAggToTuple(&values_agg_raw) if err != nil { @@ -127,13 +141,13 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { sample_types_units_item[i] = tuple{v, sample_units_array[i]} } sample_types_units[i] = sample_types_units_item - tmp, _ = m.Get("period_type") + tmp, _ = m.Get(columnPeriodType) period_type[i] = tmp.AsString() - tmp, _ = m.Get("period_unit") + tmp, _ = m.Get(columnPeriodUnit) period_unit[i] = tmp.AsString() - tmp, _ = m.Get("tags") + tmp, _ = m.Get(columnTags) tm = tmp.Map().AsRaw() tag, j := make([]tuple, len(tm)), 0 for k, v := range tm { @@ -142,10 +156,10 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { } tags[i] = tag - tmp, _ = m.Get("duration_ns") + tmp, _ = m.Get(columnDurationNs) duration_ns[i], _ = strconv.ParseUint(tmp.Str(), 10, 64) - tmp, _ = m.Get("payload_type") + tmp, _ = m.Get(columnPeriodType) payload_type[i] = tmp.AsString() payload[i] = r.Body().Bytes().AsRaw() diff --git a/exporter/clickhouseprofileexporter/exporter.go b/exporter/clickhouseprofileexporter/exporter.go index 7258847..deae555 100644 --- a/exporter/clickhouseprofileexporter/exporter.go +++ b/exporter/clickhouseprofileexporter/exporter.go @@ -64,11 +64,11 @@ func newClickhouseProfileExporter(ctx context.Context, set *exporter.CreateSetti func (exp *clickhouseProfileExporter) send(ctx context.Context, logs plog.Logs) error { start := time.Now().UnixMilli() if err := exp.ch.InsertBatch(logs); err != nil { - otelcolExporterClickhouseProfileBatchInsertTimeMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError))) + otelcolExporterClickhouseProfileBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError))) exp.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err } - otelcolExporterClickhouseProfileBatchInsertTimeMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess))) + otelcolExporterClickhouseProfileBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess))) exp.logger.Info("inserted batch", zap.Int("size", logs.ResourceLogs().Len())) return nil } diff --git a/exporter/clickhouseprofileexporter/metrics.go b/exporter/clickhouseprofileexporter/metrics.go index d494573..5c6026a 100644 --- a/exporter/clickhouseprofileexporter/metrics.go +++ b/exporter/clickhouseprofileexporter/metrics.go @@ -9,14 +9,14 @@ import ( const prefix = "exporter_clickhouse_profile_" var ( - otelcolExporterClickhouseProfileBatchInsertTimeMillis metric.Int64Histogram + otelcolExporterClickhouseProfileBatchInsertDurationMillis metric.Int64Histogram ) func initMetrics(meter metric.Meter) error { var err error - if otelcolExporterClickhouseProfileBatchInsertTimeMillis, err = meter.Int64Histogram( - fmt.Sprint(prefix, "batch_insert_time_millis"), - metric.WithDescription("Clickhouse profile exporter batch insert time in millis"), + if otelcolExporterClickhouseProfileBatchInsertDurationMillis, err = meter.Int64Histogram( + fmt.Sprint(prefix, "batch_insert_duration_millis"), + metric.WithDescription("Clickhouse profile exporter batch insert duration in millis"), metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000), ); err != nil { return err diff --git a/go.mod b/go.mod index c7b29eb..65e6e2b 100644 --- a/go.mod +++ b/go.mod @@ -183,9 +183,6 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/extension/sigv4authextension v0.93.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/solarwindsapmsettingsextension v0.93.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.93.0 - github.com/open-telemetry/opentelemetry-collector-contrib/processor/datadogprocessor v0.93.0 - github.com/open-telemetry/opentelemetry-collector-contrib/processor/remotetapprocessor v0.93.0 - github.com/open-telemetry/opentelemetry-collector-contrib/processor/sumologicprocessor v0.93.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachesparkreceiver v0.93.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azuremonitorreceiver v0.93.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/cloudflarereceiver v0.93.0 @@ -469,7 +466,7 @@ require ( github.com/mongodb-forks/digest v1.0.5 // indirect github.com/montanaflynn/stats v0.7.0 // indirect github.com/mostynb/go-grpc-compression v1.2.2 // indirect - github.com/mrunalp/fileutils v0.5.0 // indirect + github.com/mrunalp/fileutils v0.5.1 // indirect github.com/mtibben/percent v0.2.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect @@ -515,7 +512,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/winperfcounters v0.93.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0-rc5 // indirect - github.com/opencontainers/runc v1.1.9 // indirect + github.com/opencontainers/runc v1.1.12 // indirect github.com/opencontainers/runtime-spec v1.1.0-rc.3 // indirect github.com/opencontainers/selinux v1.10.0 // indirect github.com/openshift/api v3.9.0+incompatible // indirect diff --git a/go.sum b/go.sum index a929c27..e8fc1e5 100644 --- a/go.sum +++ b/go.sum @@ -994,8 +994,8 @@ github.com/montanaflynn/stats v0.7.0/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/mostynb/go-grpc-compression v1.2.2 h1:XaDbnRvt2+1vgr0b/l0qh4mJAfIxE0bKXtz2Znl3GGI= github.com/mostynb/go-grpc-compression v1.2.2/go.mod h1:GOCr2KBxXcblCuczg3YdLQlcin1/NfyDA348ckuCH6w= -github.com/mrunalp/fileutils v0.5.0 h1:NKzVxiH7eSk+OQ4M+ZYW1K6h27RUV3MI6NUTsHhU6Z4= -github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2QJNHXfbSQ= +github.com/mrunalp/fileutils v0.5.1 h1:F+S7ZlNKnrwHfSwdlgNSkKo67ReVf8o9fel6C3dkm/Q= +github.com/mrunalp/fileutils v0.5.1/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2QJNHXfbSQ= github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= @@ -1201,8 +1201,6 @@ github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributespr github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.93.0/go.mod h1:KnnM8ZRrnw/qnZdbSvjthAiQJ1EEZsiETK8oRZwvOSQ= github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor v0.93.0 h1:mTd5tZuTZmWJqOlsx+jBHIYGCBjqaHGiXGVQqhdCHWQ= github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor v0.93.0/go.mod h1:5hu95tq6qsyDdQuYTrspDVopsUw9wF1yKu0WkPcEzH4= -github.com/open-telemetry/opentelemetry-collector-contrib/processor/datadogprocessor v0.93.0 h1:gtWcUE0qc0oMywv/s6FR2B948MPbDBkl4YCl0gg29LY= -github.com/open-telemetry/opentelemetry-collector-contrib/processor/datadogprocessor v0.93.0/go.mod h1:NQZzKri0UxX9S/HhsbZKMKdQrJpB/Bn5HDcN0e/oF94= github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatorateprocessor v0.93.0 h1:d/Pu3F+bj6DAbhvf9kPM1KZ+FqTn0TcF3ZyfzGWO7oY= github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatorateprocessor v0.93.0/go.mod h1:u3GH5HdDWpdPzaWISf5yNNEy2DHhOa2hgk8Nl88QRzE= github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.93.0 h1:n70zH3TmXoZdr+rhZbzsSHzrfDnPVQSsaSFE6Nfp95Y= @@ -1223,8 +1221,6 @@ github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisti github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.93.0/go.mod h1:h31MYz9/QEi0thihnl8a0Fo/2WClc6tccYXD2/2MNU4= github.com/open-telemetry/opentelemetry-collector-contrib/processor/redactionprocessor v0.93.0 h1:7ck193H0hb86MVFT/+g4ObVW4JXfxWpIIhEYgTtAyiA= github.com/open-telemetry/opentelemetry-collector-contrib/processor/redactionprocessor v0.93.0/go.mod h1:w16u3zH02UU+62VXAMF6dTRjXmUnaELMnUVdxIw5xEM= -github.com/open-telemetry/opentelemetry-collector-contrib/processor/remotetapprocessor v0.93.0 h1:kBsFo3gSVuzSsCumGH6XhFDm4um1BXpiKw3LzEWSY/g= -github.com/open-telemetry/opentelemetry-collector-contrib/processor/remotetapprocessor v0.93.0/go.mod h1:fUb1sGJD0wteoVbARUu8AqaokiXHzwUP64Q7W20TACY= github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.93.0 h1:x8ugfAGWANdUHElSvtdxHRG/c3ZynuoJlETVnqQZDfM= github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.93.0/go.mod h1:Q4iKTdTWPFxV65vJS419mIoTohsG+vi8MnZRyfOPhak= github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.93.0 h1:0Or54ATMcceixFRnLi21pFa9jwplSME2e2PmlG6vNDg= @@ -1239,8 +1235,6 @@ github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsp github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor v0.93.0/go.mod h1:GqHZgI2n2zUDO806W7tQANw8wRpF1iDgMIYfROFp1fg= github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor v0.93.0 h1:OCfcUZ5xOuCnAa7dJFAfd4g6ZBL94NxAarzihviqxWI= github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor v0.93.0/go.mod h1:egqvFyRKTDmozm04s9Gq3JPWXMbgSJQDipMEyO4SysU= -github.com/open-telemetry/opentelemetry-collector-contrib/processor/sumologicprocessor v0.93.0 h1:I/rKYTvV47YbrUYcJgC7aaosesz84VKPzsuku8mKGyk= -github.com/open-telemetry/opentelemetry-collector-contrib/processor/sumologicprocessor v0.93.0/go.mod h1:oiMfivph4L5Kd5XY6zdUSafsV/a255JFKomIdWIz2qk= github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor v0.93.0 h1:PnEghq9Xrff6SxvL4CLOt+UYIOZQUZwifsQ7UnoxFVM= github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor v0.93.0/go.mod h1:9DYv6oFclofxtRRKuojMrQlikbSUhPG0JyOcKqtswGY= github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.93.0 h1:o3H1wxCXp3aAmwUFv/xnWMPYXuwxPUp+iY6OhYwWdOE= @@ -1424,8 +1418,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0-rc5 h1:Ygwkfw9bpDvs+c9E34SdgGOj41dX/cbdlwvlWt0pnFI= github.com/opencontainers/image-spec v1.1.0-rc5/go.mod h1:X4pATf0uXsnn3g5aiGIsVnJBR4mxhKzfwmvK/B2NTm8= -github.com/opencontainers/runc v1.1.9 h1:XR0VIHTGce5eWPkaPesqTBrhW2yAcaraWfsEalNwQLM= -github.com/opencontainers/runc v1.1.9/go.mod h1:CbUumNnWCuTGFukNXahoo/RFBZvDAgRh/smNYNOhA50= +github.com/opencontainers/runc v1.1.12 h1:BOIssBaW1La0/qbNZHXOOa71dZfZEQOzW7dqQf3phss= +github.com/opencontainers/runc v1.1.12/go.mod h1:S+lQwSfncpBha7XTy/5lBwWgm5+y5Ma/O44Ekby9FK8= github.com/opencontainers/runtime-spec v1.1.0-rc.3 h1:l04uafi6kxByhbxev7OWiuUv0LZxEsYUfDWZ6bztAuU= github.com/opencontainers/runtime-spec v1.1.0-rc.3/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/selinux v1.10.0 h1:rAiKF8hTcgLI3w0DHm6i0ylVVcOrlgR1kK99DRLDhyU= diff --git a/receiver/pyroscopereceiver/jfrparser/parser.go b/receiver/pyroscopereceiver/jfrparser/parser.go index c956c45..1cde2ca 100644 --- a/receiver/pyroscopereceiver/jfrparser/parser.go +++ b/receiver/pyroscopereceiver/jfrparser/parser.go @@ -114,6 +114,7 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([ if nil == pr { continue } + // assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof pr.prof.Payload = new(bytes.Buffer) pr.pprof.WriteUncompressed(pr.prof.Payload) @@ -222,7 +223,7 @@ func calculateValuesAgg(samples *pprof_proto.Profile) []profile_types.SampleType // Loop through each sample type for j, st := range samples.SampleType { sum, count := calculateSumAndCount(samples, j) - valuesAgg = append(valuesAgg, profile_types.SampleType{fmt.Sprintf("%s:%s", st.Type, st.Unit), sum, count}) + valuesAgg = append(valuesAgg, profile_types.SampleType{Key: fmt.Sprintf("%s:%s", st.Type, st.Unit), Sum: sum, Count: count}) } return valuesAgg diff --git a/receiver/pyroscopereceiver/metrics.go b/receiver/pyroscopereceiver/metrics.go index f77d1c9..c7dbc6f 100644 --- a/receiver/pyroscopereceiver/metrics.go +++ b/receiver/pyroscopereceiver/metrics.go @@ -12,7 +12,6 @@ var ( otelcolReceiverPyroscopeHttpRequestTotal metric.Int64Counter otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes metric.Int64Histogram otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes metric.Int64Histogram - otelcolReceiverPyroscopeHttpResponseTimeMillis metric.Int64Histogram ) func initMetrics(meter metric.Meter) error { @@ -37,12 +36,5 @@ func initMetrics(meter metric.Meter) error { ); err != nil { return err } - if otelcolReceiverPyroscopeHttpResponseTimeMillis, err = meter.Int64Histogram( - fmt.Sprint(prefix, "http_response_time_millis"), - metric.WithDescription("Pyroscope receiver http response time in millis"), - metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000), - ); err != nil { - return err - } return nil } diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index 5065025..91ee8f0 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -12,7 +12,6 @@ import ( "strconv" "strings" "sync" - "time" "github.com/metrico/otel-collector/receiver/pyroscopereceiver/compress" "github.com/metrico/otel-collector/receiver/pyroscopereceiver/jfrparser" @@ -95,7 +94,7 @@ func newPyroscopeReceiver(cfg *Config, consumer consumer.Logs, set *receiver.Cre // TODO: rate limit clients func (recv *pyroscopeReceiver) httpHandlerIngest(resp http.ResponseWriter, req *http.Request) { - ctx, cancel := context.WithTimeout(contextWithStart(req.Context(), time.Now().UnixMilli()), recv.cfg.Timeout) + ctx, cancel := context.WithTimeout(req.Context(), recv.cfg.Timeout) defer cancel() // all compute should be bounded by timeout, so dont add compute here @@ -108,14 +107,6 @@ func (recv *pyroscopeReceiver) httpHandlerIngest(resp http.ResponseWriter, req * } } -func startTimeFromContext(ctx context.Context) int64 { - return ctx.Value(keyStart).(int64) -} - -func contextWithStart(ctx context.Context, start int64) context.Context { - return context.WithValue(ctx, keyStart, start) -} - func (recv *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWriter, req *http.Request) <-chan struct{} { c := make(chan struct{}) go func() { @@ -139,6 +130,7 @@ func (recv *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWri recv.handleError(ctx, resp, "text/plain", http.StatusBadRequest, err.Error(), pm.name, errorCodeError) return } + // if no profiles have been parsed, dont error but return if pl.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len() == 0 { writeResponseNoContent(resp) @@ -153,16 +145,14 @@ func (recv *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWri return } - otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess))) - otelcolReceiverPyroscopeHttpResponseTimeMillis.Record(ctx, time.Now().UnixMilli()-startTimeFromContext(ctx), metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess))) + otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess, http.StatusNoContent))) writeResponseNoContent(resp) }() return c } func (recv *pyroscopeReceiver) handleError(ctx context.Context, resp http.ResponseWriter, contentType string, statusCode int, msg string, service string, errorCode string) { - otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(service, errorCode))) - otelcolReceiverPyroscopeHttpResponseTimeMillis.Record(ctx, time.Now().Unix()-startTimeFromContext(ctx), metric.WithAttributeSet(*newOtelcolAttrSetHttp(service, errorCode))) + otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(service, errorCode, statusCode))) recv.logger.Error(msg) writeResponse(resp, "text/plain", statusCode, []byte(msg)) } @@ -219,8 +209,12 @@ func readParams(qs *url.Values) (params, error) { return p, nil } -func newOtelcolAttrSetHttp(service string, errorCode string) *attribute.Set { - s := attribute.NewSet(attribute.KeyValue{Key: keyService, Value: attribute.StringValue(service)}, attribute.KeyValue{Key: "error_code", Value: attribute.StringValue(errorCode)}) +func newOtelcolAttrSetHttp(service string, errorCode string, statusCode int) *attribute.Set { + s := attribute.NewSet( + attribute.KeyValue{Key: keyService, Value: attribute.StringValue(service)}, + attribute.KeyValue{Key: "error_code", Value: attribute.StringValue(errorCode)}, + attribute.KeyValue{Key: "status_code", Value: attribute.IntValue(statusCode)}, + ) return &s }