Skip to content

Commit e742a18

Browse files
committed
Merge branch 'graph-attributes' into singleton-flags-and-attributes
2 parents 09ba009 + dba8b05 commit e742a18

File tree

11 files changed

+235
-56
lines changed

11 files changed

+235
-56
lines changed

Diff for: service/internal/graph/attribute/attribute.go

+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package attribute // import "go.opentelemetry.io/collector/service/internal/graph/attribute"
5+
6+
import (
7+
"fmt"
8+
"hash/fnv"
9+
10+
"go.opentelemetry.io/otel/attribute"
11+
12+
"go.opentelemetry.io/collector/component"
13+
"go.opentelemetry.io/collector/pipeline"
14+
)
15+
16+
const (
17+
componentKindKey = "otelcol.component.kind"
18+
componentIDKey = "otelcol.component.id"
19+
pipelineIDKey = "otelcol.pipeline.id"
20+
signalKey = "otelcol.signal"
21+
signalOutputKey = "otelcol.signal.output"
22+
23+
receiverKind = "receiver"
24+
processorKind = "processor"
25+
exporterKind = "exporter"
26+
connectorKind = "connector"
27+
capabiltiesKind = "capabilities"
28+
fanoutKind = "fanout"
29+
)
30+
31+
type Attributes struct {
32+
set attribute.Set
33+
id int64
34+
}
35+
36+
func newAttributes(attrs ...attribute.KeyValue) *Attributes {
37+
h := fnv.New64a()
38+
for _, kv := range attrs {
39+
h.Write([]byte("(" + string(kv.Key) + "|" + kv.Value.AsString() + ")"))
40+
}
41+
return &Attributes{
42+
set: attribute.NewSet(attrs...),
43+
id: int64(h.Sum64()), // #nosec G115
44+
}
45+
}
46+
47+
func (a Attributes) Attributes() *attribute.Set {
48+
return &a.set
49+
}
50+
51+
func (a Attributes) ID() int64 {
52+
return a.id
53+
}
54+
55+
func Receiver(pipelineType pipeline.Signal, id component.ID) *Attributes {
56+
return newAttributes(
57+
attribute.String(componentKindKey, receiverKind),
58+
attribute.String(signalKey, pipelineType.String()),
59+
attribute.String(componentIDKey, id.String()),
60+
)
61+
}
62+
63+
func Processor(pipelineID pipeline.ID, id component.ID) *Attributes {
64+
return newAttributes(
65+
attribute.String(componentKindKey, processorKind),
66+
attribute.String(signalKey, pipelineID.Signal().String()),
67+
attribute.String(pipelineIDKey, pipelineID.String()),
68+
attribute.String(componentIDKey, id.String()),
69+
)
70+
}
71+
72+
func Exporter(pipelineType pipeline.Signal, id component.ID) *Attributes {
73+
return newAttributes(
74+
attribute.String(componentKindKey, exporterKind),
75+
attribute.String(signalKey, pipelineType.String()),
76+
attribute.String(componentIDKey, id.String()),
77+
)
78+
}
79+
80+
func Connector(exprPipelineType, rcvrPipelineType pipeline.Signal, id component.ID) *Attributes {
81+
return newAttributes(
82+
attribute.String(componentKindKey, connectorKind),
83+
attribute.String(signalKey, fmt.Sprintf("%s_to_%s", exprPipelineType.String(), rcvrPipelineType.String())),
84+
attribute.String(componentIDKey, id.String()),
85+
)
86+
}
87+
88+
func Capabilities(pipelineID pipeline.ID) *Attributes {
89+
return newAttributes(
90+
attribute.String(componentKindKey, capabiltiesKind),
91+
attribute.String(pipelineIDKey, pipelineID.String()),
92+
)
93+
}
94+
95+
func Fanout(pipelineID pipeline.ID) *Attributes {
96+
return newAttributes(
97+
attribute.String(componentKindKey, fanoutKind),
98+
attribute.String(pipelineIDKey, pipelineID.String()),
99+
)
100+
}

