Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 72 additions & 2 deletions cmd/collector/app/handler/zipkin_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ package handler

import (
"context"
"errors"
"fmt"
"net/http"
"reflect"
"unsafe"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver"
"go.opentelemetry.io/collector/component"
Expand All @@ -25,6 +29,66 @@ var (
zipkinID = component.NewID(zipkinComponentType)
)

// zipkinReceiverWrapper wraps the Opentelemetry zipkin receiver to apply keep alive settings
type zipkinReceiverWrapper struct {
receiver.Traces
keepAlive bool
logger *zap.Logger
}

// start wraps the original start method to apply keep-alive settings
func (w *zipkinReceiverWrapper) Start(ctx context.Context, host component.Host) error {
err := w.Traces.Start(ctx, host)
if err != nil {
return err
}

if !w.keepAlive {
if err := w.disableKeepAlive(); err != nil {
w.logger.Warn("Failed to disable keep-alive on Zipkin receiver", zap.Error(err))
} else {
w.logger.Info("Disabled keep-alive on Zipkin receiver")
}
}

return nil
}

// disableKeepAlive use reflection and unsafe operations to access the internal HTTP server and disable keep alive
func (w *zipkinReceiverWrapper) disableKeepAlive() error {
if w.Traces == nil {
return errors.New("receiver is nil")
}

receiverValue := reflect.ValueOf(w.Traces)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like you're trying to unwrap the zipkin receiver internals and set a flag. Why not just create a PR in OTEL contrib to expose the keepalive (assuming it's not exposed now)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have created the pr as you suggested, now waiting for review:
open-telemetry/opentelemetry-collector-contrib#42531

if receiverValue.Kind() == reflect.Ptr {
receiverValue = receiverValue.Elem()
}

if receiverValue.Kind() != reflect.Struct {
return errors.New("receiver is not a struct")
}

serverField := receiverValue.FieldByName("server")
if !serverField.IsValid() {
return errors.New("server field not found in zipkin receiver")
}

if serverField.Kind() != reflect.Ptr || serverField.Type() != reflect.TypeOf((*http.Server)(nil)) {
return errors.New("server field is not of type *http.Server")
}

if serverField.IsNil() {
return errors.New("server field is nil")
}

serverPtr := (*http.Server)(unsafe.Pointer(serverField.Pointer()))
serverPtr.SetKeepAlivesEnabled(false)
w.logger.Debug("Successfully disabled keep-alive on Zipkin HTTP server")

return nil
}

