77 "context"
88 "encoding/json"
99 "fmt"
10+ semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
1011 "net"
1112 "net/http"
1213 "os"
@@ -310,7 +311,7 @@ func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderReq
310311
311312 // send to kafka only if kafka broker address is set
312313 if cs .kafkaBrokerSvcAddr != "" {
313- cs .sendToPostProcessor (orderResult )
314+ cs .sendToPostProcessor (ctx , orderResult )
314315 }
315316
316317 resp := & pb.PlaceOrderResponse {Order : orderResult }
@@ -473,7 +474,7 @@ func (cs *checkoutService) shipOrder(ctx context.Context, address *pb.Address, i
473474 return resp .GetTrackingId (), nil
474475}
475476
476- func (cs * checkoutService ) sendToPostProcessor (result * pb.OrderResult ) {
477+ func (cs * checkoutService ) sendToPostProcessor (ctx context. Context , result * pb.OrderResult ) {
477478 message , err := proto .Marshal (result )
478479 if err != nil {
479480 log .Errorf ("Failed to marshal message to protobuf: %+v" , err )
@@ -485,7 +486,37 @@ func (cs *checkoutService) sendToPostProcessor(result *pb.OrderResult) {
485486 Value : sarama .ByteEncoder (message ),
486487 }
487488
489+ // Inject tracing info into message
490+ span := createProducerSpan (ctx , & msg )
491+ defer span .End ()
492+
488493 cs .KafkaProducerClient .Input () <- & msg
489494 successMsg := <- cs .KafkaProducerClient .Successes ()
490495 log .Infof ("Successful to write message. offset: %v" , successMsg .Offset )
491496}
497+
498+ func createProducerSpan (ctx context.Context , msg * sarama.ProducerMessage ) trace.Span {
499+ spanContext , span := tracer .Start (
500+ ctx ,
501+ fmt .Sprintf ("%s publish" , msg .Topic ),
502+ trace .WithSpanKind (trace .SpanKindProducer ),
503+ trace .WithAttributes (
504+ semconv .PeerService ("kafka" ),
505+ semconv .NetworkTransportTCP ,
506+ semconv .MessagingSystemKafka ,
507+ semconv .MessagingDestinationName (msg .Topic ),
508+ semconv .MessagingOperationPublish ,
509+ semconv .MessagingKafkaDestinationPartition (int (msg .Partition )),
510+ ),
511+ )
512+
513+ carrier := propagation.MapCarrier {}
514+ propagator := otel .GetTextMapPropagator ()
515+ propagator .Inject (spanContext , carrier )
516+
517+ for key , value := range carrier {
518+ msg .Headers = append (msg .Headers , sarama.RecordHeader {Key : []byte (key ), Value : []byte (value )})
519+ }
520+
521+ return span
522+ }
0 commit comments