Skip to content

Commit 52d1414

Browse files
authored
[chore] Add internal attribute package (#12073)
This PR creates a new internal package which defines the correct set of attributes for each kind of component. This is a subset of #12057 which is broken off in order to reduce the overall size of that PR. As such, this package will not actually be used until #12057 is merged.
1 parent 81f1fad commit 52d1414

File tree

2 files changed

+296
-0
lines changed

2 files changed

+296
-0
lines changed
+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package attribute // import "go.opentelemetry.io/collector/service/internal/attribute"
5+
6+
import (
7+
"hash/fnv"
8+
9+
"go.opentelemetry.io/otel/attribute"
10+
11+
"go.opentelemetry.io/collector/component"
12+
"go.opentelemetry.io/collector/pipeline"
13+
)
14+
15+
const (
16+
componentKindKey = "otelcol.component.kind"
17+
componentIDKey = "otelcol.component.id"
18+
pipelineIDKey = "otelcol.pipeline.id"
19+
signalKey = "otelcol.signal"
20+
signalOutputKey = "otelcol.signal.output"
21+
22+
capabiltiesKind = "capabilities"
23+
fanoutKind = "fanout"
24+
)
25+
26+
type Attributes struct {
27+
set attribute.Set
28+
id int64
29+
}
30+
31+
func newAttributes(attrs ...attribute.KeyValue) *Attributes {
32+
h := fnv.New64a()
33+
for _, kv := range attrs {
34+
h.Write([]byte("(" + string(kv.Key) + "|" + kv.Value.AsString() + ")"))
35+
}
36+
return &Attributes{
37+
set: attribute.NewSet(attrs...),
38+
id: int64(h.Sum64()), // #nosec G115
39+
}
40+
}
41+
42+
func (a Attributes) Attributes() *attribute.Set {
43+
return &a.set
44+
}
45+
46+
func (a Attributes) ID() int64 {
47+
return a.id
48+
}
49+
50+
func Receiver(pipelineType pipeline.Signal, id component.ID) *Attributes {
51+
return newAttributes(
52+
attribute.String(componentKindKey, component.KindReceiver.String()),
53+
attribute.String(signalKey, pipelineType.String()),
54+
attribute.String(componentIDKey, id.String()),
55+
)
56+
}
57+
58+
func Processor(pipelineID pipeline.ID, id component.ID) *Attributes {
59+
return newAttributes(
60+
attribute.String(componentKindKey, component.KindProcessor.String()),
61+
attribute.String(signalKey, pipelineID.Signal().String()),
62+
attribute.String(pipelineIDKey, pipelineID.String()),
63+
attribute.String(componentIDKey, id.String()),
64+
)
65+
}
66+
67+
func Exporter(pipelineType pipeline.Signal, id component.ID) *Attributes {
68+
return newAttributes(
69+
attribute.String(componentKindKey, component.KindExporter.String()),
70+
attribute.String(signalKey, pipelineType.String()),
71+
attribute.String(componentIDKey, id.String()),
72+
)
73+
}
74+
75+
func Connector(exprPipelineType, rcvrPipelineType pipeline.Signal, id component.ID) *Attributes {
76+
return newAttributes(
77+
attribute.String(componentKindKey, component.KindConnector.String()),
78+
attribute.String(signalKey, exprPipelineType.String()),
79+
attribute.String(signalOutputKey, rcvrPipelineType.String()),
80+
attribute.String(componentIDKey, id.String()),
81+
)
82+
}
83+
84+
func Capabilities(pipelineID pipeline.ID) *Attributes {
85+
return newAttributes(
86+
attribute.String(componentKindKey, capabiltiesKind),
87+
attribute.String(pipelineIDKey, pipelineID.String()),
88+
)
89+
}
90+
91+
func Fanout(pipelineID pipeline.ID) *Attributes {
92+
return newAttributes(
93+
attribute.String(componentKindKey, fanoutKind),
94+
attribute.String(pipelineIDKey, pipelineID.String()),
95+
)
96+
}
97+
98+
func Extension(id component.ID) *Attributes {
99+
return newAttributes(
100+
attribute.String(componentKindKey, component.KindExtension.String()),
101+
attribute.String(componentIDKey, id.String()),
102+
)
103+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package attribute
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/require"
10+
11+
"go.opentelemetry.io/collector/component"
12+
"go.opentelemetry.io/collector/pipeline"
13+
"go.opentelemetry.io/collector/pipeline/xpipeline"
14+
)
15+
16+
var (
17+
signals = []pipeline.Signal{
18+
pipeline.SignalTraces,
19+
pipeline.SignalMetrics,
20+
pipeline.SignalLogs,
21+
xpipeline.SignalProfiles,
22+
}
23+
24+
cIDs = []component.ID{
25+
component.MustNewID("foo"),
26+
component.MustNewID("foo2"),
27+
component.MustNewID("bar"),
28+
}
29+
30+
pIDs = []pipeline.ID{
31+
pipeline.MustNewID("traces"),
32+
pipeline.MustNewIDWithName("traces", "2"),
33+
pipeline.MustNewID("metrics"),
34+
pipeline.MustNewIDWithName("metrics", "2"),
35+
pipeline.MustNewID("logs"),
36+
pipeline.MustNewIDWithName("logs", "2"),
37+
pipeline.MustNewID("profiles"),
38+
pipeline.MustNewIDWithName("profiles", "2"),
39+
}
40+
)
41+
42+
func TestReceiver(t *testing.T) {
43+
for _, sig := range signals {
44+
for _, id := range cIDs {
45+
r := Receiver(sig, id)
46+
componentKind, ok := r.Attributes().Value(componentKindKey)
47+
require.True(t, ok)
48+
require.Equal(t, component.KindReceiver.String(), componentKind.AsString())
49+
50+
signal, ok := r.Attributes().Value(signalKey)
51+
require.True(t, ok)
52+
require.Equal(t, sig.String(), signal.AsString())
53+
54+
componentID, ok := r.Attributes().Value(componentIDKey)
55+
require.True(t, ok)
56+
require.Equal(t, id.String(), componentID.AsString())
57+
}
58+
}
59+
}
60+
61+
func TestProcessor(t *testing.T) {
62+
for _, pID := range pIDs {
63+
for _, id := range cIDs {
64+
p := Processor(pID, id)
65+
componentKind, ok := p.Attributes().Value(componentKindKey)
66+
require.True(t, ok)
67+
require.Equal(t, component.KindProcessor.String(), componentKind.AsString())
68+
69+
pipelineID, ok := p.Attributes().Value(pipelineIDKey)
70+
require.True(t, ok)
71+
require.Equal(t, pID.String(), pipelineID.AsString())
72+
73+
componentID, ok := p.Attributes().Value(componentIDKey)
74+
require.True(t, ok)
75+
require.Equal(t, id.String(), componentID.AsString())
76+
}
77+
}
78+
}
79+
80+
func TestExporter(t *testing.T) {
81+
for _, sig := range signals {
82+
for _, id := range cIDs {
83+
e := Exporter(sig, id)
84+
componentKind, ok := e.Attributes().Value(componentKindKey)
85+
require.True(t, ok)
86+
require.Equal(t, component.KindExporter.String(), componentKind.AsString())
87+
88+
signal, ok := e.Attributes().Value(signalKey)
89+
require.True(t, ok)
90+
require.Equal(t, sig.String(), signal.AsString())
91+
92+
componentID, ok := e.Attributes().Value(componentIDKey)
93+
require.True(t, ok)
94+
require.Equal(t, id.String(), componentID.AsString())
95+
}
96+
}
97+
}
98+
99+
func TestConnector(t *testing.T) {
100+
for _, exprSig := range signals {
101+
for _, rcvrSig := range signals {
102+
for _, id := range cIDs {
103+
c := Connector(exprSig, rcvrSig, id)
104+
componentKind, ok := c.Attributes().Value(componentKindKey)
105+
require.True(t, ok)
106+
require.Equal(t, component.KindConnector.String(), componentKind.AsString())
107+
108+
signal, ok := c.Attributes().Value(signalKey)
109+
require.True(t, ok)
110+
require.Equal(t, exprSig.String(), signal.AsString())
111+
112+
signalOutput, ok := c.Attributes().Value(signalOutputKey)
113+
require.True(t, ok)
114+
require.Equal(t, rcvrSig.String(), signalOutput.AsString())
115+
116+
componentID, ok := c.Attributes().Value(componentIDKey)
117+
require.True(t, ok)
118+
require.Equal(t, id.String(), componentID.AsString())
119+
}
120+
}
121+
}
122+
}
123+
124+
func TestExtension(t *testing.T) {
125+
e := Extension(component.MustNewID("foo"))
126+
componentKind, ok := e.Attributes().Value(componentKindKey)
127+
require.True(t, ok)
128+
require.Equal(t, component.KindExtension.String(), componentKind.AsString())
129+
}
130+
131+
func TestSetEquality(t *testing.T) {
132+
// The sets are created independently but should be exactly equivalent.
133+
// We will ensure that corresponding elements are equal and that
134+
// non-corresponding elements are not equal.
135+
setI, setJ := createExampleSets(), createExampleSets()
136+
for i, ei := range setI {
137+
for j, ej := range setJ {
138+
if i == j {
139+
require.Equal(t, ei.ID(), ej.ID())
140+
require.True(t, ei.Attributes().Equals(ej.Attributes()))
141+
} else {
142+
require.NotEqual(t, ei.ID(), ej.ID())
143+
require.False(t, ei.Attributes().Equals(ej.Attributes()))
144+
}
145+
}
146+
}
147+
}
148+
149+
func createExampleSets() []*Attributes {
150+
sets := []*Attributes{}
151+
152+
// Receiver examples.
153+
for _, sig := range signals {
154+
for _, id := range cIDs {
155+
sets = append(sets, Receiver(sig, id))
156+
}
157+
}
158+
159+
// Processor examples.
160+
for _, pID := range pIDs {
161+
for _, cID := range cIDs {
162+
sets = append(sets, Processor(pID, cID))
163+
}
164+
}
165+
166+
// Exporter examples.
167+
for _, sig := range signals {
168+
for _, id := range cIDs {
169+
sets = append(sets, Exporter(sig, id))
170+
}
171+
}
172+
173+
// Connector examples.
174+
for _, exprSig := range signals {
175+
for _, rcvrSig := range signals {
176+
for _, id := range cIDs {
177+
sets = append(sets, Connector(exprSig, rcvrSig, id))
178+
}
179+
}
180+
}
181+
182+
// Capabilities examples.
183+
for _, pID := range pIDs {
184+
sets = append(sets, Capabilities(pID))
185+
}
186+
187+
// Fanout examples.
188+
for _, pID := range pIDs {
189+
sets = append(sets, Fanout(pID))
190+
}
191+
192+
return sets
193+
}

0 commit comments

Comments
 (0)