Skip to content

Commit

Permalink
Ensure tracing propagation fixes #114 (#115)
Browse files Browse the repository at this point in the history
  • Loading branch information
mantzas authored Jun 26, 2018
1 parent d30f508 commit f6c63d9
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 9 deletions.
4 changes: 2 additions & 2 deletions async/amqp/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ func (c *Component) Run(ctx context.Context) error {
log.Infof("processing message %s", d.MessageId)

go func(d *amqp.Delivery, a *agr_errors.Aggregate) {
sp := trace.StartConsumerSpan(c.name, trace.AMQPConsumerComponent, mapHeader(d.Headers))
sp, chCtx := trace.StartConsumerSpan(ctx, c.name, trace.AMQPConsumerComponent, mapHeader(d.Headers))

dec, err := async.DetermineDecoder(d.ContentType)
if err != nil {
handlerMessageError(d, a, err, fmt.Sprintf("failed to determine encoding %s. Sending NACK", d.ContentType))
trace.FinishSpan(sp, true)
return
}
err = c.proc(ctx, async.NewMessage(d.Body, dec))
err = c.proc(chCtx, async.NewMessage(d.Body, dec))
if err != nil {
handlerMessageError(d, a, err, fmt.Sprintf("failed to process message %s. Sending NACK", d.MessageId))
trace.FinishSpan(sp, true)
Expand Down
4 changes: 2 additions & 2 deletions async/kafka/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c *Component) Run(ctx context.Context) error {
case msg := <-chMsg:
log.Debugf("data received from topic %s", msg.Topic)
go func() {
sp := trace.StartConsumerSpan(c.name, trace.KafkaConsumerComponent, mapHeader(msg.Headers))
sp, chCtx := trace.StartConsumerSpan(ctx, c.name, trace.KafkaConsumerComponent, mapHeader(msg.Headers))

var ct string
if c.contentType != "" {
Expand All @@ -94,7 +94,7 @@ func (c *Component) Run(ctx context.Context) error {
return
}

err = c.proc(ctx, async.NewMessage(msg.Value, dec))
err = c.proc(chCtx, async.NewMessage(msg.Value, dec))
if err != nil {
failCh <- errors.Wrap(err, "failed to process message")
trace.FinishSpan(sp, true)
Expand Down
8 changes: 4 additions & 4 deletions trace/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ func Close() error {
}

// StartConsumerSpan start a new kafka consumer span.
func StartConsumerSpan(name string, cmp Component, hdr map[string]string) opentracing.Span {
ctx, _ := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders, opentracing.TextMapCarrier(hdr))
sp := opentracing.StartSpan(name, consumerOption{ctx: ctx})
func StartConsumerSpan(ctx context.Context, name string, cmp Component, hdr map[string]string) (opentracing.Span, context.Context) {
spCtx, _ := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders, opentracing.TextMapCarrier(hdr))
sp := opentracing.StartSpan(name, consumerOption{ctx: spCtx})
ext.Component.Set(sp, string(cmp))
return sp
return sp, opentracing.ContextWithSpan(ctx, sp)
}

// FinishSpan finished a kafka consumer span.
Expand Down
3 changes: 2 additions & 1 deletion trace/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ func TestStartFinishConsumerSpan(t *testing.T) {
mtr := mocktracer.New()
opentracing.SetGlobalTracer(mtr)
hdr := map[string]string{"key": "val"}
sp := StartConsumerSpan("test", AMQPConsumerComponent, hdr)
sp, ctx := StartConsumerSpan(context.Background(), "test", AMQPConsumerComponent, hdr)
assert.NotNil(sp)
assert.NotNil(ctx)
assert.IsType(&mocktracer.MockSpan{}, sp)
jsp := sp.(*mocktracer.MockSpan)
assert.NotNil(jsp)
Expand Down

0 comments on commit f6c63d9

Please sign in to comment.