Skip to content

Commit

Permalink
Merge branch 'main' into update-to-21
Browse files Browse the repository at this point in the history
  • Loading branch information
julianocosta89 authored Dec 5, 2023
2 parents 7e07aea + 0244786 commit e8d0658
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 11 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ release.
([#1236](https://github.com/open-telemetry/opentelemetry-demo/pull/1236))
* [cartservice] Add .NET memory, CPU, and thread metrics
([#1265](https://github.com/open-telemetry/opentelemetry-demo/pull/1265))
* enable browser traffic in loadgenerator using playwright ([#1266](https://github.com/open-telemetry/opentelemetry-demo/pull/1266))
* enable browser traffic in loadgenerator using playwright
([#1266](https://github.com/open-telemetry/opentelemetry-demo/pull/1266))
* [accountingservice] Add additional attributes to Kafka spans
([#1286](https://github.com/open-telemetry/opentelemetry-demo/pull/1286))

## 1.6.0

Expand Down
2 changes: 1 addition & 1 deletion src/accountingservice/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func StartConsumerGroup(ctx context.Context, brokers []string, log *logrus.Logge
saramaConfig.Version = ProtocolVersion
// So we can know the partition and offset of messages.
saramaConfig.Producer.Return.Successes = true
saramaConfig.Consumer.Interceptors = []sarama.ConsumerInterceptor{NewOTelInterceptor()}
saramaConfig.Consumer.Interceptors = []sarama.ConsumerInterceptor{NewOTelInterceptor(GroupID)}

consumerGroup, err := sarama.NewConsumerGroup(brokers, GroupID, saramaConfig)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion src/accountingservice/kafka/trace_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ type OTelInterceptor struct {

// NewOTelInterceptor processes span for intercepted messages and add some
// headers with the span data.
func NewOTelInterceptor() *OTelInterceptor {
func NewOTelInterceptor(groupID string) *OTelInterceptor {
oi := OTelInterceptor{}
oi.tracer = otel.Tracer("github.com/open-telemetry/opentelemetry-demo/accountingservice/sarama")

oi.fixedAttrs = []attribute.KeyValue{
semconv.MessagingSystem("kafka"),
semconv.MessagingKafkaConsumerGroup(groupID),
semconv.NetTransportTCP,
}
return &oi
Expand All @@ -50,6 +51,8 @@ func (oi *OTelInterceptor) OnConsume(msg *sarama.ConsumerMessage) {
trace.WithAttributes(
semconv.MessagingDestinationKindTopic,
semconv.MessagingDestinationName(msg.Topic),
semconv.MessagingKafkaMessageOffset(int(msg.Offset)),
semconv.MessagingMessagePayloadSizeBytes(len(msg.Value)),
semconv.MessagingOperationReceive,
semconv.MessagingKafkaDestinationPartition(int(msg.Partition)),
),
Expand Down
6 changes: 2 additions & 4 deletions src/checkoutservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ func main() {
}

var srv = grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)
pb.RegisterCheckoutServiceServer(srv, svc)
healthpb.RegisterHealthServer(srv, svc)
Expand Down Expand Up @@ -343,8 +342,7 @@ func (cs *checkoutService) prepareOrderItemsAndShippingQuoteFromCart(ctx context
func createClient(ctx context.Context, svcAddr string) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, svcAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
)
}

Expand Down
6 changes: 2 additions & 4 deletions src/productcatalogservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ func main() {
}

srv := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)

reflection.Register(srv)
Expand Down Expand Up @@ -278,7 +277,6 @@ func (p *productCatalog) checkProductFailure(ctx context.Context, id string) boo
func createClient(ctx context.Context, svcAddr string) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, svcAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
)
}

0 comments on commit e8d0658

Please sign in to comment.