Skip to content

Commit ab2dd9b

Browse files
otel/xxray: add custom X-Ray propagator that preserves X-Amzn-Trace-Id
Updates #3663 Signed-off-by: Alexander Yastrebov <[email protected]>
1 parent 859fe9b commit ab2dd9b

File tree

2 files changed

+178
-5
lines changed

2 files changed

+178
-5
lines changed

otel/otel.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@ import (
88
"errors"
99
"fmt"
1010
"os"
11+
"slices"
12+
"strings"
1113
"time"
1214

1315
"go.opentelemetry.io/contrib/exporters/autoexport"
1416
"go.opentelemetry.io/contrib/propagators/autoprop"
17+
"go.opentelemetry.io/contrib/propagators/aws/xray"
1518
"go.opentelemetry.io/otel"
1619
"go.opentelemetry.io/otel/attribute"
1720
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
@@ -23,6 +26,7 @@ import (
2326

2427
"github.com/bombsimon/logrusr/v4"
2528
"github.com/sirupsen/logrus"
29+
"github.com/zalando/skipper/otel/xxray"
2630
)
2731

2832
var log = logrus.WithField("package", "otel")
@@ -53,6 +57,10 @@ type BatchSpanProcessor struct {
5357
MaxExportBatchSize int `yaml:"maxExportBatchSize"`
5458
}
5559

60+
func init() {
61+
autoprop.RegisterTextMapPropagator("xxray", xxray.NewPropagatorGenerator())
62+
}
63+
5664
// Init bootstraps the OpenTelemetry pipeline using environment variables and provided options.
5765
// Make sure to call shutdown for proper cleanup if err is nil.
5866
//
@@ -129,17 +137,22 @@ func Init(ctx context.Context, o *Options) (shutdown func(context.Context) error
129137
return handleErr(err)
130138
}
131139

132-
tracerProvider := trace.NewTracerProvider(batcherOpt, resourceOpt)
133-
shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown)
134-
135-
otel.SetTracerProvider(tracerProvider)
136-
137140
propagator, err := textMapPropagator(o)
138141
if err != nil {
139142
return handleErr(err)
140143
}
141144
otel.SetTextMapPropagator(propagator)
142145

146+
var idGenerator trace.IDGenerator
147+
if hasPropagator("xray", o) || hasPropagator("xxray", o) {
148+
idGenerator = xray.NewIDGenerator()
149+
}
150+
151+
tracerProvider := trace.NewTracerProvider(batcherOpt, resourceOpt, trace.WithIDGenerator(idGenerator))
152+
shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown)
153+
154+
otel.SetTracerProvider(tracerProvider)
155+
143156
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { log.Error(err) }))
144157
otel.SetLogger(logrusr.New(log))
145158

@@ -246,6 +259,14 @@ func textMapPropagator(o *Options) (propagation.TextMapPropagator, error) {
246259
}
247260
}
248261

