Skip to content

Commit dd4b63b

Browse files
committed
support octet/binary-stream
1 parent 283b4e8 commit dd4b63b

File tree

1 file changed

+61
-15
lines changed

1 file changed

+61
-15
lines changed

receiver/pyroscopereceiver/receiver.go

+61-15
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"compress/gzip"
66
"context"
7+
"encoding/hex"
78
"errors"
89
"fmt"
910
"io"
@@ -324,6 +325,51 @@ func (r *pyroscopeReceiver) Push(ctx context.Context, req *connect.Request[pushv
324325
return connect.NewResponse(&pushv1.PushResponse{}), nil
325326
}
326327

328+
func (r *pyroscopeReceiver) getProfilesBuff(req *http.Request) (*bytes.Buffer, error) {
329+
var err error
330+
var buf *bytes.Buffer
331+
defer func() {
332+
if err != nil && buf != nil {
333+
releaseBuf(r.uncompressedBufPool, buf)
334+
}
335+
}()
336+
contentType := ""
337+
if len(req.Header["Content-Type"]) > 0 {
338+
contentType = req.Header["Content-Type"][0]
339+
}
340+
if strings.HasPrefix(contentType, "multipart/form-data") {
341+
var f multipart.File
342+
f, err = r.openMultipart(req)
343+
if err != nil {
344+
fmt.Println(req.URL.String())
345+
for k, v := range req.Header {
346+
fmt.Printf("Header: %s: %v", k, v)
347+
}
348+
b, _ := io.ReadAll(req.Body)
349+
//TODO: encode b to hex
350+
fmt.Printf("Body: %s\n", hex.EncodeToString(b))
351+
return nil, err
352+
}
353+
defer f.Close()
354+
355+
buf = acquireBuf(r.uncompressedBufPool)
356+
err = r.decompressor.Decompress(f, compress.Gzip, buf)
357+
if err != nil {
358+
return nil, fmt.Errorf("failed to decompress body: %w", err)
359+
}
360+
return buf, nil
361+
}
362+
if strings.HasPrefix(contentType, "binary/octet-stream") {
363+
buf = acquireBuf(r.uncompressedBufPool)
364+
_, err = io.Copy(buf, req.Body)
365+
if err != nil {
366+
return buf, fmt.Errorf("failed to read body: %w", err)
367+
}
368+
return buf, nil
369+
}
370+
return nil, fmt.Errorf("unsupported content type: %s", contentType)
371+
}
372+
327373
func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, pm params) (plog.Logs, error) {
328374
var (
329375
tmp []string
@@ -342,21 +388,14 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request,
342388
p = pprofparser.NewPprofParser()
343389
}
344390
// support only multipart/form-data
345-
f, err := r.openMultipart(req)
391+
buf, err := r.getProfilesBuff(req)
346392
if err != nil {
347393
return logs, err
348394
}
349-
defer f.Close()
350-
351-
buf := acquireBuf(r.uncompressedBufPool)
352395
defer func() {
353396
releaseBuf(r.uncompressedBufPool, buf)
354397
}()
355398

356-
err = r.decompressor.Decompress(f, compress.Gzip, buf)
357-
if err != nil {
358-
return logs, fmt.Errorf("failed to decompress body: %w", err)
359-
}
360399
// TODO: try measure compressed size
361400
otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes.Record(ctx, int64(buf.Len()), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatJfr, "")))
362401
resetHeaders(req)
@@ -384,15 +423,13 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request,
384423
record := rs.AppendEmpty()
385424
if tmp, ok = qs["format"]; ok && (tmp[0] == "jfr") {
386425
timestampNs = ns(pm.start)
387-
durationNs = pm.end - pm.start
388-
durationNs = ns(durationNs)
426+
durationNs = ns(pm.end) - ns(pm.start)
389427
} else if tmp, ok = qs["spyName"]; ok && (tmp[0] == "nodespy") {
390428
timestampNs = uint64(pr.TimeStampNao)
391429
durationNs = uint64(pr.DurationNano)
392430
} else {
393-
timestampNs = pm.start
394-
durationNs = pm.end - pm.start
395-
durationNs = ns(durationNs)
431+
timestampNs = ns(pm.start)
432+
durationNs = ns(pm.end) - ns(pm.start)
396433
}
397434
record.SetTimestamp(pcommon.Timestamp(timestampNs))
398435
m := record.Attributes()
@@ -409,7 +446,7 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request,
409446
postProcessProf(pr.Profile, &m)
410447
record.Body().SetEmptyBytes().FromRaw(pr.Payload.Bytes())
411448
sz += pr.Payload.Len()
412-
r.logger.Debug(
449+
r.logger.Info(
413450
fmt.Sprintf("parsed profile %d", i),
414451
zap.Uint64("timestamp_ns", timestampNs),
415452
zap.String("type", pr.Type.Type),
@@ -427,7 +464,16 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request,
427464
}
428465

429466
func ns(sec uint64) uint64 {
430-
return sec * 1e9
467+
if sec < 10000000000000 {
468+
return sec * 1e9
469+
}
470+
if sec < 10000000000000000 {
471+
return sec * 1e6
472+
}
473+
if sec < 10000000000000000000 {
474+
return sec * 1e3
475+
}
476+
return sec
431477
}
432478

433479
func newOtelcolAttrSetPayloadSizeBytes(service string, typ string, encoding string) *attribute.Set {

0 commit comments

Comments
 (0)