Diff for: service/internal/graph/attribute/attribute_test.go

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package attribute
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/require"
10+
11+
"go.opentelemetry.io/collector/component"
12+
"go.opentelemetry.io/collector/pipeline"
13+
"go.opentelemetry.io/collector/pipeline/pipelineprofiles"
14+
)
15+
16+
var (
17+
signals = []pipeline.Signal{
18+
pipeline.SignalTraces,
19+
pipeline.SignalMetrics,
20+
pipeline.SignalLogs,
21+
pipelineprofiles.SignalProfiles,
22+
}
23+
24+
cIDs = []component.ID{
25+
component.MustNewID("foo"),
26+
component.MustNewID("foo2"),
27+
component.MustNewID("bar"),
28+
}
29+
30+
pIDs = []pipeline.ID{
31+
pipeline.MustNewID("traces"),
32+
pipeline.MustNewIDWithName("traces", "2"),
33+
pipeline.MustNewID("metrics"),
34+
pipeline.MustNewIDWithName("metrics", "2"),
35+
pipeline.MustNewID("logs"),
36+
pipeline.MustNewIDWithName("logs", "2"),
37+
pipeline.MustNewID("profiles"),
38+
pipeline.MustNewIDWithName("profiles", "2"),
39+
}
40+
)
41+
42+
func TestAttributes(t *testing.T) {
43+
// The sets are created independently but should be exactly equivalent.
44+
// We will ensure that corresponding elements are equal and that
45+
// non-corresponding elements are not equal.
46+
setI, setJ := createExampleSets(), createExampleSets()
47+
for i, ei := range setI {
48+
for j, ej := range setJ {
49+
if i == j {
50+
require.Equal(t, ei.ID(), ej.ID())
51+
require.True(t, ei.Attributes().Equals(ej.Attributes()))
52+
} else {
53+
require.NotEqual(t, ei.ID(), ej.ID())
54+
require.False(t, ei.Attributes().Equals(ej.Attributes()))
55+
}
56+
}
57+
}
58+
}
59+
60+
func createExampleSets() []*Attributes {
61+
sets := []*Attributes{}
62+
63+
// Receiver examples.
64+
for _, sig := range signals {
65+
for _, id := range cIDs {
66+
sets = append(sets, Receiver(sig, id))
67+
}
68+
}
69+
70+
// Processor examples.
71+
for _, pID := range pIDs {
72+
for _, cID := range cIDs {
73+
sets = append(sets, Processor(pID, cID))
74+
}
75+
}
76+
77+
// Exporter examples.
78+
for _, sig := range signals {
79+
for _, id := range cIDs {
80+
sets = append(sets, Exporter(sig, id))
81+
}
82+
}
83+
84+
// Connector examples.
85+
for _, exprSig := range signals {
86+
for _, rcvrSig := range signals {
87+
for _, id := range cIDs {
88+
sets = append(sets, Connector(exprSig, rcvrSig, id))
89+
}
90+
}
91+
}
92+
93+
// Capabilities examples.
94+
for _, pID := range pIDs {
95+
sets = append(sets, Capabilities(pID))
96+
}
97+
98+
// Fanout examples.
99+
for _, pID := range pIDs {
100+
sets = append(sets, Fanout(pID))
101+
}
102+
103+
return sets
104+
}

Diff for: service/internal/graph/capabilities.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@ import (
77
"go.opentelemetry.io/collector/consumer"
88
"go.opentelemetry.io/collector/consumer/xconsumer"
99
"go.opentelemetry.io/collector/pipeline"
10+
"go.opentelemetry.io/collector/service/internal/graph/attribute"
1011
)
1112

12-
const capabilitiesSeed = "capabilities"
13-
1413
var _ consumerNode = (*capabilitiesNode)(nil)
1514

