Skip to content

Commit 7a8a355

Browse files
committed
[exporterhelper] Preserve request span context in the persistent queue
1 parent d800ad3 commit 7a8a355

23 files changed

+558
-60
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: breaking
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
5+
component: exporterhelper
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: QueueBatchEncoding interface is changed to support marshaling and unmarshaling of request context.
9+
10+
# One or more tracking issues or pull requests related to the change
11+
issues: [13176]
12+
13+
# Optional: The change log or logs in which this entry should be included.
14+
# e.g. '[user]' or '[user, api]'
15+
# Include 'user' if the change is relevant to end users.
16+
# Include 'api' if there is a change to a library API.
17+
# Default: '[user]'
18+
change_logs: [api]
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
5+
component: exporterhelper
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Add an option to preserve request span context in the persistent queue
9+
10+
# One or more tracking issues or pull requests related to the change
11+
issues: [11740]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext: |
17+
Currently, it is behind the exporter.PersistRequestContext feature gate, which can be enabled by adding
18+
`--feature-gates=exporter.PersistRequestContext` to the collector command line. An exporter buffer stored by
19+
a previous version of the collector (or by a collector with the feature gate disabled) can be read by a newer
20+
collector with the feature enabled. However, the reverse is not supported: a buffer stored by a newer collector with
21+
the feature enabled cannot be read by an older collector (or by a collector with the feature gate disabled).
22+
23+
# Optional: The change log or logs in which this entry should be included.
24+
# e.g. '[user]' or '[user, api]'
25+
# Include 'user' if the change is relevant to end users.
26+
# Include 'api' if there is a change to a library API.
27+
# Default: '[user]'
28+
change_logs: [user]

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ gensemconv: $(SEMCONVGEN) $(SEMCONVKIT)
261261
$(SEMCONVGEN) -o semconv/${SPECTAG} -t semconv/template.j2 -s ${SPECTAG} -i ${SPECPATH}/model/. --only=attribute_group -p conventionType=attribute_group -f generated_attribute_group.go
262262
$(SEMCONVKIT) -output "semconv/$(SPECTAG)" -tag "$(SPECTAG)"
263263