262+
func hasPropagator(name string, o *Options) bool {
263+
if len(o.Propagators) > 0 {
264+
return slices.Contains(o.Propagators, name)
265+
} else {
266+
return slices.Contains(strings.Split(os.Getenv("OTEL_PROPAGATORS"), ","), name)
267+
}
268+
}
269+
249270
func skipperDebugSpanExporter(ctx context.Context) (trace.SpanExporter, error) {
250271
return stdouttrace.New(stdouttrace.WithWriter(writerFunc(func(p []byte) (int, error) {
251272
log.Debugf("Span: %s", p)

otel/xxray/xxray.go

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package xxray
2+
3+
import (
4+
"context"
5+
"errors"
6+
"strings"
7+
8+
"go.opentelemetry.io/contrib/propagators/aws/xray"
9+
"go.opentelemetry.io/otel/propagation"
10+
"go.opentelemetry.io/otel/trace"
11+
)
12+
13+
type Propagator struct {
14+
xray.Propagator
15+
idGenerator *xray.IDGenerator
16+
}
17+
18+
func NewPropagatorGenerator() *Propagator {
19+
return &Propagator{idGenerator: xray.NewIDGenerator()}
20+
}
21+
22+
func (p *Propagator) Extract(ctx context.Context, carrier propagation.TextMapCarrier) context.Context {
23+
newCtx := p.Propagator.Extract(ctx, carrier)
24+
// If failed to extract span context, try to re-use trace id
25+
if newCtx == ctx {
26+
if header := carrier.Get(traceHeaderKey); header != "" {
27+
tsc, err := extract(header)
28+
if err == nil && tsc.TraceID().IsValid() {
29+
// Re-use only trace id
30+
return trace.ContextWithRemoteSpanContext(ctx, trace.NewSpanContext(trace.SpanContextConfig{
31+
TraceID: tsc.TraceID(),
32+
SpanID: p.idGenerator.NewSpanID(ctx, tsc.TraceID()),
33+
}))
34+
}
35+
}
36+
}
37+
return newCtx
38+
}
39+
40+
// The rest is copied from https://github.com/open-telemetry/opentelemetry-go-contrib/blob/80c9316336ebb4f4c67d8e1011a3add889213fb7/propagators/aws/xray/propagator.go
41+
const (
42+
traceHeaderKey = "X-Amzn-Trace-Id"
43+
traceHeaderDelimiter = ";"
44+
kvDelimiter = "="
45+
traceIDKey = "Root"
46+
sampleFlagKey = "Sampled"
47+
parentIDKey = "Parent"
48+
traceIDVersion = "1"
49+
traceIDDelimiter = "-"
50+
isSampled = "1"
51+
notSampled = "0"
52+
53+
traceFlagNone = 0x0
54+
traceFlagSampled = 0x1 << 0
55+
traceIDLength = 35
56+
traceIDDelimitterIndex1 = 1
57+
traceIDDelimitterIndex2 = 10
58+
traceIDFirstPartLength = 8
59+
sampledFlagLength = 1
60+
)
61+
62+
var (
63+
empty = trace.SpanContext{}
64+
errInvalidTraceHeader = errors.New("invalid X-Amzn-Trace-Id header value, should contain 3 different part separated by ;")
65+
errMalformedTraceID = errors.New("cannot decode trace ID from header")
66+
errLengthTraceIDHeader = errors.New("incorrect length of X-Ray trace ID found, 35 character length expected")
67+
errInvalidTraceIDVersion = errors.New("invalid X-Ray trace ID header found, does not have valid trace ID version")
68+
errInvalidSpanIDLength = errors.New("invalid span ID length, must be 16")
69+
)
70+
71+
// extract extracts Span Context from context.
72+
func extract(headerVal string) (trace.SpanContext, error) {
73+
var (
74+
scc = trace.SpanContextConfig{}
75+
err error
76+
delimiterIndex int
77+
part string
78+
)
79+
pos := 0
80+
for pos < len(headerVal) {
81+
delimiterIndex = indexOf(headerVal, traceHeaderDelimiter, pos)
82+
if delimiterIndex >= 0 {
83+
part = headerVal[pos:delimiterIndex]
84+
pos = delimiterIndex + 1
85+
} else {
86+
// last part
87+
part = strings.TrimSpace(headerVal[pos:])
88+
pos = len(headerVal)
89+
}
90+
equalsIndex := strings.Index(part, kvDelimiter)
91+
if equalsIndex < 0 {
92+
return empty, errInvalidTraceHeader
93+
}
94+
value := part[equalsIndex+1:]
95+
switch {
96+
case strings.HasPrefix(part, traceIDKey):
97+
scc.TraceID, err = parseTraceID(value)
98+
if err != nil {
99+
return empty, err
100+
}
101+
case strings.HasPrefix(part, parentIDKey):
102+
// extract parentId
103+
scc.SpanID, err = trace.SpanIDFromHex(value)
104+
if err != nil {
105+
return empty, errInvalidSpanIDLength
106+
}
107+
case strings.HasPrefix(part, sampleFlagKey):
108+
// extract traceflag
109+
scc.TraceFlags = parseTraceFlag(value)
110+
}
111+
}
112+
return trace.NewSpanContext(scc), nil
113+
}
114+
115+
// indexOf returns position of the first occurrence of a substr in str starting at pos index.
116+
func indexOf(str, substr string, pos int) int {
117+
index := strings.Index(str[pos:], substr)
118+
if index > -1 {
119+
index += pos
120+
}
121+
return index
122+
}
123+
124+
// parseTraceID returns trace ID if valid else return invalid trace ID.
125+
func parseTraceID(xrayTraceID string) (trace.TraceID, error) {
126+
if len(xrayTraceID) != traceIDLength {
127+
return empty.TraceID(), errLengthTraceIDHeader
128+
}
129+
if !strings.HasPrefix(xrayTraceID, traceIDVersion) {
130+
return empty.TraceID(), errInvalidTraceIDVersion
131+
}
132+
133+
if xrayTraceID[traceIDDelimitterIndex1:traceIDDelimitterIndex1+1] != traceIDDelimiter ||
134+
xrayTraceID[traceIDDelimitterIndex2:traceIDDelimitterIndex2+1] != traceIDDelimiter {
135+
return empty.TraceID(), errMalformedTraceID
136+
}
137+
138+
epochPart := xrayTraceID[traceIDDelimitterIndex1+1 : traceIDDelimitterIndex2]
139+
uniquePart := xrayTraceID[traceIDDelimitterIndex2+1 : traceIDLength]
140+
141+
result := epochPart + uniquePart
142+
return trace.TraceIDFromHex(result)
143+
}
144+
145+
// parseTraceFlag returns a parsed trace flag.
146+
func parseTraceFlag(xraySampledFlag string) trace.TraceFlags {
147+
// Use a direct comparison here (#7262).
148+
if xraySampledFlag == isSampled {
149+
return trace.FlagsSampled
150+
}
151+
return trace.FlagsSampled.WithSampled(false)
152+
}

0 commit comments

Comments
 (0)