From 3b5de3b8758f094a00edeae350e6971ab5dbfc5b Mon Sep 17 00:00:00 2001 From: Rory Crispin Date: Tue, 5 Sep 2023 10:39:19 +0100 Subject: [PATCH] ClickHouse patches Fix auth to GCP, drop building RPM/DEBs Drop RPM/DEB from build flow --- .github/workflows/build-test-publish.yaml | 65 +++-- cmd/otelcontribcol/processors_test.go | 0 exporter/clickhouseexporter/exporter_logs.go | 272 ++++++++++++------ .../clickhouseexporter/exporter_logs_test.go | 33 ++- .../clickhouseexporter/exporter_metrics.go | 8 +- .../clickhouseexporter/exporter_traces.go | 48 ++-- 6 files changed, 282 insertions(+), 144 deletions(-) create mode 100644 cmd/otelcontribcol/processors_test.go diff --git a/.github/workflows/build-test-publish.yaml b/.github/workflows/build-test-publish.yaml index abceff84f954a..9f57c2a344d61 100644 --- a/.github/workflows/build-test-publish.yaml +++ b/.github/workflows/build-test-publish.yaml @@ -27,7 +27,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v3 with: - go-version: "1.21.12" + go-version: 1.21 - name: Cache Go id: go-cache uses: actions/cache@v3 @@ -70,7 +70,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v3 with: - go-version: "1.21.12" + go-version: 1.21 - name: Cache Go id: go-cache uses: actions/cache@v3 @@ -117,7 +117,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v3 with: - go-version: "1.21.12" + go-version: 1.21 - name: Cache Go id: go-cache uses: actions/cache@v3 @@ -160,12 +160,20 @@ jobs: run: | make -j2 generate git diff --exit-code ':!*go.sum' || (echo 'Generated code is out of date, please run "make generate" and commit the changes in this PR.' && exit 1) + - name: Check gendependabot + run: | + make -j2 gendependabot + git diff --exit-code ':!*go.sum' || (echo 'dependabot.yml is out of date, please run "make gendependabot" and commit the changes in this PR.' && exit 1) - name: MultimodVerify run: make multimod-verify + - name: Components dropdown in issue templates + run: | + make generate-gh-issue-templates + git diff --exit-code '.github/ISSUE_TEMPLATE' || (echo 'Dropdowns in issue templates are out of date, please run "make generate-gh-issue-templates" and commit the changes in this PR.' && exit 1) unittest-matrix: strategy: matrix: - go-version: ["1.21.12"] + go-version: ["1.21", "1.20"] # 1.20 is interpreted as 1.2 without quotes group: - receiver-0 - receiver-1 @@ -214,7 +222,7 @@ jobs: if: ${{ github.actor != 'dependabot[bot]' && always() }} strategy: matrix: - go-version: ["1.21.12"] + go-version: ["1.21", "1.20"] # 1.20 is interpreted as 1.2 without quotes runs-on: ubuntu-latest needs: [setup-environment, unittest-matrix] steps: @@ -239,7 +247,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v3 with: - go-version: "1.21.12" + go-version: 1.21 - name: Cache Go id: go-cache uses: actions/cache@v3 @@ -260,7 +268,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v3 with: - go-version: "1.21.12" + go-version: 1.21 - name: Cache Go id: go-cache uses: actions/cache@v3 @@ -287,7 +295,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v3 with: - go-version: "1.21.12" + go-version: 1.21 - name: Cache Go id: go-cache uses: actions/cache@v3 @@ -339,7 +347,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v3 with: - go-version: "1.21.12" + go-version: 1.21 - name: Cache Go id: go-cache uses: actions/cache@v3 @@ -375,7 +383,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v3 with: - go-version: "1.21.12" + go-version: 1.21 - name: Set up QEMU uses: docker/setup-qemu-action@v3 - name: Set up Docker Buildx @@ -417,6 +425,13 @@ jobs: BUILD_VERSION="${BASE_VERSION}-${SANITIZED_BRANCH}-${RUN_NUMBER}" echo "BUILD_VERSION=${BUILD_VERSION}" | tee -a $GITHUB_ENV $GITHUB_OUTPUT + - name: Build Docker Image + run: | + make docker-otelcontribcol + docker tag otelcontribcol:latest 609927696493.dkr.ecr.us-west-2.amazonaws.com/opentelemetry-collector-contrib:${{ steps.create-version.outputs.BUILD_VERSION }} + - name: Validate Docker Image + run: | + docker run 609927696493.dkr.ecr.us-west-2.amazonaws.com/opentelemetry-collector-contrib:${{ steps.create-version.outputs.BUILD_VERSION }} --version - id: login-azure name: Authenticate with Azure uses: azure/login@v2 @@ -463,16 +478,20 @@ jobs: - name: Login to Amazon ECR id: login-ecr uses: aws-actions/amazon-ecr-login@v1 - - name: build and push - uses: docker/build-push-action@v5 - with: - push: true - context: . - file: ./cmd/otelcontribcol/Dockerfile - platforms: linux/amd64,linux/arm64 - tags: | - 609927696493.dkr.ecr.us-west-2.amazonaws.com/opentelemetry-collector-contrib:${{steps.create-version.outputs.BUILD_VERSION}} - ${{secrets.AAR_REPO_IMAGE}}:${{steps.create-version.outputs.BUILD_VERSION}} - ${{secrets.GCR_ASIA_IMAGE}}:${{steps.create-version.outputs.BUILD_VERSION}} - ${{secrets.GCR_EUROPE_IMAGE}}:${{steps.create-version.outputs.BUILD_VERSION}} - ${{secrets.GCR_US_IMAGE}}:${{steps.create-version.outputs.BUILD_VERSION}} + - name: Push Docker Image + run: | + docker push 609927696493.dkr.ecr.us-west-2.amazonaws.com/opentelemetry-collector-contrib:${{ steps.create-version.outputs.BUILD_VERSION }} + - name: Push image to GCP + env: + DOCKER_IMAGE: otelcontribcol + BUILD_VERSION: ${{steps.create-version.outputs.BUILD_VERSION}} + GCR_ASIA_IMAGE: ${{secrets.GCR_ASIA_IMAGE}} + GCR_EUROPE_IMAGE: ${{secrets.GCR_EUROPE_IMAGE}} + GCR_US_IMAGE: ${{secrets.GCR_US_IMAGE}} + run: | + docker tag $DOCKER_IMAGE:latest ${GCR_ASIA_IMAGE}:${BUILD_VERSION} + docker tag $DOCKER_IMAGE:latest ${GCR_EUROPE_IMAGE}:${BUILD_VERSION} + docker tag $DOCKER_IMAGE:latest ${GCR_US_IMAGE}:${BUILD_VERSION} + docker push -a ${GCR_ASIA_IMAGE} + docker push -a ${GCR_EUROPE_IMAGE} + docker push -a ${GCR_US_IMAGE} diff --git a/cmd/otelcontribcol/processors_test.go b/cmd/otelcontribcol/processors_test.go new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/exporter/clickhouseexporter/exporter_logs.go b/exporter/clickhouseexporter/exporter_logs.go index 12a815ef1c292..e118e32e4fb20 100644 --- a/exporter/clickhouseexporter/exporter_logs.go +++ b/exporter/clickhouseexporter/exporter_logs.go @@ -7,9 +7,11 @@ import ( "context" "database/sql" "fmt" + "net/url" + "strings" "time" - _ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver. + "github.com/ClickHouse/clickhouse-go/v2" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -28,7 +30,7 @@ type logsExporter struct { } func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) { - client, err := newClickhouseClient(cfg) + client, err := newClickHouseConn(cfg) if err != nil { return nil, err } @@ -61,39 +63,76 @@ func (e *logsExporter) shutdown(_ context.Context) error { return nil } -func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { +func (e *logsExporter) pushLogsData(_ context.Context, ld plog.Logs) error { start := time.Now() - err := doWithTx(ctx, e.client, func(tx *sql.Tx) error { - statement, err := tx.PrepareContext(ctx, e.insertSQL) + err := func() error { + scope, err := e.client.Begin() if err != nil { - return fmt.Errorf("PrepareContext:%w", err) + return fmt.Errorf("Begin:%w", err) } - defer func() { - _ = statement.Close() - }() + + batch, err := scope.Prepare(e.insertSQL) + if err != nil { + return fmt.Errorf("Prepare:%w", err) + } + var serviceName string - for i := 0; i < ld.ResourceLogs().Len(); i++ { - logs := ld.ResourceLogs().At(i) + var podName string + var containerName string + var region string + var cloudProvider string + var cell string + + resAttr := make(map[string]string) + + resourceLogs := ld.ResourceLogs() + for i := 0; i < resourceLogs.Len(); i++ { + logs := resourceLogs.At(i) res := logs.Resource() resURL := logs.SchemaUrl() - resAttr := attributesToMap(res.Attributes()) - if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok { - serviceName = v.Str() - } + + attrs := res.Attributes() + attributesToMap(attrs, resAttr) + + attrs.Range(func(key string, value pcommon.Value) bool { + switch key { + case conventions.AttributeServiceName: + serviceName = value.Str() + case conventions.AttributeK8SPodName: + podName = value.AsString() + case conventions.AttributeK8SContainerName: + containerName = value.AsString() + // TODO use AttributeCloudRegion 'cloud.region' + // https://github.com/ClickHouse/data-plane-application/issues/4155 + case "region": + fallthrough + case conventions.AttributeCloudRegion: + region = value.AsString() + case conventions.AttributeCloudProvider: + cloudProvider = value.AsString() + case "cell": + cell = value.AsString() + } + return true + }) for j := 0; j < logs.ScopeLogs().Len(); j++ { rs := logs.ScopeLogs().At(j).LogRecords() scopeURL := logs.ScopeLogs().At(j).SchemaUrl() scopeName := logs.ScopeLogs().At(j).Scope().Name() scopeVersion := logs.ScopeLogs().At(j).Scope().Version() - scopeAttr := attributesToMap(logs.ScopeLogs().At(j).Scope().Attributes()) + scopeAttr := make(map[string]string, attrs.Len()) + attributesToMap(logs.ScopeLogs().At(j).Scope().Attributes(), scopeAttr) for k := 0; k < rs.Len(); k++ { r := rs.At(k) - insertTimestamp := r.Timestamp() + + insertTimestamp := r.Timestamp() if insertTimestamp == 0 { insertTimestamp = r.ObservedTimestamp() } - logAttr := attributesToMap(r.Attributes()) - _, err = statement.ExecContext(ctx, + logAttr := make(map[string]string, attrs.Len()) + attributesToMap(r.Attributes(), logAttr) + + _, err = batch.Exec( insertTimestamp.AsTime(), traceutil.TraceIDToHexOrEmptyString(r.TraceID()), traceutil.SpanIDToHexOrEmptyString(r.SpanID()), @@ -103,6 +142,11 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { serviceName, r.Body().AsString(), resURL, + podName, + containerName, + region, + cloudProvider, + cell, resAttr, scopeURL, scopeName, @@ -111,64 +155,72 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { logAttr, ) if err != nil { - return fmt.Errorf("ExecContext:%w", err) + return fmt.Errorf("Append:%w", err) } } } + + // clear map for reuse + for k := range resAttr { + delete(resAttr, k) + } } - return nil - }) + + return scope.Commit() + }() + duration := time.Since(start) e.logger.Debug("insert logs", zap.Int("records", ld.LogRecordCount()), zap.String("cost", duration.String())) return err } -func attributesToMap(attributes pcommon.Map) map[string]string { - m := make(map[string]string, attributes.Len()) +func attributesToMap(attributes pcommon.Map, dest map[string]string) { attributes.Range(func(k string, v pcommon.Value) bool { - m[k] = v.AsString() + dest[k] = v.AsString() return true }) - return m } const ( // language=ClickHouse SQL createLogsTableSQL = ` -CREATE TABLE IF NOT EXISTS %s %s ( - Timestamp DateTime64(9) CODEC(Delta(8), ZSTD(1)), - TimestampDate Date DEFAULT toDate(Timestamp), - TimestampTime DateTime DEFAULT toDateTime(Timestamp), - TraceId String CODEC(ZSTD(1)), - SpanId String CODEC(ZSTD(1)), - TraceFlags UInt8, - SeverityText LowCardinality(String) CODEC(ZSTD(1)), - SeverityNumber UInt8, - ServiceName LowCardinality(String) CODEC(ZSTD(1)), - Body String CODEC(ZSTD(1)), - ResourceSchemaUrl LowCardinality(String) CODEC(ZSTD(1)), - ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)), - ScopeSchemaUrl LowCardinality(String) CODEC(ZSTD(1)), - ScopeName String CODEC(ZSTD(1)), - ScopeVersion LowCardinality(String) CODEC(ZSTD(1)), - ScopeAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)), - LogAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)), - - INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, - INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - INDEX idx_scope_attr_key mapKeys(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1 -) ENGINE = %s -PARTITION BY toYYYYMM(TimestampDate) -ORDER BY (ServiceName, TimestampDate, TimestampTime) +CREATE TABLE IF NOT EXISTS %s ( + Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)), + TraceId String CODEC(ZSTD(1)), + SpanId String CODEC(ZSTD(1)), + TraceFlags UInt32 CODEC(ZSTD(1)), + SeverityText LowCardinality(String) CODEC(ZSTD(1)), + SeverityNumber Int32 CODEC(ZSTD(1)), + ServiceName LowCardinality(String) CODEC(ZSTD(1)), + Body LowCardinality(String) CODEC(ZSTD(1)), + PodName LowCardinality(String), + ContainerName LowCardinality(String), + Region LowCardinality(String), + CloudProvider LowCardinality(String), + Cell LowCardinality(String), + ResourceSchemaUrl String CODEC(ZSTD(1)), + ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)), + ScopeSchemaUrl String CODEC(ZSTD(1)), + ScopeName String CODEC(ZSTD(1)), + ScopeVersion String CODEC(ZSTD(1)), + ScopeAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)), + LogAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)), + INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_scope_attr_key mapKeys(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1 +) ENGINE MergeTree() %s -SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; +PARTITION BY toYYYYMM(Timestamp) +ORDER BY (PodName, ContainerName, SeverityText, Timestamp) +SETTINGS index_granularity=8192, ttl_only_drop_parts = 1; ` + // language=ClickHouse SQL insertLogsSQLTemplate = `INSERT INTO %s ( Timestamp, @@ -180,35 +232,47 @@ SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; ServiceName, Body, ResourceSchemaUrl, + PodName, + ContainerName, + Region, + CloudProvider, + Cell, ResourceAttributes, ScopeSchemaUrl, ScopeName, ScopeVersion, ScopeAttributes, LogAttributes - ) VALUES ( - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ? - )` + )` + inlineinsertLogsSQLTemplate = `INSERT INTO %s SETTINGS async_insert=1, wait_for_async_insert=0 ( + Timestamp, + TraceId, + SpanId, + TraceFlags, + SeverityText, + SeverityNumber, + ServiceName, + Body, + ResourceSchemaUrl, + PodName, + ContainerName, + Region, + CloudProvider, + Cell, + ResourceAttributes, + ScopeSchemaUrl, + ScopeName, + ScopeVersion, + ScopeAttributes, + LogAttributes + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` ) var driverName = "clickhouse" // for testing -// newClickhouseClient create a clickhouse client. -func newClickhouseClient(cfg *Config) (*sql.DB, error) { +// newClickHouseClient create a clickhouse client. +// used by metrics and traces: +func newClickHouseClient(cfg *Config) (*sql.DB, error) { db, err := cfg.buildDB() if err != nil { return nil, err @@ -216,20 +280,55 @@ func newClickhouseClient(cfg *Config) (*sql.DB, error) { return db, nil } +// used by logs: +func newClickHouseConn(cfg *Config) (*sql.DB, error) { + endpoint := cfg.Endpoint + + if len(cfg.ConnectionParams) > 0 { + values := make(url.Values, len(cfg.ConnectionParams)) + for k, v := range cfg.ConnectionParams { + values.Add(k, v) + } + + if !strings.Contains(endpoint, "?") { + endpoint += "?" + } else if !strings.HasSuffix(endpoint, "&") { + endpoint += "&" + } + + endpoint += values.Encode() + } + + opts, err := clickhouse.ParseDSN(endpoint) + if err != nil { + return nil, fmt.Errorf("unable to parse endpoint: %w", err) + } + + opts.Auth = clickhouse.Auth{ + Database: cfg.Database, + Username: cfg.Username, + Password: string(cfg.Password), + } + + // can return a "bad" connection if misconfigured, we won't know + // until a Ping, Exec, etc.. is done + return clickhouse.OpenDB(opts), nil +} + func createDatabase(ctx context.Context, cfg *Config) error { // use default database to create new database if cfg.Database == defaultDatabase { return nil } - db, err := cfg.buildDB() + db, err := cfg.buildDB(defaultDatabase) if err != nil { return err } defer func() { _ = db.Close() }() - query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s %s", cfg.Database, cfg.clusterString()) + query := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database) _, err = db.ExecContext(ctx, query) if err != nil { return fmt.Errorf("create database: %w", err) @@ -250,19 +349,8 @@ func renderCreateLogsTableSQL(cfg *Config) string { } func renderInsertLogsSQL(cfg *Config) string { - return fmt.Sprintf(insertLogsSQLTemplate, cfg.LogsTableName) -} - -func doWithTx(_ context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error { - tx, err := db.Begin() - if err != nil { - return fmt.Errorf("db.Begin: %w", err) + if strings.HasPrefix(cfg.Endpoint, "tcp") && cfg.ConnectionParams["async_insert"] == "1" { + return fmt.Sprintf(inlineinsertLogsSQLTemplate, cfg.LogsTableName) } - defer func() { - _ = tx.Rollback() - }() - if err := fn(tx); err != nil { - return err - } - return tx.Commit() + return fmt.Sprintf(insertLogsSQLTemplate, cfg.LogsTableName) } diff --git a/exporter/clickhouseexporter/exporter_logs_test.go b/exporter/clickhouseexporter/exporter_logs_test.go index aa3ff11acded2..44679997877ab 100644 --- a/exporter/clickhouseexporter/exporter_logs_test.go +++ b/exporter/clickhouseexporter/exporter_logs_test.go @@ -51,7 +51,7 @@ func TestLogsExporter_New(t *testing.T) { }{ "no dsn": { config: withDefaultConfig(), - want: failWithMsg("exec create logs table sql: parse dsn address failed"), + want: failWithMsg("parse dsn address failed"), }, } @@ -78,6 +78,7 @@ func TestExporter_pushLogsData(t *testing.T) { t.Run("push success", func(t *testing.T) { var items int initClickhouseTestServer(t, func(query string, values []driver.Value) error { + t.Logf(query) t.Logf("%d, values:%+v", items, values) if strings.HasPrefix(query, "INSERT") { items++ @@ -97,7 +98,7 @@ func TestExporter_pushLogsData(t *testing.T) { require.Equal(t, "https://opentelemetry.io/schemas/1.4.0", values[8]) require.Equal(t, map[string]string{ "service.name": "test-service", - }, values[9]) + }, values[14]) } return nil }) @@ -107,12 +108,12 @@ func TestExporter_pushLogsData(t *testing.T) { t.Run("test check scope metadata", func(t *testing.T) { initClickhouseTestServer(t, func(query string, values []driver.Value) error { if strings.HasPrefix(query, "INSERT") { - require.Equal(t, "https://opentelemetry.io/schemas/1.7.0", values[10]) - require.Equal(t, "io.opentelemetry.contrib.clickhouse", values[11]) - require.Equal(t, "1.0.0", values[12]) + require.Equal(t, "https://opentelemetry.io/schemas/1.7.0", values[15]) + require.Equal(t, "io.opentelemetry.contrib.clickhouse", values[16]) + require.Equal(t, "1.0.0", values[17]) require.Equal(t, map[string]string{ "lib": "clickhouse", - }, values[13]) + }, values[18]) } return nil }) @@ -136,7 +137,12 @@ func TestLogsTableEngineConfig(t *testing.T) { } func newTestLogsExporter(t *testing.T, dsn string, fns ...func(*Config)) *logsExporter { - exporter, err := newLogsExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(dsn)) + cfg := withTestExporterConfig(fns...)(dsn) + exporter, err := newLogsExporter(zaptest.NewLogger(t), cfg) + require.NoError(t, err) + + // need to use the dummy driver driver for testing + exporter.client, err = newClickHouseClient(cfg) require.NoError(t, err) require.NoError(t, exporter.start(context.TODO(), nil)) @@ -238,7 +244,18 @@ func (*testClickhouseDriverStmt) Close() error { } func (t *testClickhouseDriverStmt) NumInput() int { - return strings.Count(t.query, "?") + if !strings.HasPrefix(t.query, `INSERT`) { + return 0 + } + + n := strings.Count(t.query, "?") + if n > 0 { + return n + } + + // no ? in batched queries but column are separated with "," + // except for the last one + return strings.Count(t.query, ",") + 1 } func (t *testClickhouseDriverStmt) Exec(args []driver.Value) (driver.Result, error) { diff --git a/exporter/clickhouseexporter/exporter_metrics.go b/exporter/clickhouseexporter/exporter_metrics.go index 6f11ba940d575..64c3f5b6f8a87 100644 --- a/exporter/clickhouseexporter/exporter_metrics.go +++ b/exporter/clickhouseexporter/exporter_metrics.go @@ -24,7 +24,7 @@ type metricsExporter struct { } func newMetricsExporter(logger *zap.Logger, cfg *Config) (*metricsExporter, error) { - client, err := newClickhouseClient(cfg) + client, err := newClickHouseClient(cfg) if err != nil { return nil, err } @@ -63,7 +63,11 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric metricsMap := internal.NewMetricsModel(e.cfg.MetricsTableName) for i := 0; i < md.ResourceMetrics().Len(); i++ { metrics := md.ResourceMetrics().At(i) - resAttr := attributesToMap(metrics.Resource().Attributes()) + res := metrics.Resource() + + resAttr := make(map[string]string, res.Attributes().Len()) + attributesToMap(metrics.Resource().Attributes(), resAttr) + for j := 0; j < metrics.ScopeMetrics().Len(); j++ { rs := metrics.ScopeMetrics().At(j).Metrics() scopeInstr := metrics.ScopeMetrics().At(j).Scope() diff --git a/exporter/clickhouseexporter/exporter_traces.go b/exporter/clickhouseexporter/exporter_traces.go index ff2aafb82f148..03baab0523b90 100644 --- a/exporter/clickhouseexporter/exporter_traces.go +++ b/exporter/clickhouseexporter/exporter_traces.go @@ -28,7 +28,7 @@ type tracesExporter struct { } func newTracesExporter(logger *zap.Logger, cfg *Config) (*tracesExporter, error) { - client, err := newClickhouseClient(cfg) + client, err := newClickHouseClient(cfg) if err != nil { return nil, err } @@ -63,8 +63,8 @@ func (e *tracesExporter) shutdown(_ context.Context) error { func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error { start := time.Now() - err := doWithTx(ctx, e.client, func(tx *sql.Tx) error { - statement, err := tx.PrepareContext(ctx, e.insertSQL) + err := func() error { + statement, err := e.client.PrepareContext(ctx, e.insertSQL) if err != nil { return fmt.Errorf("PrepareContext:%w", err) } @@ -74,7 +74,10 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er for i := 0; i < td.ResourceSpans().Len(); i++ { spans := td.ResourceSpans().At(i) res := spans.Resource() - resAttr := attributesToMap(res.Attributes()) + attr := res.Attributes() + resAttr := make(map[string]string, attr.Len()) + attributesToMap(attr, resAttr) + var serviceName string if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok { serviceName = v.Str() @@ -85,7 +88,9 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er scopeVersion := spans.ScopeSpans().At(j).Scope().Version() for k := 0; k < rs.Len(); k++ { r := rs.At(k) - spanAttr := attributesToMap(r.Attributes()) + + spanAttr := make(map[string]string, res.Attributes().Len()) + attributesToMap(r.Attributes(), spanAttr) status := r.Status() eventTimes, eventNames, eventAttrs := convertEvents(r.Events()) linksTraceIDs, linksSpanIDs, linksTraceStates, linksAttrs := convertLinks(r.Links()) @@ -120,7 +125,7 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er } } return nil - }) + }() duration := time.Since(start) e.logger.Debug("insert traces", zap.Int("records", td.SpanCount()), zap.String("cost", duration.String())) @@ -128,33 +133,38 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er } func convertEvents(events ptrace.SpanEventSlice) ([]time.Time, []string, []map[string]string) { - var ( - times []time.Time - names []string - attrs []map[string]string - ) + times := make([]time.Time, events.Len()) + names := make([]string, events.Len()) + attrs := make([]map[string]string, events.Len()) + for i := 0; i < events.Len(); i++ { event := events.At(i) times = append(times, event.Timestamp().AsTime()) names = append(names, event.Name()) - attrs = append(attrs, attributesToMap(event.Attributes())) + + eventAttrs := event.Attributes() + dest := make(map[string]string, eventAttrs.Len()) + attributesToMap(eventAttrs, dest) + attrs = append(attrs, dest) } return times, names, attrs } func convertLinks(links ptrace.SpanLinkSlice) ([]string, []string, []string, []map[string]string) { - var ( - traceIDs []string - spanIDs []string - states []string - attrs []map[string]string - ) + traceIDs := make([]string, links.Len()) + spanIDs := make([]string, links.Len()) + states := make([]string, links.Len()) + attrs := make([]map[string]string, links.Len()) + for i := 0; i < links.Len(); i++ { link := links.At(i) traceIDs = append(traceIDs, traceutil.TraceIDToHexOrEmptyString(link.TraceID())) spanIDs = append(spanIDs, traceutil.SpanIDToHexOrEmptyString(link.SpanID())) states = append(states, link.TraceState().AsRaw()) - attrs = append(attrs, attributesToMap(link.Attributes())) + + linkAttrs := link.Attributes() + dest := make(map[string]string, linkAttrs.Len()) + attrs = append(attrs, dest) } return traceIDs, spanIDs, states, attrs }