264-
INTERNAL_PROTO_SRC_DIRS := exporter/exporterhelper/internal/queuebatch/internal/persistentqueue
264+
INTERNAL_PROTO_SRC_DIRS := exporter/exporterhelper/internal/persistentqueue
265265
# INTERNAL_PROTO_SRC_DIRS += path/to/other/proto/dirs
266266
INTERNAL_PROTO_FILES := $(foreach dir,$(INTERNAL_PROTO_SRC_DIRS),$(wildcard $(dir)/*.proto))
267267
INTERNAL_PROTOC := $(DOCKERCMD) run --rm -u ${shell id -u} -v${PWD}:${PWD} -w${PWD} ${DOCKER_PROTOBUF} --proto_path=${PWD} --go_out=${PWD}

exporter/exporterhelper/internal/base_exporter_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,10 @@ func newFakeQueueBatch() QueueBatchSettings[request.Request] {
162162

163163
type fakeEncoding struct{}
164164

165-
func (f fakeEncoding) Marshal(request.Request) ([]byte, error) {
165+
func (f fakeEncoding) Marshal(context.Context, request.Request) ([]byte, error) {
166166
return []byte("mockRequest"), nil
167167
}
168168

169-
func (f fakeEncoding) Unmarshal([]byte) (request.Request, error) {
170-
return &requesttest.FakeRequest{}, nil
169+
func (f fakeEncoding) Unmarshal([]byte) (context.Context, request.Request, error) {
170+
return context.Background(), &requesttest.FakeRequest{}, nil
171171
}
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package persistentqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/persistentqueue"
5+
6+
import (
7+
"context"
8+
"encoding/binary"
9+
"errors"
10+
"fmt"
11+
"io"
12+
"math"
13+
14+
"go.opentelemetry.io/otel/propagation"
15+
16+
"go.opentelemetry.io/collector/featuregate"
17+
)
18+
19+
// PersistRequestContextFeatureGate controls whether request context should be preserved in the persistent queue.
// It is registered in the global feature gate registry with the ID "exporter.PersistRequestContext" at the
// alpha stage. The default encoder in this file checks it in both Marshal and Unmarshal: while the gate is
// disabled, only the request itself is written and read.
20+
var PersistRequestContextFeatureGate = featuregate.GlobalRegistry().MustRegister(
21+
"exporter.PersistRequestContext",
22+
featuregate.StageAlpha,
23+
featuregate.WithRegisterFromVersion("v0.128.0"),
24+
featuregate.WithRegisterDescription("controls whether context should be stored alongside requests in the persistent queue"),
25+
)
26+
27+
// Encoding converts a request of type T, together with its context, to and from the bytes
// stored in the persistent queue.
type Encoding[T any] interface {
28+
// Marshal marshals a request and its context into bytes.
29+
Marshal(context.Context, T) ([]byte, error)
30+
31+
// Unmarshal unmarshals bytes into a request and its context.
32+
Unmarshal([]byte) (context.Context, T, error)
33+
}
34+
35+
// RequestEncoding converts a request of type T to and from bytes, without any context.
// NewEncoder wraps a RequestEncoding to produce an Encoding.
type RequestEncoding[T any] interface {
36+
// Marshal marshals a request into bytes.
37+
Marshal(T) ([]byte, error)
38+
39+
// Unmarshal unmarshals bytes into a request.
40+
Unmarshal([]byte) (T, error)
41+
}
42+
43+
// defaultEncoder is the Encoding implementation returned by NewEncoder. It marshals and unmarshals
// requests along with their context, delegating the request itself to the wrapped RequestEncoding.
44+
type defaultEncoder[T any] struct {
45+
// requestEncoding serializes and deserializes the request payload alone.
requestEncoding RequestEncoding[T]
46+
}
47+
48+
func NewEncoder[T any](requestEncoding RequestEncoding[T]) Encoding[T] {
49+
return defaultEncoder[T]{requestEncoding: requestEncoding}
50+
}
51+
52+
// requestDataKey is the key used to store request data in bytesMap.
53+
const requestDataKey = "req"
54+
55+
// tracePropagator injects the trace context of a request into the bytesMap-backed carrier on
// Marshal and extracts it from the carrier on Unmarshal.
var tracePropagator = propagation.TraceContext{}
56+
57+
func (re defaultEncoder[T]) Marshal(ctx context.Context, req T) ([]byte, error) {
58+
if !PersistRequestContextFeatureGate.IsEnabled() {
59+
return re.requestEncoding.Marshal(req)
60+
}
61+
62+
bm := newBytesMap()
63+
tracePropagator.Inject(ctx, &bytesMapCarrier{bytesMap: bm})
64+
reqBuf, err := re.requestEncoding.Marshal(req)
65+
if err != nil {
66+
return nil, err
67+
}
68+
if err := bm.set(requestDataKey, reqBuf); err != nil {
69+
return nil, fmt.Errorf("failed to marshal request: %w", err)
70+
}
71+
72+
return *bm, nil
73+
}
74+
75+
func (re defaultEncoder[T]) Unmarshal(b []byte) (context.Context, T, error) {
76+
if !PersistRequestContextFeatureGate.IsEnabled() {
77+
req, err := re.requestEncoding.Unmarshal(b)
78+
return context.Background(), req, err
79+
}
80+
81+
bm := bytesMapFromBytes(b)
82+
if bm == nil {
83+
// Fall back to unmarshalling of the request alone.
84+
// This can happen if the data persisted by the version that doesn't support the context unmarshaling.
85+
req, err := re.requestEncoding.Unmarshal(b)
86+
return context.Background(), req, err
87+
}
88+
ctx := tracePropagator.Extract(context.Background(), &bytesMapCarrier{bytesMap: bm})
89+
reqBuf, err := bm.get(requestDataKey)
90+
var req T
91+
if err != nil {
92+
return context.Background(), req, fmt.Errorf("failed to read serialized request data: %w", err)
93+
}
94+
req, err = re.requestEncoding.Unmarshal(reqBuf)
95+
return ctx, req, err
96+
}
97+
98+
// bytesMap is a slice of bytes that represents a map-like structure for storing key-value pairs.
// It's optimized for efficient memory usage for low number of key-value pairs with big values.
// The data starts with a 2-byte prefix (magicByte, formatV1Byte) followed by a sequence of
// key-value pairs encoded as:
//   - 1 byte length of the key
//   - key bytes
//   - 4 byte little-endian length of the value
//   - value bytes
type bytesMap []byte

const (
	// prefix bytes to denote the bytesMap serialization: 0x00 magic byte + 0x01 version of the encoder.
	magicByte      = byte(0x00)
	formatV1Byte   = byte(0x01)
	prefixBytesLen = 2

	// initialCapacity is the capacity pre-allocated by newBytesMap.
	initialCapacity = 256
)

// newBytesMap returns an empty bytesMap that contains only the prefix bytes.
func newBytesMap() *bytesMap {
	bm := bytesMap(make([]byte, 0, initialCapacity))
	bm = append(bm, magicByte, formatV1Byte)
	return &bm
}

// set sets the specified key in the map. Must be called only once for each key.
// It returns an error, leaving the map untouched, if the key is longer than 255 bytes or the
// value length does not fit into 32 bits.
func (bm *bytesMap) set(key string, val []byte) error {
	if len(key) > math.MaxUint8 {
		return errors.New("key param is too long")
	}
	valSize := len(val)
	if uint64(valSize) > math.MaxUint32 {
		return fmt.Errorf("value is too large to persist, size %d", valSize)
	}

	*bm = append(*bm, byte(len(key)))
	*bm = append(*bm, key...)

	var lenBuf [4]byte
	binary.LittleEndian.PutUint32(lenBuf[:], uint32(valSize)) //nolint:gosec // disable G115
	*bm = append(*bm, lenBuf[:]...)
	*bm = append(*bm, val...)

	return nil
}

// entryAt decodes the entry starting at offset off, which must be a valid index into the map.
// It returns the key and value as sub-slices of the map and the offset of the following entry,
// or io.ErrUnexpectedEOF if any part of the entry extends past the end of the data.
func (bm *bytesMap) entryAt(off int) (key, val []byte, next int, err error) {
	buf := []byte(*bm)

	keyLen := int(buf[off])
	off++
	if keyLen > len(buf)-off {
		return nil, nil, 0, io.ErrUnexpectedEOF
	}
	key = buf[off : off+keyLen]
	off += keyLen

	if len(buf)-off < 4 {
		return nil, nil, 0, io.ErrUnexpectedEOF
	}
	valLen := binary.LittleEndian.Uint32(buf[off:])
	off += 4

	// Compare as uint64: converting a large uint32 to int first would wrap to a negative number
	// where int is 32 bits wide and slip past the bounds check.
	if uint64(valLen) > uint64(len(buf)-off) {
		return nil, nil, 0, io.ErrUnexpectedEOF
	}
	end := off + int(valLen)
	return key, buf[off:end], end, nil
}

// get scans sequentially for the first matching key and returns the value as bytes.
// It returns (nil, nil) when the key is not present and io.ErrUnexpectedEOF when a malformed
// entry is encountered before a match is found.
func (bm *bytesMap) get(k string) ([]byte, error) {
	for i := prefixBytesLen; i < len(*bm); {
		key, val, next, err := bm.entryAt(i)
		if err != nil {
			return nil, err
		}
		if string(key) == k {
			return val, nil
		}
		i = next
	}
	return nil, nil
}

// keys returns header names in encounter order. Scanning stops at the first malformed entry,
// so only keys of completely encoded entries are reported.
func (bm *bytesMap) keys() []string {
	var out []string
	for i := prefixBytesLen; i < len(*bm); {
		key, _, next, err := bm.entryAt(i)
		if err != nil {
			break // malformed entry
		}
		out = append(out, string(key))
		i = next
	}
	return out
}

// bytesMapFromBytes interprets b as a bytesMap without copying it. It returns nil if b does not
// start with the expected prefix bytes.
func bytesMapFromBytes(b []byte) *bytesMap {
	if len(b) < prefixBytesLen || b[0] != magicByte || b[1] != formatV1Byte {
		return nil
	}
	return (*bytesMap)(&b)
}
202+
203+
// bytesMapCarrier implements propagation.TextMapCarrier on top of bytesMap.
204+
type bytesMapCarrier struct {
205+
*bytesMap
206+
}
207+
208+
var _ propagation.TextMapCarrier = (*bytesMapCarrier)(nil)
209+
210+
// Set appends a new string entry; if the key already exists it is left unchanged.
211+
func (c *bytesMapCarrier) Set(k, v string) {
212+
_ = c.set(k, []byte(v))
213+
}
214+
215+
// Get scans sequentially for the first matching key.
216+
func (c *bytesMapCarrier) Get(k string) string {
217+
v, _ := c.get(k)
218+
return string(v)
219+
}
220+
221+
// Keys returns header names in encounter order.
222+
func (c *bytesMapCarrier) Keys() []string {
223+
return c.keys()
224+
}

0 commit comments

Comments
 (0)