
Commit b214ae3

Merge pull request #65 from metrico/pre-release

Pre release

2 parents 07da0ba + 2b41288

8 files changed: +263 −134 lines

exporter/clickhouseprofileexporter/ch/access_native_columnar.go (+105 −103)

```diff
@@ -71,146 +71,148 @@ func valueToStringArray(v pcommon.Value) ([]string, error) {
 }
 
 // Inserts a profile batch into the clickhouse server using columnar native protocol
-func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
+func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error) {
 	b, err := ch.conn.PrepareBatch(context.Background(), "INSERT INTO profiles_input")
 	if err != nil {
-		return fmt.Errorf("failed to prepare batch: %w", err)
-	}
+		return 0, fmt.Errorf("failed to prepare batch: %w", err)
+	}
+
+	// this implementation is tightly coupled to how pyroscope-java and pyroscopereceiver work
+	timestamp_ns := make([]uint64, 0)
+	typ := make([]string, 0)
+	service_name := make([]string, 0)
+	values_agg := make([][]tuple, 0)
+	sample_types_units := make([][]tuple, 0)
+	period_type := make([]string, 0)
+	period_unit := make([]string, 0)
+	tags := make([][]tuple, 0)
+	duration_ns := make([]uint64, 0)
+	payload_type := make([]string, 0)
+	payload := make([][]byte, 0)
 
-	// this implementation is tightly coupled to how pyroscope-java and pyroscopereciver work,
-	// specifically receiving a single profile at a time from the agent,
-	// and thus each batched resource logs slice contains a single log record
 	rl := ls.ResourceLogs()
-	sz := rl.Len()
-
-	timestamp_ns := make([]uint64, sz)
-	typ := make([]string, sz)
-	service_name := make([]string, sz)
-	values_agg := make([][]tuple, sz)
-	sample_types_units := make([][]tuple, sz)
-	period_type := make([]string, sz)
-	period_unit := make([]string, sz)
-	tags := make([][]tuple, sz)
-	duration_ns := make([]uint64, sz)
-	payload_type := make([]string, sz)
-	payload := make([][]byte, sz)
-
 	var (
-		r   plog.LogRecord
-		m   pcommon.Map
-		tmp pcommon.Value
-		tm  map[string]any
+		lr     plog.LogRecordSlice
+		r      plog.LogRecord
+		m      pcommon.Map
+		tmp    pcommon.Value
+		tm     map[string]any
+		offset int
+		s      int
+		idx    int
 	)
-	for i := 0; i < sz; i++ {
-		r = rl.At(i).ScopeLogs().At(0).LogRecords().At(0)
-		m = r.Attributes()
-		timestamp_ns[i] = uint64(r.Timestamp())
-
-		tmp, _ = m.Get(columnType)
-		typ[i] = tmp.AsString()
-
-		tmp, _ = m.Get(columnServiceName)
-		service_name[i] = tmp.AsString()
-
-		sample_types, _ := m.Get("sample_types")
-
-		sample_units, _ := m.Get("sample_units")
-
-		sample_types_array, err := valueToStringArray(sample_types)
-		if err != nil {
-			return err
-		}
-
-		sample_units_array, err := valueToStringArray(sample_units)
-		if err != nil {
-			return err
-		}
-
-		values_agg_raw, ok := m.Get(columnValuesAgg)
-		if ok {
-			values_agg_tuple, err := valueAggToTuple(&values_agg_raw)
-			if err != nil {
-				return err
-			}
-			values_agg[i] = append(values_agg[i], values_agg_tuple...)
-		}
-
-		sample_types_units_item := make([]tuple, len(sample_types_array))
-		for i, v := range sample_types_array {
-			sample_types_units_item[i] = tuple{v, sample_units_array[i]}
-		}
-		sample_types_units[i] = sample_types_units_item
-		tmp, _ = m.Get(columnPeriodType)
-		period_type[i] = tmp.AsString()
-
-		tmp, _ = m.Get(columnPeriodUnit)
-		period_unit[i] = tmp.AsString()
-
-		tmp, _ = m.Get(columnTags)
-		tm = tmp.Map().AsRaw()
-		tag, j := make([]tuple, len(tm)), 0
-		for k, v := range tm {
-			tag[j] = tuple{k, v.(string)}
-			j++
-		}
-		tags[i] = tag
-
-		tmp, _ = m.Get(columnDurationNs)
-		duration_ns[i], _ = strconv.ParseUint(tmp.Str(), 10, 64)
-
-		tmp, _ = m.Get(columnPayloadType)
-		payload_type[i] = tmp.AsString()
-
-		payload[i] = r.Body().Bytes().AsRaw()
-
-		ch.logger.Debug(
-			fmt.Sprintf("batch insert prepared row %d", i),
-			zap.Uint64(columnTimestampNs, timestamp_ns[i]),
-			zap.String(columnType, typ[i]),
-			zap.String(columnServiceName, service_name[i]),
-			zap.String(columnPeriodType, period_type[i]),
-			zap.String(columnPeriodUnit, period_unit[i]),
-			zap.String(columnPayloadType, payload_type[i]),
-		)
+	for i := 0; i < rl.Len(); i++ {
+		lr = rl.At(i).ScopeLogs().At(0).LogRecords()
+		for s = 0; s < lr.Len(); s++ {
+			r = lr.At(s)
+			m = r.Attributes()
+			timestamp_ns = append(timestamp_ns, uint64(r.Timestamp()))
+
+			tmp, _ = m.Get(columnType)
+			typ = append(typ, tmp.AsString())
+
+			tmp, _ = m.Get(columnServiceName)
+			service_name = append(service_name, tmp.AsString())
+
+			sample_types, _ := m.Get("sample_types")
+			sample_units, _ := m.Get("sample_units")
+			sample_types_array, err := valueToStringArray(sample_types)
+			if err != nil {
+				return 0, err
+			}
+			sample_units_array, err := valueToStringArray(sample_units)
+			if err != nil {
+				return 0, err
+			}
+			values_agg_raw, ok := m.Get(columnValuesAgg)
+			if ok {
+				values_agg_tuple, err := valueAggToTuple(&values_agg_raw)
+				if err != nil {
+					return 0, err
+				}
+				values_agg = append(values_agg, values_agg_tuple)
+			}
+			sample_types_units_item := make([]tuple, len(sample_types_array))
+			for i, v := range sample_types_array {
+				sample_types_units_item[i] = tuple{v, sample_units_array[i]}
+			}
+			sample_types_units = append(sample_types_units, sample_types_units_item)
+
+			tmp, _ = m.Get(columnPeriodType)
+			period_type = append(period_type, tmp.AsString())
+
+			tmp, _ = m.Get(columnPeriodUnit)
+			period_unit = append(period_unit, tmp.AsString())
+
+			tmp, _ = m.Get(columnTags)
+			tm = tmp.Map().AsRaw()
+			tag, j := make([]tuple, len(tm)), 0
+			for k, v := range tm {
+				tag[j] = tuple{k, v.(string)}
+				j++
+			}
+			tags = append(tags, tag)
+
+			tmp, _ = m.Get(columnDurationNs)
+			dur, _ := strconv.ParseUint(tmp.Str(), 10, 64)
+			duration_ns = append(duration_ns, dur)
+
+			tmp, _ = m.Get(columnPayloadType)
+			payload_type = append(payload_type, tmp.AsString())
+
+			payload = append(payload, r.Body().Bytes().AsRaw())
+
+			idx = offset + s
+			ch.logger.Debug(
+				fmt.Sprintf("batch insert prepared row %d", idx),
+				zap.Uint64(columnTimestampNs, timestamp_ns[idx]),
+				zap.String(columnType, typ[idx]),
+				zap.String(columnServiceName, service_name[idx]),
+				zap.String(columnPeriodType, period_type[idx]),
+				zap.String(columnPeriodUnit, period_unit[idx]),
+				zap.Any(columnSampleTypesUnits, sample_types_units[idx]),
+				zap.String(columnPayloadType, payload_type[idx]),
+			)
+		}
+		offset += s
 	}
 
 	// column order here should match table column order
 	if err := b.Column(0).Append(timestamp_ns); err != nil {
-		return err
+		return 0, err
 	}
 	if err := b.Column(1).Append(typ); err != nil {
-		return err
+		return 0, err
 	}
 	if err := b.Column(2).Append(service_name); err != nil {
-		return err
+		return 0, err
 	}
 	if err := b.Column(3).Append(sample_types_units); err != nil {
-		return err
+		return 0, err
 	}
 	if err := b.Column(4).Append(period_type); err != nil {
-		return err
+		return 0, err
 	}
 	if err := b.Column(5).Append(period_unit); err != nil {
-		return err
+		return 0, err
 	}
 	if err := b.Column(6).Append(tags); err != nil {
-		return err
+		return 0, err
 	}
 	if err := b.Column(7).Append(duration_ns); err != nil {
-		return err
+		return 0, err
 	}
 	if err := b.Column(8).Append(payload_type); err != nil {
-		return err
+		return 0, err
 	}
 
 	if err := b.Column(9).Append(payload); err != nil {
-		return err
+		return 0, err
 	}
 	if err := b.Column(10).Append(values_agg); err != nil {
-		return err
+		return 0, err
 	}
-	return b.Send()
+	return offset, b.Send()
 }
 
 // Closes the clickhouse connection pool
```
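The substantive change in this file: `InsertBatch` previously assumed each resource log carried exactly one log record and preallocated every column slice at `rl.Len()`. It now walks all log records of each scope, appends row by row, and returns the number of rows queued so the caller can report a true batch size. A minimal standalone sketch (collector `plog` API only, with placeholder data) of why the row count can now exceed the resource-log count:

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/plog"
)

func main() {
	ls := plog.NewLogs()
	sl := ls.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty()
	sl.LogRecords().AppendEmpty() // first profile under this resource
	sl.LogRecords().AppendEmpty() // second profile, same resource

	// Count rows the way the new InsertBatch does: one per log record.
	rows := 0
	rl := ls.ResourceLogs()
	for i := 0; i < rl.Len(); i++ {
		rows += rl.At(i).ScopeLogs().At(0).LogRecords().Len()
	}
	fmt.Println(rows, rl.Len()) // 2 1 — the old code would have prepared only one row
}
```

One detail worth noting in review: `values_agg` is only appended to when the `columnValuesAgg` attribute is present, so its length can drift from the other column slices if some records lack that attribute.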

exporter/clickhouseprofileexporter/exporter.go (+4 −3)

```diff
@@ -30,7 +30,7 @@ type clickhouseProfileExporter struct {
 
 type clickhouseAccess interface {
 	// Inserts a profile batch into the clickhouse server
-	InsertBatch(profiles plog.Logs) error
+	InsertBatch(profiles plog.Logs) (int, error)
 
 	// Shuts down the clickhouse connection
 	Shutdown() error
@@ -63,13 +63,14 @@ func newClickhouseProfileExporter(ctx context.Context, set *exporter.CreateSetti
 // Sends the profiles to clickhouse server using the configured connection
 func (exp *clickhouseProfileExporter) send(ctx context.Context, logs plog.Logs) error {
 	start := time.Now().UnixMilli()
-	if err := exp.ch.InsertBatch(logs); err != nil {
+	sz, err := exp.ch.InsertBatch(logs)
+	if err != nil {
 		otelcolExporterClickhouseProfileBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError)))
 		exp.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
 		return err
 	}
 	otelcolExporterClickhouseProfileBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess)))
-	exp.logger.Info("inserted batch", zap.Int("size", logs.ResourceLogs().Len()))
+	exp.logger.Info("inserted batch", zap.Int("size", sz))
 	return nil
 }
```
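Since `clickhouseAccess` is an interface, the signature change propagates to every implementation. A hypothetical in-memory stub (not part of this commit; the name is an assumption) showing the new contract — an implementation must now report how many rows it inserted:

```go
package clickhouseprofileexporter

import "go.opentelemetry.io/collector/pdata/plog"

// fakeClickhouseAccess is a test-double sketch for the updated interface.
type fakeClickhouseAccess struct {
	rows int
}

// InsertBatch counts one row per log record, mirroring what the real
// native-columnar implementation now returns.
func (f *fakeClickhouseAccess) InsertBatch(profiles plog.Logs) (int, error) {
	n := 0
	rl := profiles.ResourceLogs()
	for i := 0; i < rl.Len(); i++ {
		n += rl.At(i).ScopeLogs().At(0).LogRecords().Len()
	}
	f.rows = n
	return n, nil
}

// Shutdown satisfies the second method of the clickhouseAccess interface.
func (f *fakeClickhouseAccess) Shutdown() error { return nil }
```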

receiver/pyroscopereceiver/jfrparser/parser.go (+1 −1)

```diff
@@ -81,7 +81,7 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([
 	case pa.jfrParser.TypeMap.T_EXECUTION_SAMPLE:
 		values[0] = 1 * int64(period)
 		ts := pa.jfrParser.GetThreadState(pa.jfrParser.ExecutionSample.State)
-		if ts != nil && ts.Name == "STATE_RUNNABLE" {
+		if ts != nil && ts.Name != "STATE_SLEEPING" {
 			pa.addStacktrace(sampleTypeCpu, pa.jfrParser.ExecutionSample.StackTrace, values[:1])
 		}
 		// TODO: this code is from github/grafana/pyroscope, need to validate that the qryn.js query simulator handles this branch as expected for wall
```
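This one-line change widens which JFR execution samples are attributed to the `cpu` profile: instead of counting only threads in `STATE_RUNNABLE`, the parser now counts every sampled thread that is not sleeping. A small sketch of the new predicate in isolation (`ThreadState` here is a minimal stand-in struct, not the jfr-parser's actual type):

```go
package main

import "fmt"

// ThreadState is a stand-in for the jfr-parser thread state record.
type ThreadState struct {
	Name string
}

// countsAsCpu reflects the new inclusion rule for execution samples.
func countsAsCpu(ts *ThreadState) bool {
	// Before this commit: ts != nil && ts.Name == "STATE_RUNNABLE"
	return ts != nil && ts.Name != "STATE_SLEEPING"
}

func main() {
	for _, name := range []string{"STATE_RUNNABLE", "STATE_PARKED", "STATE_SLEEPING"} {
		fmt.Println(name, countsAsCpu(&ThreadState{Name: name}))
	}
	// STATE_PARKED now counts toward cpu; under the old rule it did not.
}
```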
New file: pprof parser (package pprofparser) (+103)

```go
package pprofparser

import (
	"bytes"
	"fmt"

	pprof_proto "github.com/google/pprof/profile"
	profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types"
)

type sampleType uint8

const (
	sampleTypeCpu sampleType = iota
	sampleTypeCount
)

type profileWrapper struct {
	pprof *pprof_proto.Profile
	prof  profile_types.ProfileIR
}

type pProfParser struct {
	proftab [sampleTypeCount]*profileWrapper                  // <sample type, (profile, pprof)>
	samptab [sampleTypeCount]map[uint32]uint32                // <extern jfr stacktrace id, matching pprof sample array index>
	loctab  [sampleTypeCount]map[uint32]*pprof_proto.Location // <extern jfr funcid, pprof location>
}

// Creates a pprof parser that parses the accepted pprof buffer
func NewPprofParser() *pProfParser {
	return &pProfParser{}
}

func (pa *pProfParser) Parse(data *bytes.Buffer, md profile_types.Metadata) ([]profile_types.ProfileIR, error) {
	// Parse pprof data
	pProfData, err := pprof_proto.Parse(data)
	if err != nil {
		return nil, err
	}

	// Process pprof data and create SampleType slice
	var sampleTypes []string
	var sampleUnits []string
	var valueAggregates []profile_types.SampleType

	for i, st := range pProfData.SampleType {
		sampleTypes = append(sampleTypes, pProfData.SampleType[i].Type)
		sampleUnits = append(sampleUnits, pProfData.SampleType[i].Unit)
		sum, count := calculateSumAndCount(pProfData, i)
		valueAggregates = append(valueAggregates, profile_types.SampleType{fmt.Sprintf("%s:%s", st.Type, st.Unit), sum, count})
	}

	var profiles []profile_types.ProfileIR
	var profileType string

	switch pProfData.PeriodType.Type {
	case "cpu":
		profileType = "process_cpu"
	case "wall":
		profileType = "wall"
	case "mutex", "contentions":
		profileType = "mutex"
	case "goroutine":
		profileType = "goroutines"
	case "objects", "space", "alloc", "inuse":
		profileType = "memory"
	case "block":
		profileType = "block"
	}

	profileTypeInfo := profile_types.ProfileType{
		PeriodType: pProfData.PeriodType.Type,
		PeriodUnit: pProfData.PeriodType.Unit,
		SampleType: sampleTypes,
		SampleUnit: sampleUnits,
		Type:       profileType,
	}

	// Create a new ProfileIR instance
	profile := profile_types.ProfileIR{
		ValueAggregation: valueAggregates,
		Type:             profileTypeInfo,
	}
	profile.Payload = new(bytes.Buffer)
	pProfData.WriteUncompressed(profile.Payload)
	// Append the profile to the result
	profiles = append(profiles, profile)
	return profiles, nil
}

func calculateSumAndCount(samples *pprof_proto.Profile, sampleTypeIndex int) (int64, int32) {
	var sum int64
	count := int32(len(samples.Sample))
	for _, sample := range samples.Sample {
		// Check if the sample has a value for the specified sample type
		if sampleTypeIndex < len(sample.Value) {
			// Accumulate the value for the specified sample type
			sum += sample.Value[sampleTypeIndex]
		}
	}

	return sum, count
}
```
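A usage sketch for the new parser. The import path for the `pprofparser` package is assumed from its package name and the sibling `types` import; the empty byte slice is a placeholder that must be replaced with real serialized pprof bytes (`pprof_proto.Parse` rejects empty input):

```go
package main

import (
	"bytes"
	"fmt"
	"log"

	// assumed path, alongside the receiver's types package
	"github.com/metrico/otel-collector/receiver/pyroscopereceiver/pprofparser"
	profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types"
)

func main() {
	raw := []byte{} // placeholder: real pprof profile bytes go here
	pa := pprofparser.NewPprofParser()
	profiles, err := pa.Parse(bytes.NewBuffer(raw), profile_types.Metadata{})
	if err != nil {
		log.Fatal(err) // the empty placeholder input will land here
	}
	for _, p := range profiles {
		// e.g. "process_cpu cpu nanoseconds" for a Go CPU profile
		fmt.Println(p.Type.Type, p.Type.PeriodType, p.Type.PeriodUnit)
	}
}
```

Note that `calculateSumAndCount` returns the count of all samples in the profile, not only those carrying a value at the given sample-type index; the bounds check inside the loop only guards the sum.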
