Skip to content

Commit

Permalink
Implemented merged context with link
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Feb 12, 2025
1 parent 2493b6e commit c2082a8
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 4 deletions.
25 changes: 25 additions & 0 deletions .chloggen/merged_context.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Link batcher context to all batched request's span contexts.

# One or more tracking issues or pull requests related to the change
issues: []

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
9 changes: 5 additions & 4 deletions exporter/exporterhelper/internal/batcher/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

type batch struct {
ctx context.Context
ctx mergedContext
req request.Request
done multiDone
}
Expand Down Expand Up @@ -86,7 +86,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// Do not flush the last item and add it to the current batch.
reqList = reqList[:len(reqList)-1]
qb.currentBatch = &batch{
ctx: ctx,
ctx: NewMergedContext(ctx),
req: lastReq,
done: multiDone{done},
}
Expand Down Expand Up @@ -120,9 +120,10 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// - Last result may not have enough data to be flushed.

// Logic on how to deal with the current batch:
// TODO: Deal with merging Context.
qb.currentBatch.req = reqList[0]
qb.currentBatch.done = append(qb.currentBatch.done, done)
qb.currentBatch.ctx = qb.currentBatch.ctx.Merge(qb.currentBatch.ctx)

// Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
// cannot unlock and re-lock because we are not done processing all the responses.
var firstBatch *batch
Expand All @@ -141,7 +142,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// Do not flush the last item and add it to the current batch.
reqList = reqList[:len(reqList)-1]
qb.currentBatch = &batch{
ctx: ctx,
ctx: NewMergedContext(ctx),
req: lastReq,
done: multiDone{done},
}
Expand Down
63 changes: 63 additions & 0 deletions exporter/exporterhelper/internal/batcher/merged_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package batcher // import "go.opentelemetry.io/collector/exporter/internal/queue"
import (
"context"
"time"

"go.opentelemetry.io/otel/trace"
)

// mergedContext links the underlying context to all incoming span contexts.
type mergedContext struct {
deadline time.Time
deadlineOk bool
ctx context.Context
}

func NewMergedContext(ctx context.Context) mergedContext {
deadline, ok := ctx.Deadline()
return mergedContext{
deadline: deadline,
deadlineOk: ok,
ctx: ctx,
}
}

// Merge links the span from incoming context to the span from the first context.
func (mc *mergedContext) Merge(other context.Context) mergedContext {
deadline, deadlineOk := mc.Deadline()
if otherDeadline, ok := other.Deadline(); ok {
deadlineOk = true
if deadline.Before(otherDeadline) {
deadline = otherDeadline
}
}

link := trace.LinkFromContext(other)
trace.SpanFromContext(mc.ctx).AddLink(link)
return mergedContext{
deadline: deadline,
deadlineOk: deadlineOk,
ctx: mc.ctx,
}
}

// Deadline returns the latest deadline of all context.
func (mc mergedContext) Deadline() (time.Time, bool) {
return mc.deadline, mc.deadlineOk
}

func (mc mergedContext) Done() <-chan struct{} {
return nil
}

func (mc mergedContext) Err() error {
return nil
}

// Value delegates to the first context.
func (mc mergedContext) Value(key any) any {
return mc.ctx.Value(key)
}
68 changes: 68 additions & 0 deletions exporter/exporterhelper/internal/batcher/merged_context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package batcher

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
)

func TestMergedContextDeadline(t *testing.T) {
now := time.Now()
ctx1 := context.Background()
mergedContext := NewMergedContext(ctx1)

deadline, ok := mergedContext.Deadline()
require.False(t, ok)

ctx2, cancel2 := context.WithDeadline(context.Background(), now.Add(200))
defer cancel2()
mergedContext = mergedContext.Merge(ctx2)

deadline, ok = mergedContext.Deadline()
require.True(t, ok)
require.Equal(t, now.Add(200), deadline)

ctx3, cancel3 := context.WithDeadline(context.Background(), now.Add(300))
defer cancel3()
ctx4, cancel4 := context.WithDeadline(context.Background(), now.Add(100))
defer cancel4()
mergedContext = mergedContext.Merge(ctx3)
mergedContext = mergedContext.Merge(ctx4)

deadline, ok = mergedContext.Deadline()
require.True(t, ok)
require.Equal(t, now.Add(300), deadline)
}

func TestMergedContextLink(t *testing.T) {
tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider
tracer := tracerProvider.Tracer("go.opentelemetry.io/collector/exporter/exporterhelper")

ctx1 := context.WithValue(context.Background(), "key", "value")
ctx2, span2 := tracer.Start(ctx1, "span2")
defer span2.End()

mergedContext := NewMergedContext(ctx2)
ctx3, span3 := tracer.Start(ctx1, "span3")
defer span3.End()
mergedContext = mergedContext.Merge(ctx3)
ctx4, span4 := tracer.Start(ctx1, "span3")
defer span4.End()
mergedContext = mergedContext.Merge(ctx4)

span2.AddEvent("This is an event.")

spanFromMergedContext := trace.SpanFromContext(mergedContext)
require.Equal(t, span2, spanFromMergedContext)

require.Equal(t, trace.SpanContextFromContext(ctx3), span2.(sdktrace.ReadOnlySpan).Links()[0].SpanContext)
require.Equal(t, trace.SpanContextFromContext(ctx4), span2.(sdktrace.ReadOnlySpan).Links()[1].SpanContext)
}

0 comments on commit c2082a8

Please sign in to comment.