Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
package dbmodel

import (
"bytes"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"

"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -200,7 +202,29 @@ func AttributesGroupToMap(group AttributesGroup) (pcommon.Map, error) {
for i := range group.StrKeys {
key := group.StrKeys[i]
value := group.StrValues[i]
result.PutStr(key, value)

var parsed interface{}
decoder := json.NewDecoder(bytes.NewReader([]byte(value)))
decoder.UseNumber()

if err := decoder.Decode(&parsed); err == nil {
switch v := parsed.(type) {
case map[string]interface{}:
m := result.PutEmptyMap(key)
if err := interfaceToPcommonMap(v, m); err != nil {
return result, err
}
case []interface{}:
s := result.PutEmptySlice(key)
if err := interfaceToPcommonSlice(v, s); err != nil {
return result, err
}
default:
result.PutStr(key, value)
}
} else {
result.PutStr(key, value)
}
}
for i := range group.BytesKeys {
key := group.BytesKeys[i]
Expand All @@ -213,3 +237,50 @@ func AttributesGroupToMap(group AttributesGroup) (pcommon.Map, error) {
}
return result, nil
}

func interfaceToPcommonMap(data map[string]interface{}, m pcommon.Map) error {
for k, v := range data {
val := m.PutEmpty(k)
if err := interfaceToPcommonValue(v, val); err != nil {
return err
}
}
return nil
}

func interfaceToPcommonSlice(data []interface{}, s pcommon.Slice) error {
s.EnsureCapacity(len(data))
for _, item := range data {
val := s.AppendEmpty()
if err := interfaceToPcommonValue(item, val); err != nil {
return err
}
}
return nil
}

func interfaceToPcommonValue(data interface{}, val pcommon.Value) error {
switch v := data.(type) {
case bool:
val.SetBool(v)
case json.Number:
if i, err := v.Int64(); err == nil {
val.SetInt(i)
} else if f, err := v.Float64(); err == nil {
val.SetDouble(f)
}
case float64:
val.SetDouble(v)
case string:
val.SetStr(v)
case []byte:
val.SetEmptyBytes().FromRaw(v)
case map[string]interface{}:
m := val.SetEmptyMap()
return interfaceToPcommonMap(v, m)
case []interface{}:
s := val.SetEmptySlice()
return interfaceToPcommonSlice(v, s)
}
return nil
}
89 changes: 82 additions & 7 deletions internal/storage/v2/clickhouse/tracestore/dbmodel/to_dbmodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package dbmodel
import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"time"

"github.com/ClickHouse/ch-go/proto"
Expand Down Expand Up @@ -180,16 +182,89 @@ func attributesToMap(attrs pcommon.Map) map[pcommon.ValueType]map[string]pcommon
} {
result[valueType] = make(map[string]pcommon.Value)
}
// Fill according to the data type of the value
for k, v := range attrs.All() {
typ := v.Type()
// For basic data types (such as bool, uint64, and float64) we can make sure type safe.
// TODO: For non-basic types (such as Map, Slice), they should be serialized and stored as OTLP/JSON strings
result[typ][k] = v
}

attrs.Range(func(k string, v pcommon.Value) bool {
switch v.Type() {
case pcommon.ValueTypeMap:
jsonStr, err := marshalValueToJSON(v)
if err == nil {
result[ValueTypeStr][k] = pcommon.NewValueStr(jsonStr)
}
case pcommon.ValueTypeSlice:
jsonStr, err := marshalValueToJSON(v)
if err == nil {
result[ValueTypeStr][k] = pcommon.NewValueStr(jsonStr)
}
default:
typ := v.Type()
if _, exists := result[typ]; exists {
result[typ][k] = v
}
}
return true
})
return result
}

func marshalValueToJSON(v pcommon.Value) (string, error) {
var val interface{}
switch v.Type() {
case pcommon.ValueTypeMap:
val = valueToInterface(v.Map())
case pcommon.ValueTypeSlice:
val = valueToInterface(v.Slice())
default:
return "", fmt.Errorf("unsupported type for JSON serialization: %s", v.Type())
}

jsonBytes, err := json.Marshal(val)
if err != nil {
return "", err
}
return string(jsonBytes), nil
}

func valueToInterface(val interface{}) interface{} {
switch v := val.(type) {
case pcommon.Map:
m := make(map[string]interface{})
v.Range(func(k string, val pcommon.Value) bool {
m[k] = pcommonValueToInterface(val)
return true
})
return m
case pcommon.Slice:
s := make([]interface{}, v.Len())
for i := 0; i < v.Len(); i++ {
s[i] = pcommonValueToInterface(v.At(i))
}
return s
default:
return val
}
}

func pcommonValueToInterface(v pcommon.Value) interface{} {
switch v.Type() {
case pcommon.ValueTypeBool:
return v.Bool()
case pcommon.ValueTypeDouble:
return v.Double()
case pcommon.ValueTypeInt:
return v.Int()
case pcommon.ValueTypeStr:
return v.Str()
case pcommon.ValueTypeBytes:
return v.Bytes().AsRaw()
case pcommon.ValueTypeMap:
return valueToInterface(v.Map())
case pcommon.ValueTypeSlice:
return valueToInterface(v.Slice())
default:
return nil
}
}

// AttributeColumnPair maps Attribute/Attributes to table init. Instead of directly storing the entire Attribute/Attributes into a single independent Column,
// it splits them based on the value type.
// Assuming the value type here is string (since the key is always string, there's no need to consider it separately).
Expand Down