diff --git a/CHANGELOG.md b/CHANGELOG.md index 087da9fcc0..2d00c5c5d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ the release. ## Unreleased +* [checkoutservice] add producer interceptor for tracing + ([#1400](https://github.com/open-telemetry/opentelemetry-demo/pull/1400)) * [chore] increase memory for Collector and Jaeger ([#1396](https://github.com/open-telemetry/opentelemetry-demo/pull/1396)) * [chore] fix Make targets for restart and redeploy diff --git a/src/accountingservice/kafka/trace_interceptor.go b/src/accountingservice/kafka/trace_interceptor.go index 16899a5891..5e2bf1254e 100644 --- a/src/accountingservice/kafka/trace_interceptor.go +++ b/src/accountingservice/kafka/trace_interceptor.go @@ -28,6 +28,7 @@ func NewOTelInterceptor(groupID string) *OTelInterceptor { oi.fixedAttrs = []attribute.KeyValue{ semconv.MessagingSystemKafka, + semconv.MessagingOperationReceive, semconv.MessagingKafkaConsumerGroup(groupID), semconv.NetworkTransportTCP, } diff --git a/src/checkoutservice/kafka/producer.go b/src/checkoutservice/kafka/producer.go index 6ee773d8c2..0d9ea5684a 100644 --- a/src/checkoutservice/kafka/producer.go +++ b/src/checkoutservice/kafka/producer.go @@ -17,6 +17,7 @@ func CreateKafkaProducer(brokers []string, log *logrus.Logger) (sarama.AsyncProd saramaConfig.Version = ProtocolVersion // So we can know the partition and offset of messages. 
saramaConfig.Producer.Return.Successes = true + saramaConfig.Producer.Interceptors = []sarama.ProducerInterceptor{NewOTelInterceptor()} producer, err := sarama.NewAsyncProducer(brokers, saramaConfig) if err != nil { diff --git a/src/checkoutservice/kafka/trace_interceptor.go b/src/checkoutservice/kafka/trace_interceptor.go new file mode 100644 index 0000000000..63b4c3cdcc --- /dev/null +++ b/src/checkoutservice/kafka/trace_interceptor.go @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +package kafka + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" + "go.opentelemetry.io/otel/trace" + + "github.com/IBM/sarama" +) + +type OTelInterceptor struct { + tracer trace.Tracer + fixedAttrs []attribute.KeyValue +} + +// NewOTelInterceptor processes span for intercepted messages and adds some +// headers with the span data. 
+func NewOTelInterceptor() *OTelInterceptor { + oi := OTelInterceptor{} + oi.tracer = otel.Tracer("github.com/open-telemetry/opentelemetry-demo/checkoutservice/sarama") + + oi.fixedAttrs = []attribute.KeyValue{ + semconv.MessagingSystemKafka, + semconv.MessagingOperationPublish, + semconv.NetworkTransportTCP, + } + return &oi +} + +func (oi *OTelInterceptor) OnSend(msg *sarama.ProducerMessage) { + spanContext, span := oi.tracer.Start( + context.Background(), + fmt.Sprintf("%s publish", msg.Topic), + trace.WithSpanKind(trace.SpanKindProducer), + trace.WithAttributes( + semconv.PeerService("kafka"), + semconv.NetworkTransportTCP, + semconv.MessagingSystemKafka, + semconv.MessagingDestinationName(msg.Topic), + semconv.MessagingOperationPublish, + semconv.MessagingKafkaDestinationPartition(int(msg.Partition)), + ), + ) + defer span.End() + + carrier := propagation.MapCarrier{} + propagator := otel.GetTextMapPropagator() + propagator.Inject(spanContext, carrier) + + for key, value := range carrier { + msg.Headers = append(msg.Headers, sarama.RecordHeader{Key: []byte(key), Value: []byte(value)}) + } +} diff --git a/src/checkoutservice/main.go b/src/checkoutservice/main.go index 687b7e307b..35119e4eb3 100644 --- a/src/checkoutservice/main.go +++ b/src/checkoutservice/main.go @@ -7,7 +7,6 @@ import ( "context" "encoding/json" "fmt" - semconv "go.opentelemetry.io/otel/semconv/v1.24.0" "net" "net/http" "os" @@ -311,7 +310,7 @@ func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderReq // send to kafka only if kafka broker address is set if cs.kafkaBrokerSvcAddr != "" { - cs.sendToPostProcessor(ctx, orderResult) + cs.sendToPostProcessor(orderResult) } resp := &pb.PlaceOrderResponse{Order: orderResult} @@ -474,7 +473,7 @@ func (cs *checkoutService) shipOrder(ctx context.Context, address *pb.Address, i return resp.GetTrackingId(), nil } -func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.OrderResult) { +func (cs 
*checkoutService) sendToPostProcessor(result *pb.OrderResult) { message, err := proto.Marshal(result) if err != nil { log.Errorf("Failed to marshal message to protobuf: %+v", err) @@ -486,37 +485,7 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O Value: sarama.ByteEncoder(message), } - // Inject tracing info into message - span := createProducerSpan(ctx, &msg) - defer span.End() - cs.KafkaProducerClient.Input() <- &msg successMsg := <-cs.KafkaProducerClient.Successes() log.Infof("Successful to write message. offset: %v", successMsg.Offset) } - -func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.Span { - spanContext, span := tracer.Start( - ctx, - fmt.Sprintf("%s publish", msg.Topic), - trace.WithSpanKind(trace.SpanKindProducer), - trace.WithAttributes( - semconv.PeerService("kafka"), - semconv.NetworkTransportTCP, - semconv.MessagingSystemKafka, - semconv.MessagingDestinationName(msg.Topic), - semconv.MessagingOperationPublish, - semconv.MessagingKafkaDestinationPartition(int(msg.Partition)), - ), - ) - - carrier := propagation.MapCarrier{} - propagator := otel.GetTextMapPropagator() - propagator.Inject(spanContext, carrier) - - for key, value := range carrier { - msg.Headers = append(msg.Headers, sarama.RecordHeader{Key: []byte(key), Value: []byte(value)}) - } - - return span -}