Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Feb 12, 2025
1 parent 2493b6e commit b7080a2
Show file tree
Hide file tree
Showing 17 changed files with 149 additions and 25 deletions.
2 changes: 1 addition & 1 deletion exporter/debugexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250102185135-69823020774d // indirect
google.golang.org/grpc v1.70.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions exporter/debugexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions exporter/exporterhelper/internal/batcher/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

type batch struct {
ctx context.Context
ctx mergedContext
req request.Request
done multiDone
}
Expand Down Expand Up @@ -86,7 +86,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// Do not flush the last item and add it to the current batch.
reqList = reqList[:len(reqList)-1]
qb.currentBatch = &batch{
ctx: ctx,
ctx: NewMergedContext(ctx),
req: lastReq,
done: multiDone{done},
}
Expand Down Expand Up @@ -120,9 +120,10 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// - Last result may not have enough data to be flushed.

// Logic on how to deal with the current batch:
// TODO: Deal with merging Context.
qb.currentBatch.req = reqList[0]
qb.currentBatch.done = append(qb.currentBatch.done, done)
qb.currentBatch.ctx = qb.currentBatch.ctx.Merge(qb.currentBatch.ctx)

// Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
// cannot unlock and re-lock because we are not done processing all the responses.
var firstBatch *batch
Expand All @@ -141,7 +142,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// Do not flush the last item and add it to the current batch.
reqList = reqList[:len(reqList)-1]
qb.currentBatch = &batch{
ctx: ctx,
ctx: NewMergedContext(ctx),
req: lastReq,
done: multiDone{done},
}
Expand Down
58 changes: 58 additions & 0 deletions exporter/exporterhelper/internal/batcher/merged_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package batcher // import "go.opentelemetry.io/collector/exporter/internal/queue"
import (
"context"
"time"

"go.opentelemetry.io/otel/trace"
)

type mergedContext struct {
deadline time.Time
deadlineOk bool
ctx context.Context
}

func NewMergedContext(ctx context.Context) mergedContext {
deadline, ok := ctx.Deadline()
return mergedContext{
deadline: deadline,
deadlineOk: ok,
ctx: ctx,
}
}

// ContextWithSpan returns a copy of parent with span set as the current Span.
func (mc *mergedContext) Merge(other context.Context) mergedContext {
deadline, deadlineOk := mc.Deadline()
if otherDeadline, ok := other.Deadline(); ok {
deadlineOk = true
if deadline.Before(otherDeadline) {
deadline = otherDeadline
}
}
trace.SpanFromContext(other).AddLink(trace.LinkFromContext(mc.ctx))
return mergedContext{
deadline: deadline,
deadlineOk: deadlineOk,
ctx: mc.ctx,
}
}

func (mc mergedContext) Deadline() (time.Time, bool) {
return mc.deadline, mc.deadlineOk
}

func (mc mergedContext) Done() <-chan struct{} {
return nil
}

func (mc mergedContext) Err() error {
return nil
}

func (mc mergedContext) Value(key any) any {
return mc.ctx.Value(key)
}
65 changes: 65 additions & 0 deletions exporter/exporterhelper/internal/batcher/merged_context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package batcher

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
)

func TestMergedContextDeadline(t *testing.T) {
now := time.Now()
ctx1 := context.Background()
mergedContext := NewMergedContext(ctx1)

deadline, ok := mergedContext.Deadline()
require.False(t, ok)

ctx2, cancel2 := context.WithDeadline(context.Background(), now.Add(200))
defer cancel2()
mergedContext = mergedContext.Merge(ctx2)

deadline, ok = mergedContext.Deadline()
require.True(t, ok)
require.Equal(t, now.Add(200), deadline)

ctx3, cancel3 := context.WithDeadline(context.Background(), now.Add(300))
defer cancel3()
ctx4, cancel4 := context.WithDeadline(context.Background(), now.Add(100))
defer cancel4()
mergedContext = mergedContext.Merge(ctx3)
mergedContext = mergedContext.Merge(ctx4)

deadline, ok = mergedContext.Deadline()
require.True(t, ok)
require.Equal(t, now.Add(300), deadline)
}

func TestMergedContextLink(t *testing.T) {
tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider
tracer := tracerProvider.Tracer("go.opentelemetry.io/collector/exporter/exporterhelper")

ctx1 := context.WithValue(context.Background(), "key", "value")
ctx2, span2 := tracer.Start(ctx1, "span2")

mergedContext := NewMergedContext(ctx2)
ctx3, span3 := tracer.Start(ctx1, "span3")
mergedContext = mergedContext.Merge(ctx3)
ctx4, span4 := tracer.Start(ctx1, "span3")
mergedContext = mergedContext.Merge(ctx4)

span2.AddEvent("This is an event.")

spanFromMergedContext := trace.SpanFromContext(mergedContext)
require.Equal(t, span2, spanFromMergedContext)

require.Equal(t, trace.SpanContextFromContext(ctx2), span3.(sdktrace.ReadOnlySpan).Links()[0].SpanContext)
require.Equal(t, trace.SpanContextFromContext(ctx2), span4.(sdktrace.ReadOnlySpan).Links()[0].SpanContext)
}
2 changes: 1 addition & 1 deletion exporter/exporterhelper/xexporterhelper/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250102185135-69823020774d // indirect
google.golang.org/grpc v1.70.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/xexporterhelper/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion exporter/exportertest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ require (
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250102185135-69823020774d // indirect
google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
4 changes: 2 additions & 2 deletions exporter/exportertest/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion exporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ require (
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250102185135-69823020774d // indirect
google.golang.org/grpc v1.70.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions exporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion exporter/nopexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ require (
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250102185135-69823020774d // indirect
google.golang.org/grpc v1.70.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions exporter/nopexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion exporter/otlphttpexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
go.opentelemetry.io/collector/pdata/pprofile v0.119.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a
google.golang.org/genproto/googleapis/rpc v0.0.0-20250102185135-69823020774d
google.golang.org/grpc v1.70.0
google.golang.org/protobuf v1.36.5
)
Expand Down
4 changes: 2 additions & 2 deletions exporter/otlphttpexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion exporter/xexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250102185135-69823020774d // indirect
google.golang.org/grpc v1.70.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions exporter/xexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b7080a2

Please sign in to comment.