Skip to content

Commit

Permalink
refactor testclient pkg and bench
Browse files Browse the repository at this point in the history
  • Loading branch information
tomershafir committed Jan 14, 2024
1 parent 1fa6b1c commit 3e37e31
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package pyroscopereceiver

import (
"fmt"
"path/filepath"
"testing"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/testclient"
)

type request struct {
Expand All @@ -13,7 +14,7 @@ type request struct {

// Benchmarks a running otelcol pyroscope write pipeline (collector and Clickhouse).
// Adjust collectorAddr to bench a your target if needed.
// Example: go test -bench ^BenchmarkPyroscopePipeline$ github.com/metrico/otel-collector/receiver/pyroscopereceiver -benchtime 10s -count 6 -cpu 1 | tee bench.txt
// Example: GOMAXPROCS=1 go test -bench ^BenchmarkPyroscopePipeline$ github.com/metrico/otel-collector/receiver/pyroscopereceiver -benchtime 10s -count 6
func BenchmarkPyroscopePipeline(b *testing.B) {
dist := []request{
{
Expand All @@ -36,14 +37,14 @@ func BenchmarkPyroscopePipeline(b *testing.B) {
jfr: filepath.Join("testdata", "memory_alloc_live_example.jfr"),
},
}
collectorAddr := fmt.Sprintf("http://%s%s", defaultHttpAddr, ingestPath)
collectorAddr := "http://0.0.0.0:8062"

b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
j := 0
for pb.Next() {
send(collectorAddr, dist[j].urlParams, dist[j].jfr)
testclient.Ingest(collectorAddr, dist[j].urlParams, dist[j].jfr)
j = (j + 1) % len(dist)
}
})
Expand Down
53 changes: 3 additions & 50 deletions receiver/pyroscopereceiver/receiver_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package pyroscopereceiver

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"mime/multipart"
"net"
"net/http"
"os"
"path/filepath"
"testing"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/testclient"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -46,7 +43,7 @@ func loadTestData(t *testing.T, filename string) []byte {
func run(t *testing.T, tests []jfrtest, collectorAddr string, sink *consumertest.LogsSink) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.NoError(t, send(collectorAddr, tt.urlParams, tt.jfr), "send shouldn't have been failed")
assert.NoError(t, testclient.Ingest(collectorAddr, tt.urlParams, tt.jfr), "send shouldn't have been failed")
actual := sink.AllLogs()
assert.NoError(t, plogtest.CompareLogs(tt.expected, actual[0]))
sink.Reset()
Expand Down Expand Up @@ -77,50 +74,6 @@ func startHttpServer(t *testing.T) (string, *consumertest.LogsSink) {
return addr, sink
}

func send(addr string, urlParams map[string]string, jfr string) error {
data, err := os.ReadFile(jfr)
if err != nil {
return err
}

body := new(bytes.Buffer)

mw := multipart.NewWriter(body)
part, err := mw.CreateFormFile("jfr", "jfr")
if err != nil {
return fmt.Errorf("failed to create form file: %w", err)
}
gw := gzip.NewWriter(part)
if _, err := gw.Write(data); err != nil {
return err
}
gw.Close()
mw.Close()

req, err := http.NewRequest("POST", addr, body)
if err != nil {
return err
}
req.Header.Add("Content-Type", mw.FormDataContentType())

q := req.URL.Query()
for k, v := range urlParams {
q.Add(k, v)
}
req.URL.RawQuery = q.Encode()

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}

resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("failed to upload profile; http status code: %d", resp.StatusCode)
}
return nil
}

func TestPyroscopeIngestJfrCpu(t *testing.T) {
tests := make([]jfrtest, 1)
pb := loadTestData(t, "cortex-dev-01__kafka-0__cpu__0.pb")
Expand Down Expand Up @@ -211,7 +164,7 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) {
}

addr, sink := startHttpServer(t)
collectorAddr := fmt.Sprintf("http://%s%s", addr, ingestPath)
collectorAddr := fmt.Sprintf("http://%s", addr)
run(t, tests, collectorAddr, sink)
}

Expand Down
54 changes: 54 additions & 0 deletions receiver/pyroscopereceiver/testclient/ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package testclient

import (
"bytes"
"compress/gzip"
"fmt"
"mime/multipart"
"net/http"
"os"
)

func Ingest(addr string, urlParams map[string]string, jfr string) error {
data, err := os.ReadFile(jfr)
if err != nil {
return err
}

body := new(bytes.Buffer)

mw := multipart.NewWriter(body)
part, err := mw.CreateFormFile("jfr", "jfr")
if err != nil {
return fmt.Errorf("failed to create form file: %w", err)
}
gw := gzip.NewWriter(part)
if _, err := gw.Write(data); err != nil {
return err
}
gw.Close()
mw.Close()

req, err := http.NewRequest("POST", fmt.Sprintf("%s/ingest", addr), body)
if err != nil {
return err
}
req.Header.Add("Content-Type", mw.FormDataContentType())

q := req.URL.Query()
for k, v := range urlParams {
q.Add(k, v)
}
req.URL.RawQuery = q.Encode()

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}

resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("failed to upload profile; http status code: %d", resp.StatusCode)
}
return nil
}

0 comments on commit 3e37e31

Please sign in to comment.