Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump golang.org/x/net from 0.14.0 to 0.17.0 in /exporter/parquetexporter #94

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
480 changes: 480 additions & 0 deletions .github/workflows/build-test-publish.yaml

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions cmd/otelcontribcol/processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func TestDefaultProcessors(t *testing.T) {
processor: "k8sattributes",
skipLifecycle: true, // Requires a k8s API to communicate with
},
{
processor: "logstransform",
skipLifecycle: true,
},
{
processor: "memory_limiter",
getConfigFn: func() component.Config {
Expand Down
213 changes: 152 additions & 61 deletions exporter/clickhouseexporter/exporter_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"context"
"database/sql"
"fmt"
"net/url"
"strings"
"time"

_ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver.
"github.com/ClickHouse/clickhouse-go/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -28,7 +30,7 @@ type logsExporter struct {
}

func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
client, err := newClickhouseClient(cfg)
client, err := newClickHouseConn(cfg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -57,35 +59,72 @@ func (e *logsExporter) shutdown(_ context.Context) error {
return nil
}

func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
func (e *logsExporter) pushLogsData(_ context.Context, ld plog.Logs) error {
start := time.Now()
err := doWithTx(ctx, e.client, func(tx *sql.Tx) error {
statement, err := tx.PrepareContext(ctx, e.insertSQL)
err := func() error {
scope, err := e.client.Begin()
if err != nil {
return fmt.Errorf("PrepareContext:%w", err)
return fmt.Errorf("Begin:%w", err)
}
defer func() {
_ = statement.Close()
}()

batch, err := scope.Prepare(e.insertSQL)
if err != nil {
return fmt.Errorf("Prepare:%w", err)
}

var serviceName string
for i := 0; i < ld.ResourceLogs().Len(); i++ {
logs := ld.ResourceLogs().At(i)
var podName string
var containerName string
var region string
var cloudProvider string
var cell string

resAttr := make(map[string]string)

resourceLogs := ld.ResourceLogs()
for i := 0; i < resourceLogs.Len(); i++ {
logs := resourceLogs.At(i)
res := logs.Resource()
resURL := logs.SchemaUrl()
resAttr := attributesToMap(res.Attributes())
if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok {
serviceName = v.Str()
}

attrs := res.Attributes()
attributesToMap(attrs, resAttr)

attrs.Range(func(key string, value pcommon.Value) bool {
switch key {
case conventions.AttributeServiceName:
serviceName = value.Str()
case conventions.AttributeK8SPodName:
podName = value.AsString()
case conventions.AttributeK8SContainerName:
containerName = value.AsString()
// TODO use AttributeCloudRegion 'cloud.region'
// https://github.com/ClickHouse/data-plane-application/issues/4155
case "region":
fallthrough
case conventions.AttributeCloudRegion:
region = value.AsString()
case conventions.AttributeCloudProvider:
cloudProvider = value.AsString()
case "cell":
cell = value.AsString()
}
return true
})
for j := 0; j < logs.ScopeLogs().Len(); j++ {
rs := logs.ScopeLogs().At(j).LogRecords()
scopeURL := logs.ScopeLogs().At(j).SchemaUrl()
scopeName := logs.ScopeLogs().At(j).Scope().Name()
scopeVersion := logs.ScopeLogs().At(j).Scope().Version()
scopeAttr := attributesToMap(logs.ScopeLogs().At(j).Scope().Attributes())
scopeAttr := make(map[string]string, attrs.Len())
attributesToMap(logs.ScopeLogs().At(j).Scope().Attributes(), scopeAttr)
for k := 0; k < rs.Len(); k++ {
r := rs.At(k)
logAttr := attributesToMap(r.Attributes())
_, err = statement.ExecContext(ctx,

logAttr := make(map[string]string, attrs.Len())
attributesToMap(r.Attributes(), logAttr)

_, err = batch.Exec(
r.Timestamp().AsTime(),
traceutil.TraceIDToHexOrEmptyString(r.TraceID()),
traceutil.SpanIDToHexOrEmptyString(r.SpanID()),
Expand All @@ -95,6 +134,11 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
serviceName,
r.Body().AsString(),
resURL,
podName,
containerName,
region,
cloudProvider,
cell,
resAttr,
scopeURL,
scopeName,
Expand All @@ -103,26 +147,31 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
logAttr,
)
if err != nil {
return fmt.Errorf("ExecContext:%w", err)
return fmt.Errorf("Append:%w", err)
}
}
}

// clear map for reuse
for k := range resAttr {
delete(resAttr, k)
}
}
return nil
})

return scope.Commit()
}()

duration := time.Since(start)
e.logger.Debug("insert logs", zap.Int("records", ld.LogRecordCount()),
zap.String("cost", duration.String()))
return err
}

func attributesToMap(attributes pcommon.Map) map[string]string {
m := make(map[string]string, attributes.Len())
func attributesToMap(attributes pcommon.Map, dest map[string]string) {
attributes.Range(func(k string, v pcommon.Value) bool {
m[k] = v.AsString()
dest[k] = v.AsString()
return true
})
return m
}

const (
Expand All @@ -136,7 +185,12 @@ CREATE TABLE IF NOT EXISTS %s (
SeverityText LowCardinality(String) CODEC(ZSTD(1)),
SeverityNumber Int32 CODEC(ZSTD(1)),
ServiceName LowCardinality(String) CODEC(ZSTD(1)),
Body String CODEC(ZSTD(1)),
Body LowCardinality(String) CODEC(ZSTD(1)),
PodName LowCardinality(String),
ContainerName LowCardinality(String),
Region LowCardinality(String),
CloudProvider LowCardinality(String),
Cell LowCardinality(String),
ResourceSchemaUrl String CODEC(ZSTD(1)),
ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
ScopeSchemaUrl String CODEC(ZSTD(1)),
Expand All @@ -154,10 +208,11 @@ CREATE TABLE IF NOT EXISTS %s (
INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1
) ENGINE MergeTree()
%s
PARTITION BY toDate(Timestamp)
ORDER BY (ServiceName, SeverityText, toUnixTimestamp(Timestamp), TraceId)
PARTITION BY toYYYYMM(Timestamp)
ORDER BY (PodName, ContainerName, SeverityText, Timestamp)
SETTINGS index_granularity=8192, ttl_only_drop_parts = 1;
`

// language=ClickHouse SQL
insertLogsSQLTemplate = `INSERT INTO %s (
Timestamp,
Expand All @@ -169,42 +224,89 @@ SETTINGS index_granularity=8192, ttl_only_drop_parts = 1;
ServiceName,
Body,
ResourceSchemaUrl,
PodName,
ContainerName,
Region,
CloudProvider,
Cell,
ResourceAttributes,
ScopeSchemaUrl,
ScopeName,
ScopeVersion,
ScopeAttributes,
LogAttributes
)`
inlineinsertLogsSQLTemplate = `INSERT INTO %s SETTINGS async_insert=1, wait_for_async_insert=0 (
Timestamp,
TraceId,
SpanId,
TraceFlags,
SeverityText,
SeverityNumber,
ServiceName,
Body,
ResourceSchemaUrl,
PodName,
ContainerName,
Region,
CloudProvider,
Cell,
ResourceAttributes,
ScopeSchemaUrl,
ScopeName,
ScopeVersion,
ScopeAttributes,
LogAttributes
) VALUES (
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?
)`
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
)

var driverName = "clickhouse" // for testing

// newClickhouseClient create a clickhouse client.
func newClickhouseClient(cfg *Config) (*sql.DB, error) {
// newClickHouseClient creates a ClickHouse client.
// It is used by metrics and traces.
func newClickHouseClient(cfg *Config) (*sql.DB, error) {
db, err := cfg.buildDB(cfg.Database)
if err != nil {
return nil, err
}
return db, nil
}

// newClickHouseConn opens the database handle used by the logs exporter.
//
// Any cfg.ConnectionParams are URL-encoded and appended to cfg.Endpoint as
// query parameters. The resulting string is parsed with clickhouse.ParseDSN,
// and the parsed options' Auth is then replaced with the database, username
// and password taken from cfg before the handle is opened.
//
// An error is returned only when the endpoint cannot be parsed.
func newClickHouseConn(cfg *Config) (*sql.DB, error) {
	dsn := cfg.Endpoint

	if len(cfg.ConnectionParams) > 0 {
		query := make(url.Values, len(cfg.ConnectionParams))
		for name, value := range cfg.ConnectionParams {
			query.Set(name, value)
		}

		// Choose the separator that joins the extra parameters onto the
		// endpoint: start a query string, or continue an existing one.
		switch {
		case !strings.Contains(dsn, "?"):
			dsn += "?"
		case !strings.HasSuffix(dsn, "&"):
			dsn += "&"
		}

		dsn += query.Encode()
	}

	opts, err := clickhouse.ParseDSN(dsn)
	if err != nil {
		return nil, fmt.Errorf("unable to parse endpoint: %w", err)
	}

	// Credentials and database always come from the exporter config.
	opts.Auth = clickhouse.Auth{
		Database: cfg.Database,
		Username: cfg.Username,
		Password: string(cfg.Password),
	}

	// The handle returned here may be unusable if the configuration is wrong;
	// that only becomes visible once a Ping, Exec, etc. is attempted.
	db := clickhouse.OpenDB(opts)
	return db, nil
}

func createDatabase(ctx context.Context, cfg *Config) error {
// use default database to create new database
if cfg.Database == defaultDatabase {
Expand Down Expand Up @@ -242,19 +344,8 @@ func renderCreateLogsTableSQL(cfg *Config) string {
}

func renderInsertLogsSQL(cfg *Config) string {
return fmt.Sprintf(insertLogsSQLTemplate, cfg.LogsTableName)
}

func doWithTx(_ context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("db.Begin: %w", err)
if strings.HasPrefix(cfg.Endpoint, "tcp") && cfg.ConnectionParams["async_insert"] == "1" {
return fmt.Sprintf(inlineinsertLogsSQLTemplate, cfg.LogsTableName)
}
defer func() {
_ = tx.Rollback()
}()
if err := fn(tx); err != nil {
return err
}
return tx.Commit()
return fmt.Sprintf(insertLogsSQLTemplate, cfg.LogsTableName)
}
Loading