Skip to content

Commit

Permalink
add attributes to kafka spans
Browse files Browse the repository at this point in the history
Signed-off-by: Pierre Tessier <[email protected]>
  • Loading branch information
puckpuck committed Dec 3, 2023
1 parent 8124353 commit dedde4f
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/accountingservice/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func StartConsumerGroup(ctx context.Context, brokers []string, log *logrus.Logge
saramaConfig.Version = ProtocolVersion
// So we can know the partition and offset of messages.
saramaConfig.Producer.Return.Successes = true
saramaConfig.Consumer.Interceptors = []sarama.ConsumerInterceptor{NewOTelInterceptor()}
saramaConfig.Consumer.Interceptors = []sarama.ConsumerInterceptor{NewOTelInterceptor(GroupID)}

consumerGroup, err := sarama.NewConsumerGroup(brokers, GroupID, saramaConfig)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion src/accountingservice/kafka/trace_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ type OTelInterceptor struct {

// NewOTelInterceptor processes span for intercepted messages and add some
// headers with the span data.
func NewOTelInterceptor() *OTelInterceptor {
func NewOTelInterceptor(groupID string) *OTelInterceptor {
oi := OTelInterceptor{}
oi.tracer = otel.Tracer("github.com/open-telemetry/opentelemetry-demo/accountingservice/sarama")

oi.fixedAttrs = []attribute.KeyValue{
semconv.MessagingSystem("kafka"),
semconv.MessagingKafkaConsumerGroup(groupID),
semconv.NetTransportTCP,
}
return &oi
Expand All @@ -50,6 +51,8 @@ func (oi *OTelInterceptor) OnConsume(msg *sarama.ConsumerMessage) {
trace.WithAttributes(
semconv.MessagingDestinationKindTopic,
semconv.MessagingDestinationName(msg.Topic),
semconv.MessagingKafkaMessageOffset(int(msg.Offset)),
semconv.MessagingMessagePayloadSizeBytes(len(msg.Value)),
semconv.MessagingOperationReceive,
semconv.MessagingKafkaDestinationPartition(int(msg.Partition)),
),
Expand Down

0 comments on commit dedde4f

Please sign in to comment.