1615
// Every pipeline has a "virtual" capabilities node immediately after the receiver(s).
@@ -19,7 +18,7 @@ var _ consumerNode = (*capabilitiesNode)(nil)
1918
// 2. Present a consistent "first consumer" for each pipeline.
2019
// The nodeID is derived from "pipeline ID".
2120
type capabilitiesNode struct {
22-
nodeID
21+
*attribute.Attributes
2322
pipelineID pipeline.ID
2423
baseConsumer
2524
consumer.ConsumeTracesFunc
@@ -30,7 +29,7 @@ type capabilitiesNode struct {
3029

3130
func newCapabilitiesNode(pipelineID pipeline.ID) *capabilitiesNode {
3231
return &capabilitiesNode{
33-
nodeID: newNodeID(capabilitiesSeed, pipelineID.String()),
32+
Attributes: attribute.Capabilities(pipelineID),
3433
pipelineID: pipelineID,
3534
}
3635
}

Diff for: service/internal/graph/connector.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,15 @@ import (
1616
"go.opentelemetry.io/collector/service/internal/builders"
1717
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
1818
"go.opentelemetry.io/collector/service/internal/components"
19+
"go.opentelemetry.io/collector/service/internal/graph/attribute"
1920
)
2021

21-
const connectorSeed = "connector"
22-
2322
var _ consumerNode = (*connectorNode)(nil)
2423

2524
// A connector instance connects one pipeline type to one other pipeline type.
2625
// Therefore, nodeID is derived from "exporter pipeline type", "receiver pipeline type", and "component ID".
2726
type connectorNode struct {
28-
nodeID
27+
*attribute.Attributes
2928
componentID component.ID
3029
exprPipelineType pipeline.Signal
3130
rcvrPipelineType pipeline.Signal
@@ -34,7 +33,7 @@ type connectorNode struct {
3433

3534
func newConnectorNode(exprPipelineType, rcvrPipelineType pipeline.Signal, connID component.ID) *connectorNode {
3635
return &connectorNode{
37-
nodeID: newNodeID(connectorSeed, connID.String(), exprPipelineType.String(), rcvrPipelineType.String()),
36+
Attributes: attribute.Connector(exprPipelineType, rcvrPipelineType, connID),
3837
componentID: connID,
3938
exprPipelineType: exprPipelineType,
4039
rcvrPipelineType: rcvrPipelineType,

Diff for: service/internal/graph/exporter.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,23 @@ import (
1313
"go.opentelemetry.io/collector/pipeline/xpipeline"
1414
"go.opentelemetry.io/collector/service/internal/builders"
1515
"go.opentelemetry.io/collector/service/internal/components"
16+
"go.opentelemetry.io/collector/service/internal/graph/attribute"
1617
)
1718

18-
const exporterSeed = "exporter"
19-
2019
var _ consumerNode = (*exporterNode)(nil)
2120

2221
// An exporter instance can be shared by multiple pipelines of the same type.
2322
// Therefore, nodeID is derived from "pipeline type" and "component ID".
2423
type exporterNode struct {
25-
nodeID
24+
*attribute.Attributes
2625
componentID component.ID
2726
pipelineType pipeline.Signal
2827
component.Component
2928
}
3029

3130
func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode {
3231
return &exporterNode{
33-
nodeID: newNodeID(exporterSeed, pipelineType.String(), exprID.String()),
32+
Attributes: attribute.Exporter(pipelineType, exprID),
3433
componentID: exprID,
3534
pipelineType: pipelineType,
3635
}

Diff for: service/internal/graph/fanout.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,22 @@ package graph // import "go.opentelemetry.io/collector/service/internal/graph"
55

66
import (
77
"go.opentelemetry.io/collector/pipeline"
8+
"go.opentelemetry.io/collector/service/internal/graph/attribute"
89
)
910

10-
const fanOutToExporters = "fanout_to_exporters"
11-
1211
var _ consumerNode = (*fanOutNode)(nil)
1312

1413
// Each pipeline has one fan-out node before exporters.
1514
// Therefore, nodeID is derived from "pipeline ID".
1615
type fanOutNode struct {
17-
nodeID
16+
*attribute.Attributes
1817
pipelineID pipeline.ID
1918
baseConsumer
2019
}
2120

2221
func newFanOutNode(pipelineID pipeline.ID) *fanOutNode {
2322
return &fanOutNode{
24-
nodeID: newNodeID(fanOutToExporters, pipelineID.String()),
23+
Attributes: attribute.Fanout(pipelineID),
2524
pipelineID: pipelineID,
2625
}
2726
}

Diff for: service/internal/graph/graph_test.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -2067,11 +2067,11 @@ func TestGraphBuildErrors(t *testing.T) {
20672067
},
20682068
},
20692069
expected: `cycle detected: ` +
2070-
`connector "nop/conn1" (traces to traces) -> ` +
2071-
`processor "nop" in pipeline "traces/2" -> ` +
20722070
`connector "nop/conn" (traces to traces) -> ` +
20732071
`processor "nop" in pipeline "traces/1" -> ` +
2074-
`connector "nop/conn1" (traces to traces)`,
2072+
`connector "nop/conn1" (traces to traces) -> ` +
2073+
`processor "nop" in pipeline "traces/2" -> ` +
2074+
`connector "nop/conn" (traces to traces)`,
20752075
},
20762076
{
20772077
name: "not_allowed_deep_cycle_metrics.yaml",
@@ -2157,11 +2157,11 @@ func TestGraphBuildErrors(t *testing.T) {
21572157
},
21582158
},
21592159
expected: `cycle detected: ` +
2160-
`connector "nop/conn1" (logs to logs) -> ` +
2161-
`processor "nop" in pipeline "logs/2" -> ` +
21622160
`connector "nop/conn" (logs to logs) -> ` +
21632161
`processor "nop" in pipeline "logs/1" -> ` +
2164-
`connector "nop/conn1" (logs to logs)`,
2162+
`connector "nop/conn1" (logs to logs) -> ` +
2163+
`processor "nop" in pipeline "logs/2" -> ` +
2164+
`connector "nop/conn" (logs to logs)`,
21652165
},
21662166
{
21672167
name: "not_allowed_deep_cycle_profiles.yaml",
@@ -2263,13 +2263,13 @@ func TestGraphBuildErrors(t *testing.T) {
22632263
},
22642264
},
22652265
expected: `cycle detected: ` +
2266+
`connector "nop/forkagain" (traces to traces) -> ` +
2267+
`processor "nop" in pipeline "traces/copy2b" -> ` +
22662268
`connector "nop/rawlog" (traces to logs) -> ` +
22672269
`processor "nop" in pipeline "logs/raw" -> ` +
22682270
`connector "nop/fork" (logs to traces) -> ` +
22692271
`processor "nop" in pipeline "traces/copy2" -> ` +
2270-
`connector "nop/forkagain" (traces to traces) -> ` +
2271-
`processor "nop" in pipeline "traces/copy2b" -> ` +
2272-
`connector "nop/rawlog" (traces to logs)`,
2272+
`connector "nop/forkagain" (traces to traces)`,
22732273
},
22742274
{
22752275
name: "unknown_exporter_config",

Diff for: service/internal/graph/node.go

-22
This file was deleted.

0 commit comments

Comments
 (0)