Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Feb 15, 2025
1 parent 5998f4e commit df83318
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 9 deletions.
12 changes: 10 additions & 2 deletions exporter/exporterhelper/internal/batcher/batch_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,20 @@ func LinksFromContext(ctx context.Context) []trace.Link {
if links, ok := ctx.Value(batchSpanLinksKey).([]trace.Link); ok {
return links
}
return []trace.Link{trace.LinkFromContext(ctx)}
return []trace.Link{}
}

func contextWithLink(ctx context.Context) context.Context {
return context.WithValue(
context.Background(),
batchSpanLinksKey,
[]trace.Link{trace.LinkFromContext(ctx)})
}

// contextWithMergedLinks expects ctx1 to have
func contextWithMergedLinks(ctx1 context.Context, ctx2 context.Context) context.Context {
return context.WithValue(
context.Background(),
batchSpanLinksKey,
append(LinksFromContext(ctx1), LinksFromContext(ctx2)...))
append(LinksFromContext(ctx1), trace.LinkFromContext(ctx2)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func TestBatchContextLink(t *testing.T) {
ctx4, span4 := tracer.Start(ctx1, "span4")
defer span4.End()

batchContext := contextWithMergedLinks(ctx2, ctx3)
batchContext := contextWithLink(ctx2)
batchContext = contextWithMergedLinks(batchContext, ctx3)
batchContext = contextWithMergedLinks(batchContext, ctx4)

actualLinks := LinksFromContext(batchContext)
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/batcher/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// Do not flush the last item and add it to the current batch.
reqList = reqList[:len(reqList)-1]
qb.currentBatch = &batch{
ctx: ctx,
ctx: contextWithLink(ctx),
req: lastReq,
done: multiDone{done},
}
Expand Down Expand Up @@ -142,7 +142,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// Do not flush the last item and add it to the current batch.
reqList = reqList[:len(reqList)-1]
qb.currentBatch = &batch{
ctx: ctx,
ctx: contextWithLink(ctx),
req: lastReq,
done: multiDone{done},
}
Expand Down
10 changes: 6 additions & 4 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,7 @@ func NewQueueSender(
}

q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg, func(ctx context.Context, req request.Request, done exporterqueue.Done) {
// TODO: move start of span to enqueue instead to dequeue.
// Figure out how to preserve span context across persistent storage.
ctx, _ = metadata.Tracer(qSet.ExporterSettings.TelemetrySettings).Start(ctx, "exporter/enqueue")
done.OnDone(exportFunc(ctx, req))
trace.SpanFromContext(ctx).End()
}))
if err != nil {
return nil, err
Expand All @@ -129,7 +125,13 @@ func NewQueueSender(
// Have to read the number of items before sending the request since the request can
// be modified by the downstream components like the batcher.
itemsCount := req.ItemsCount()

// TODO: move start of span to enqueue instead to dequeue.
// Figure out how to preserve span context across persistent storage.
ctx, _ = metadata.Tracer(qSet.ExporterSettings.TelemetrySettings).Start(ctx, "exporter/enqueue")
err := next.Send(ctx, req)
trace.SpanFromContext(ctx).End()

if err != nil {
qSet.ExporterSettings.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", itemsCount))
Expand Down

0 comments on commit df83318

Please sign in to comment.