Skip to content

Commit

Permalink
Implemented merged context with link
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Feb 12, 2025
1 parent 2493b6e commit 3f6082e
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 4 deletions.
9 changes: 5 additions & 4 deletions exporter/exporterhelper/internal/batcher/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

type batch struct {
ctx context.Context
ctx mergedContext
req request.Request
done multiDone
}
Expand Down Expand Up @@ -86,7 +86,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// Do not flush the last item and add it to the current batch.
reqList = reqList[:len(reqList)-1]
qb.currentBatch = &batch{
ctx: ctx,
ctx: NewMergedContext(ctx),
req: lastReq,
done: multiDone{done},
}
Expand Down Expand Up @@ -120,9 +120,10 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// - Last result may not have enough data to be flushed.

// Logic on how to deal with the current batch:
// TODO: Deal with merging Context.
qb.currentBatch.req = reqList[0]
qb.currentBatch.done = append(qb.currentBatch.done, done)
qb.currentBatch.ctx = qb.currentBatch.ctx.Merge(qb.currentBatch.ctx)

// Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
// cannot unlock and re-lock because we are not done processing all the responses.
var firstBatch *batch
Expand All @@ -141,7 +142,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// Do not flush the last item and add it to the current batch.
reqList = reqList[:len(reqList)-1]
qb.currentBatch = &batch{
ctx: ctx,
ctx: NewMergedContext(ctx),
req: lastReq,
done: multiDone{done},
}
Expand Down
58 changes: 58 additions & 0 deletions exporter/exporterhelper/internal/batcher/merged_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package batcher // import "go.opentelemetry.io/collector/exporter/internal/queue"
import (
"context"
"time"

"go.opentelemetry.io/otel/trace"
)

type mergedContext struct {
deadline time.Time
deadlineOk bool
ctx context.Context
}

func NewMergedContext(ctx context.Context) mergedContext {
deadline, ok := ctx.Deadline()
return mergedContext{
deadline: deadline,
deadlineOk: ok,
ctx: ctx,
}
}

// ContextWithSpan returns a copy of parent with span set as the current Span.
func (mc *mergedContext) Merge(other context.Context) mergedContext {
deadline, deadlineOk := mc.Deadline()
if otherDeadline, ok := other.Deadline(); ok {
deadlineOk = true
if deadline.Before(otherDeadline) {
deadline = otherDeadline
}
}
trace.SpanFromContext(other).AddLink(trace.LinkFromContext(mc.ctx))
return mergedContext{
deadline: deadline,
deadlineOk: deadlineOk,
ctx: mc.ctx,
}
}

func (mc mergedContext) Deadline() (time.Time, bool) {
return mc.deadline, mc.deadlineOk
}

func (mc mergedContext) Done() <-chan struct{} {
return nil
}

func (mc mergedContext) Err() error {
return nil
}

func (mc mergedContext) Value(key any) any {
return mc.ctx.Value(key)
}
65 changes: 65 additions & 0 deletions exporter/exporterhelper/internal/batcher/merged_context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package batcher

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
)

func TestMergedContextDeadline(t *testing.T) {
now := time.Now()
ctx1 := context.Background()
mergedContext := NewMergedContext(ctx1)

deadline, ok := mergedContext.Deadline()
require.False(t, ok)

ctx2, cancel2 := context.WithDeadline(context.Background(), now.Add(200))
defer cancel2()
mergedContext = mergedContext.Merge(ctx2)

deadline, ok = mergedContext.Deadline()
require.True(t, ok)
require.Equal(t, now.Add(200), deadline)

ctx3, cancel3 := context.WithDeadline(context.Background(), now.Add(300))
defer cancel3()
ctx4, cancel4 := context.WithDeadline(context.Background(), now.Add(100))
defer cancel4()
mergedContext = mergedContext.Merge(ctx3)
mergedContext = mergedContext.Merge(ctx4)

deadline, ok = mergedContext.Deadline()
require.True(t, ok)
require.Equal(t, now.Add(300), deadline)
}

func TestMergedContextLink(t *testing.T) {
tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider
tracer := tracerProvider.Tracer("go.opentelemetry.io/collector/exporter/exporterhelper")

ctx1 := context.WithValue(context.Background(), "key", "value")
ctx2, span2 := tracer.Start(ctx1, "span2")

mergedContext := NewMergedContext(ctx2)
ctx3, span3 := tracer.Start(ctx1, "span3")
mergedContext = mergedContext.Merge(ctx3)
ctx4, span4 := tracer.Start(ctx1, "span3")
mergedContext = mergedContext.Merge(ctx4)

span2.AddEvent("This is an event.")

spanFromMergedContext := trace.SpanFromContext(mergedContext)
require.Equal(t, span2, spanFromMergedContext)

require.Equal(t, trace.SpanContextFromContext(ctx2), span3.(sdktrace.ReadOnlySpan).Links()[0].SpanContext)
require.Equal(t, trace.SpanContextFromContext(ctx2), span4.(sdktrace.ReadOnlySpan).Links()[0].SpanContext)
}

0 comments on commit 3f6082e

Please sign in to comment.