Skip to content

Commit 4ace638

Browse files
authored
[chore][graph] Decompose buildConnector (open-telemetry#11330)
The primary goal here was to delegate from each switch case, rather than nest logic as deep as necessary.
1 parent 69ff46b commit 4ace638

File tree

1 file changed

+203
-157
lines changed

1 file changed

+203
-157
lines changed

service/internal/graph/connector.go

+203-157
Original file line numberDiff line numberDiff line change
@@ -55,173 +55,219 @@ func (n *connectorNode) buildComponent(
5555
) error {
5656
tel.Logger = components.ConnectorLogger(tel.Logger, n.componentID, n.exprPipelineType, n.rcvrPipelineType)
5757
set := connector.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
58-
5958
switch n.rcvrPipelineType {
6059
case pipeline.SignalTraces:
61-
capability := consumer.Capabilities{MutatesData: false}
62-
consumers := make(map[pipeline.ID]consumer.Traces, len(nexts))
63-
for _, next := range nexts {
64-
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Traces)
65-
capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData
66-
}
67-
next := connector.NewTracesRouter(consumers)
68-
69-
switch n.exprPipelineType {
70-
case pipeline.SignalTraces:
71-
conn, err := builder.CreateTracesToTraces(ctx, set, next)
72-
if err != nil {
73-
return err
74-
}
75-
n.Component = conn
76-
// When connecting pipelines of the same data type, the connector must
77-
// inherit the capabilities of pipelines in which it is acting as a receiver.
78-
// Since the incoming and outgoing data types are the same, we must also consider
79-
// that the connector itself may MutatesData.
80-
capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData
81-
n.baseConsumer = capabilityconsumer.NewTraces(conn, capability)
82-
case pipeline.SignalMetrics:
83-
conn, err := builder.CreateMetricsToTraces(ctx, set, next)
84-
if err != nil {
85-
return err
86-
}
87-
n.Component, n.baseConsumer = conn, conn
88-
case pipeline.SignalLogs:
89-
conn, err := builder.CreateLogsToTraces(ctx, set, next)
90-
if err != nil {
91-
return err
92-
}
93-
n.Component, n.baseConsumer = conn, conn
94-
case componentprofiles.SignalProfiles:
95-
conn, err := builder.CreateProfilesToTraces(ctx, set, next)
96-
if err != nil {
97-
return err
98-
}
99-
n.Component, n.baseConsumer = conn, conn
60+
return n.buildTraces(ctx, set, builder, nexts)
61+
case pipeline.SignalMetrics:
62+
return n.buildMetrics(ctx, set, builder, nexts)
63+
case pipeline.SignalLogs:
64+
return n.buildLogs(ctx, set, builder, nexts)
65+
case componentprofiles.SignalProfiles:
66+
return n.buildProfiles(ctx, set, builder, nexts)
67+
}
68+
return nil
69+
}
70+
71+
func (n *connectorNode) buildTraces(
72+
ctx context.Context,
73+
set connector.Settings,
74+
builder *builders.ConnectorBuilder,
75+
nexts []baseConsumer,
76+
) error {
77+
consumers := make(map[pipeline.ID]consumer.Traces, len(nexts))
78+
for _, next := range nexts {
79+
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Traces)
80+
}
81+
next := connector.NewTracesRouter(consumers)
82+
83+
switch n.exprPipelineType {
84+
case pipeline.SignalTraces:
85+
conn, err := builder.CreateTracesToTraces(ctx, set, next)
86+
if err != nil {
87+
return err
88+
}
89+
n.Component, n.baseConsumer = conn, conn
90+
case pipeline.SignalMetrics:
91+
conn, err := builder.CreateMetricsToTraces(ctx, set, next)
92+
if err != nil {
93+
return err
94+
}
95+
n.Component, n.baseConsumer = conn, conn
96+
case pipeline.SignalLogs:
97+
conn, err := builder.CreateLogsToTraces(ctx, set, next)
98+
if err != nil {
99+
return err
100+
}
101+
n.Component, n.baseConsumer = conn, conn
102+
case componentprofiles.SignalProfiles:
103+
conn, err := builder.CreateProfilesToTraces(ctx, set, next)
104+
if err != nil {
105+
return err
100106
}
107+
n.Component, n.baseConsumer = conn, conn
108+
}
109+
110+
if n.exprPipelineType == pipeline.SignalTraces {
111+
n.baseConsumer = capabilityconsumer.NewTraces(
112+
n.Component.(connector.Traces),
113+
aggregateCapabilities(n.baseConsumer, nexts),
114+
)
115+
}
116+
return nil
117+
}
118+
119+
func (n *connectorNode) buildMetrics(
120+
ctx context.Context,
121+
set connector.Settings,
122+
builder *builders.ConnectorBuilder,
123+
nexts []baseConsumer,
124+
) error {
125+
consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts))
126+
for _, next := range nexts {
127+
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Metrics)
128+
}
129+
next := connector.NewMetricsRouter(consumers)
101130