// StartZipkinReceiver starts Zipkin receiver from OTEL Collector.
func StartZipkinReceiver(
options *flags.CollectorOptions,
Expand Down Expand Up @@ -90,8 +154,14 @@ func startZipkinReceiver(
if err != nil {
return nil, fmt.Errorf("could not create Zipkin receiver: %w", err)
}
if err := rcvr.Start(context.Background(), &otelHost{logger: logger}); err != nil {
wrappedReceiver := &zipkinReceiverWrapper{ // wrap the receiver to apply keep-alive settings
Traces: rcvr,
keepAlive: options.Zipkin.KeepAlive,
logger: logger,
}

if err := wrappedReceiver.Start(context.Background(), &otelHost{logger: logger}); err != nil {
return nil, fmt.Errorf("could not start Zipkin receiver: %w", err)
}
return rcvr, nil
return wrappedReceiver, nil
}
191 changes: 191 additions & 0 deletions cmd/collector/app/handler/zipkin_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"net/http"
"os"
"reflect"
"testing"

gogojsonpb "github.com/gogo/protobuf/jsonpb"
Expand Down Expand Up @@ -160,3 +161,193 @@ func TestStartZipkinReceiver_Error(t *testing.T) {
_, err = startZipkinReceiver(opts, logger, spanProcessor, tm, f, consumer.NewTraces, createTracesReceiver)
assert.ErrorContains(t, err, "could not create Zipkin receiver")
}

func TestZipkinReceiverKeepAlive(t *testing.T) {
spanProcessor := &mockSpanProcessor{}
logger, _ := testutils.NewLogger()
tm := &tenancy.Manager{}

testCases := []struct {
name string
keepAlive bool
}{
{
name: "KeepAlive enabled",
keepAlive: true,
},
{
name: "KeepAlive disabled",
keepAlive: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
opts := &flags.CollectorOptions{}
opts.Zipkin.Endpoint = ":0"
opts.Zipkin.KeepAlive = tc.keepAlive

rec, err := StartZipkinReceiver(opts, logger, spanProcessor, tm)
require.NoError(t, err)
defer func() {
require.NoError(t, rec.Shutdown(context.Background()))
}()

wrapper, ok := rec.(*zipkinReceiverWrapper)
require.True(t, ok, "receiver should be wrapped with zipkinReceiverWrapper")
assert.Equal(t, tc.keepAlive, wrapper.keepAlive, "keepAlive setting should match")

// Try to access the internal server to verify keep-alive setting
// this is a test only verification using reflection
receiverValue := reflect.ValueOf(wrapper.Traces)
if receiverValue.Kind() == reflect.Ptr {
receiverValue = receiverValue.Elem()
}

serverField := receiverValue.FieldByName("server")
if serverField.IsValid() && !serverField.IsNil() {
// we can not directly check the keep alive setting on the server
// because it's an internal state but we can verify if our wrapper was applied correctly or not
t.Logf("Server field found and is not nil for keepAlive=%v", tc.keepAlive)
}
})
}
}

func TestZipkinReceiverWrapper_DisableKeepAlive_ErrorCases(t *testing.T) {
logger, _ := testutils.NewLogger()

tests := []struct {
name string
receiver receiver.Traces
expectedErrMsg string
}{
{
name: "nil receiver",
receiver: nil,
expectedErrMsg: "receiver is nil",
},
{
name: "receiver without server field",
receiver: &mockReceiverWithoutServerField{},
expectedErrMsg: "server field not found in zipkin receiver",
},
{
name: "receiver with wrong server field type",
receiver: &mockReceiverWithWrongServerType{server: "not a server"},
expectedErrMsg: "server field is not of type *http.Server",
},
{
name: "receiver with nil server field",
receiver: &mockReceiverWithNilServer{server: nil},
expectedErrMsg: "server field is nil",
},
{
name: "non-struct receiver (string type)",
receiver: func() receiver.Traces { s := mockStringReceiver("test"); return &s }(),
expectedErrMsg: "receiver is not a struct",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
wrapper := &zipkinReceiverWrapper{
Traces: tt.receiver,
keepAlive: false,
logger: logger,
}

err := wrapper.disableKeepAlive()
require.Error(t, err)
assert.Contains(t, err.Error(), tt.expectedErrMsg)
})
}
}

type mockReceiverWithoutServerField struct{}

func (*mockReceiverWithoutServerField) Start(_ context.Context, _ component.Host) error {
return nil
}

func (*mockReceiverWithoutServerField) Shutdown(_ context.Context) error {
return nil
}

type mockReceiverWithWrongServerType struct {
server string
}

func (*mockReceiverWithWrongServerType) Start(_ context.Context, _ component.Host) error {
return nil
}

func (*mockReceiverWithWrongServerType) Shutdown(_ context.Context) error {
return nil
}

type mockReceiverWithNilServer struct {
server *http.Server
}

func (*mockReceiverWithNilServer) Start(_ context.Context, _ component.Host) error {
return nil
}

func (*mockReceiverWithNilServer) Shutdown(_ context.Context) error {
return nil
}

type mockStringReceiver string

func (*mockStringReceiver) Start(_ context.Context, _ component.Host) error {
return nil
}

func (*mockStringReceiver) Shutdown(_ context.Context) error {
return nil
}

func TestZipkinReceiverWrapper_DisableKeepAlive_SuccessPath(t *testing.T) {
logger, _ := testutils.NewLogger()

mockReceiver := &mockReceiverWithValidServer{
server: &http.Server{},
}

wrapper := &zipkinReceiverWrapper{
Traces: mockReceiver,
keepAlive: false,
logger: logger,
}

err := wrapper.disableKeepAlive()
require.NoError(t, err)
}

type mockReceiverWithValidServer struct {
server *http.Server
}

func (*mockReceiverWithValidServer) Start(_ context.Context, _ component.Host) error {
return nil
}

func (*mockReceiverWithValidServer) Shutdown(_ context.Context) error {
return nil
}

func TestZipkinReceiverWrapper_Start_WithDisableKeepAliveError(t *testing.T) {
logger, _ := testutils.NewLogger()

mockReceiver := &mockReceiverWithoutServerField{}

wrapper := &zipkinReceiverWrapper{
Traces: mockReceiver,
keepAlive: false,
logger: logger,
}

err := wrapper.Start(context.Background(), &otelHost{logger: logger})
require.NoError(t, err)
}
Loading