131+
switch n.exprPipelineType {
132+
case pipeline.SignalTraces:
133+
conn, err := builder.CreateTracesToMetrics(ctx, set, next)
134+
if err != nil {
135+
return err
136+
}
137+
n.Component, n.baseConsumer = conn, conn
102138
case pipeline.SignalMetrics:
103-
capability := consumer.Capabilities{MutatesData: false}
104-
consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts))
105-
for _, next := range nexts {
106-
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Metrics)
107-
capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData
108-
}
109-
next := connector.NewMetricsRouter(consumers)
110-
111-
switch n.exprPipelineType {
112-
case pipeline.SignalTraces:
113-
conn, err := builder.CreateTracesToMetrics(ctx, set, next)
114-
if err != nil {
115-
return err
116-
}
117-
n.Component, n.baseConsumer = conn, conn
118-
case pipeline.SignalMetrics:
119-
conn, err := builder.CreateMetricsToMetrics(ctx, set, next)
120-
if err != nil {
121-
return err
122-
}
123-
n.Component = conn
124-
// When connecting pipelines of the same data type, the connector must
125-
// inherit the capabilities of pipelines in which it is acting as a receiver.
126-
// Since the incoming and outgoing data types are the same, we must also consider
127-
// that the connector itself may MutatesData.
128-
capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData
129-
n.baseConsumer = capabilityconsumer.NewMetrics(conn, capability)
130-
case pipeline.SignalLogs:
131-
conn, err := builder.CreateLogsToMetrics(ctx, set, next)
132-
if err != nil {
133-
return err
134-
}
135-
n.Component, n.baseConsumer = conn, conn
136-
case componentprofiles.SignalProfiles:
137-
conn, err := builder.CreateProfilesToMetrics(ctx, set, next)
138-
if err != nil {
139-
return err
140-
}
141-
n.Component, n.baseConsumer = conn, conn
139+
conn, err := builder.CreateMetricsToMetrics(ctx, set, next)
140+
if err != nil {
141+
return err
142142
}
143+
n.Component, n.baseConsumer = conn, conn
143144
case pipeline.SignalLogs:
144-
capability := consumer.Capabilities{MutatesData: false}
145-
consumers := make(map[pipeline.ID]consumer.Logs, len(nexts))
146-
for _, next := range nexts {
147-
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Logs)
148-
capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData
149-
}
150-
next := connector.NewLogsRouter(consumers)
151-
152-
switch n.exprPipelineType {
153-
case pipeline.SignalTraces:
154-
conn, err := builder.CreateTracesToLogs(ctx, set, next)
155-
if err != nil {
156-
return err
157-
}
158-
n.Component, n.baseConsumer = conn, conn
159-
case pipeline.SignalMetrics:
160-
conn, err := builder.CreateMetricsToLogs(ctx, set, next)
161-
if err != nil {
162-
return err
163-
}
164-
n.Component, n.baseConsumer = conn, conn
165-
case pipeline.SignalLogs:
166-
conn, err := builder.CreateLogsToLogs(ctx, set, next)
167-
if err != nil {
168-
return err
169-
}
170-
n.Component = conn
171-
// When connecting pipelines of the same data type, the connector must
172-
// inherit the capabilities of pipelines in which it is acting as a receiver.
173-
// Since the incoming and outgoing data types are the same, we must also consider
174-
// that the connector itself may MutatesData.
175-
capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData
176-
n.baseConsumer = capabilityconsumer.NewLogs(conn, capability)
177-
case componentprofiles.SignalProfiles:
178-
conn, err := builder.CreateProfilesToLogs(ctx, set, next)
179-
if err != nil {
180-
return err
181-
}
182-
n.Component, n.baseConsumer = conn, conn
145+
conn, err := builder.CreateLogsToMetrics(ctx, set, next)
146+
if err != nil {
147+
return err
183148
}
149+
n.Component, n.baseConsumer = conn, conn
184150
case componentprofiles.SignalProfiles:
185-
capability := consumer.Capabilities{MutatesData: false}
186-
consumers := make(map[pipeline.ID]consumerprofiles.Profiles, len(nexts))
187-
for _, next := range nexts {
188-
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumerprofiles.Profiles)
189-
capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData
190-
}
191-
next := connectorprofiles.NewProfilesRouter(consumers)
192-
193-
switch n.exprPipelineType {
194-
case pipeline.SignalTraces:
195-
conn, err := builder.CreateTracesToProfiles(ctx, set, next)
196-
if err != nil {
197-
return err
198-
}
199-
n.Component, n.baseConsumer = conn, conn
200-
case pipeline.SignalMetrics:
201-
conn, err := builder.CreateMetricsToProfiles(ctx, set, next)
202-
if err != nil {
203-
return err
204-
}
205-
n.Component, n.baseConsumer = conn, conn
206-
case pipeline.SignalLogs:
207-
conn, err := builder.CreateLogsToProfiles(ctx, set, next)
208-
if err != nil {
209-
return err
210-
}
211-
n.Component, n.baseConsumer = conn, conn
212-
case componentprofiles.SignalProfiles:
213-
conn, err := builder.CreateProfilesToProfiles(ctx, set, next)
214-
if err != nil {
215-
return err
216-
}
217-
n.Component = conn
218-
// When connecting pipelines of the same data type, the connector must
219-
// inherit the capabilities of pipelines in which it is acting as a receiver.
220-
// Since the incoming and outgoing data types are the same, we must also consider
221-
// that the connector itself may MutatesData.
222-
capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData
223-
n.baseConsumer = capabilityconsumer.NewProfiles(conn, capability)
151+
conn, err := builder.CreateProfilesToMetrics(ctx, set, next)
152+
if err != nil {
153+
return err
224154
}
155+
n.Component, n.baseConsumer = conn, conn
156+
}
157+
158+
if n.exprPipelineType == pipeline.SignalMetrics {
159+
n.baseConsumer = capabilityconsumer.NewMetrics(
160+
n.Component.(connector.Metrics),
161+
aggregateCapabilities(n.baseConsumer, nexts),
162+
)
225163
}
226164
return nil
227165
}
166+
167+
func (n *connectorNode) buildLogs(
168+
ctx context.Context,
169+
set connector.Settings,
170+
builder *builders.ConnectorBuilder,
171+
nexts []baseConsumer,
172+
) error {
173+
consumers := make(map[pipeline.ID]consumer.Logs, len(nexts))
174+
for _, next := range nexts {
175+
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Logs)
176+
}
177+
next := connector.NewLogsRouter(consumers)
178+
179+
switch n.exprPipelineType {
180+
case pipeline.SignalTraces:
181+
conn, err := builder.CreateTracesToLogs(ctx, set, next)
182+
if err != nil {
183+
return err
184+
}
185+
n.Component, n.baseConsumer = conn, conn
186+
case pipeline.SignalMetrics:
187+
conn, err := builder.CreateMetricsToLogs(ctx, set, next)
188+
if err != nil {
189+
return err
190+
}
191+
n.Component, n.baseConsumer = conn, conn
192+
case pipeline.SignalLogs:
193+
conn, err := builder.CreateLogsToLogs(ctx, set, next)
194+
if err != nil {
195+
return err
196+
}
197+
n.Component, n.baseConsumer = conn, conn
198+
case componentprofiles.SignalProfiles:
199+
conn, err := builder.CreateProfilesToLogs(ctx, set, next)
200+
if err != nil {
201+
return err
202+
}
203+
n.Component, n.baseConsumer = conn, conn
204+
}
205+
206+
if n.exprPipelineType == pipeline.SignalLogs {
207+
n.baseConsumer = capabilityconsumer.NewLogs(
208+
n.Component.(connector.Logs),
209+
aggregateCapabilities(n.baseConsumer, nexts),
210+
)
211+
}
212+
return nil
213+
}
214+
215+
func (n *connectorNode) buildProfiles(
216+
ctx context.Context,
217+
set connector.Settings,
218+
builder *builders.ConnectorBuilder,
219+
nexts []baseConsumer,
220+
) error {
221+
consumers := make(map[pipeline.ID]consumerprofiles.Profiles, len(nexts))
222+
for _, next := range nexts {
223+
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumerprofiles.Profiles)
224+
}
225+
next := connectorprofiles.NewProfilesRouter(consumers)
226+
227+
switch n.exprPipelineType {
228+
case pipeline.SignalTraces:
229+
conn, err := builder.CreateTracesToProfiles(ctx, set, next)
230+
if err != nil {
231+
return err
232+
}
233+
n.Component, n.baseConsumer = conn, conn
234+
case pipeline.SignalMetrics:
235+
conn, err := builder.CreateMetricsToProfiles(ctx, set, next)
236+
if err != nil {
237+
return err
238+
}
239+
n.Component, n.baseConsumer = conn, conn
240+
case pipeline.SignalLogs:
241+
conn, err := builder.CreateLogsToProfiles(ctx, set, next)
242+
if err != nil {
243+
return err
244+
}
245+
n.Component, n.baseConsumer = conn, conn
246+
case componentprofiles.SignalProfiles:
247+
conn, err := builder.CreateProfilesToProfiles(ctx, set, next)
248+
if err != nil {
249+
return err
250+
}
251+
n.Component, n.baseConsumer = conn, conn
252+
}
253+
254+
if n.exprPipelineType == componentprofiles.SignalProfiles {
255+
n.baseConsumer = capabilityconsumer.NewProfiles(
256+
n.Component.(connectorprofiles.Profiles),
257+
aggregateCapabilities(n.baseConsumer, nexts),
258+
)
259+
}
260+
return nil
261+
}
262+
263+
// When connecting pipelines of the same data type, the connector must
264+
// inherit the capabilities of pipelines in which it is acting as a receiver.
265+
// Since the incoming and outgoing data types are the same, we must also consider
266+
// that the connector itself may MutatesData.
267+
func aggregateCapabilities(base baseConsumer, nexts []baseConsumer) consumer.Capabilities {
268+
capabilities := base.Capabilities()
269+
for _, next := range nexts {
270+
capabilities.MutatesData = capabilities.MutatesData || next.Capabilities().MutatesData
271+
}
272+
return capabilities
273+
}

0 commit comments

Comments
 (0)