Skip to content

Commit a41298f

Browse files
authored
[connector/routing] Fix issue where conditions were not deduplicated properly (open-telemetry#35962)
1 parent 468908c commit a41298f

File tree

25 files changed

+1749
-23
lines changed

25 files changed

+1749
-23
lines changed

.chloggen/routing-tests-golden.yaml

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: connector/routing
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix detection of duplicate conditions in routing table.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35962]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

connector/routingconnector/config_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func TestLoadConfig(t *testing.T) {
2424
expected component.Config
2525
}{
2626
{
27-
configPath: "config_traces.yaml",
27+
configPath: filepath.Join("testdata", "config", "traces.yaml"),
2828
id: component.NewIDWithName(metadata.Type, ""),
2929
expected: &Config{
3030
DefaultPipelines: []pipeline.ID{
@@ -49,7 +49,7 @@ func TestLoadConfig(t *testing.T) {
4949
},
5050
},
5151
{
52-
configPath: "config_metrics.yaml",
52+
configPath: filepath.Join("testdata", "config", "metrics.yaml"),
5353
id: component.NewIDWithName(metadata.Type, ""),
5454
expected: &Config{
5555
DefaultPipelines: []pipeline.ID{
@@ -74,7 +74,7 @@ func TestLoadConfig(t *testing.T) {
7474
},
7575
},
7676
{
77-
configPath: "config_logs.yaml",
77+
configPath: filepath.Join("testdata", "config", "logs.yaml"),
7878
id: component.NewIDWithName(metadata.Type, ""),
7979
expected: &Config{
8080
DefaultPipelines: []pipeline.ID{
@@ -102,7 +102,7 @@ func TestLoadConfig(t *testing.T) {
102102

103103
for _, tt := range testcases {
104104
t.Run(tt.configPath, func(t *testing.T) {
105-
cm, err := confmaptest.LoadConf(filepath.Join("testdata", tt.configPath))
105+
cm, err := confmaptest.LoadConf(tt.configPath)
106106
require.NoError(t, err)
107107

108108
factory := NewFactory()

connector/routingconnector/logs_test.go

+92
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,24 @@ package routingconnector // import "github.com/open-telemetry/opentelemetry-coll
55

66
import (
77
"context"
8+
"os"
9+
"path/filepath"
810
"testing"
911

1012
"github.com/stretchr/testify/assert"
1113
"github.com/stretchr/testify/require"
14+
"go.opentelemetry.io/collector/component"
1215
"go.opentelemetry.io/collector/component/componenttest"
16+
"go.opentelemetry.io/collector/confmap/confmaptest"
1317
"go.opentelemetry.io/collector/connector"
1418
"go.opentelemetry.io/collector/connector/connectortest"
1519
"go.opentelemetry.io/collector/consumer"
1620
"go.opentelemetry.io/collector/consumer/consumertest"
1721
"go.opentelemetry.io/collector/pdata/plog"
1822
"go.opentelemetry.io/collector/pipeline"
23+
24+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
25+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
1926
)
2027

2128
func TestLogsRegisterConsumersForValidRoute(t *testing.T) {
@@ -465,3 +472,88 @@ func TestLogsConnectorCapabilities(t *testing.T) {
465472
require.NoError(t, err)
466473
assert.False(t, conn.Capabilities().MutatesData)
467474
}
475+
476+
func TestLogsConnectorDetailed(t *testing.T) {
477+
testCases := []string{
478+
filepath.Join("testdata", "logs", "resource_context", "all_match_first_only"),
479+
filepath.Join("testdata", "logs", "resource_context", "all_match_last_only"),
480+
filepath.Join("testdata", "logs", "resource_context", "all_match_once"),
481+
filepath.Join("testdata", "logs", "resource_context", "each_matches_one"),
482+
filepath.Join("testdata", "logs", "resource_context", "match_none_with_default"),
483+
filepath.Join("testdata", "logs", "resource_context", "match_none_without_default"),
484+
}
485+
486+
for _, tt := range testCases {
487+
t.Run(tt, func(t *testing.T) {
488+
489+
cm, err := confmaptest.LoadConf(filepath.Join(tt, "config.yaml"))
490+
require.NoError(t, err)
491+
factory := NewFactory()
492+
cfg := factory.CreateDefaultConfig()
493+
sub, err := cm.Sub("routing")
494+
require.NoError(t, err)
495+
require.NoError(t, sub.Unmarshal(cfg))
496+
require.NoError(t, component.ValidateConfig(cfg))
497+
498+
var sinkDefault, sink0, sink1 consumertest.LogsSink
499+
router := connector.NewLogsRouter(map[pipeline.ID]consumer.Logs{
500+
pipeline.NewIDWithName(pipeline.SignalLogs, "default"): &sinkDefault,
501+
pipeline.NewIDWithName(pipeline.SignalLogs, "0"): &sink0,
502+
pipeline.NewIDWithName(pipeline.SignalLogs, "1"): &sink1,
503+
})
504+
505+
conn, err := factory.CreateLogsToLogs(
506+
context.Background(),
507+
connectortest.NewNopSettings(),
508+
cfg,
509+
router.(consumer.Logs),
510+
)
511+
require.NoError(t, err)
512+
513+
var expected0, expected1, expectedDefault *plog.Logs
514+
if expected, readErr := golden.ReadLogs(filepath.Join(tt, "sink_0.yaml")); readErr == nil {
515+
expected0 = &expected
516+
} else if !os.IsNotExist(readErr) {
517+
t.Fatalf("Error reading sink_0.yaml: %v", readErr)
518+
}
519+
520+
if expected, readErr := golden.ReadLogs(filepath.Join(tt, "sink_1.yaml")); readErr == nil {
521+
expected1 = &expected
522+
} else if !os.IsNotExist(readErr) {
523+
t.Fatalf("Error reading sink_1.yaml: %v", readErr)
524+
}
525+
526+
if expected, readErr := golden.ReadLogs(filepath.Join(tt, "sink_default.yaml")); readErr == nil {
527+
expectedDefault = &expected
528+
} else if !os.IsNotExist(readErr) {
529+
t.Fatalf("Error reading sink_default.yaml: %v", readErr)
530+
}
531+
532+
input, readErr := golden.ReadLogs(filepath.Join(tt, "input.yaml"))
533+
require.NoError(t, readErr)
534+
535+
require.NoError(t, conn.ConsumeLogs(context.Background(), input))
536+
537+
if expected0 == nil {
538+
assert.Empty(t, sink0.AllLogs(), "sink0 should be empty")
539+
} else {
540+
require.Len(t, sink0.AllLogs(), 1, "sink0 should have one plog.Logs")
541+
assert.NoError(t, plogtest.CompareLogs(*expected0, sink0.AllLogs()[0]), "sink0 has unexpected result")
542+
}
543+
544+
if expected1 == nil {
545+
assert.Empty(t, sink1.AllLogs(), "sink1 should be empty")
546+
} else {
547+
require.Len(t, sink1.AllLogs(), 1, "sink1 should have one plog.Logs")
548+
assert.NoError(t, plogtest.CompareLogs(*expected1, sink1.AllLogs()[0]), "sink1 has unexpected result")
549+
}
550+
551+
if expectedDefault == nil {
552+
assert.Empty(t, sinkDefault.AllLogs(), "sinkDefault should be empty")
553+
} else {
554+
require.Len(t, sinkDefault.AllLogs(), 1, "sinkDefault should have one plog.Logs")
555+
assert.NoError(t, plogtest.CompareLogs(*expectedDefault, sinkDefault.AllLogs()[0]), "sinkDefault has unexpected result")
556+
}
557+
})
558+
}
559+
}

connector/routingconnector/router.go

+13-19
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ func (r *router[C]) registerConsumers(defaultPipelineIDs []pipeline.ID) error {
8383
return err
8484
}
8585

86+
r.normalizeConditions()
87+
8688
// register pipelines for each route
8789
err = r.registerRouteConsumers()
8890
if err != nil {
@@ -109,11 +111,21 @@ func (r *router[C]) registerDefaultConsumer(pipelineIDs []pipeline.ID) error {
109111
return nil
110112
}
111113

114+
// convert conditions to statements
115+
func (r *router[C]) normalizeConditions() {
116+
for i := range r.table {
117+
item := &r.table[i]
118+
if item.Condition != "" {
119+
item.Statement = fmt.Sprintf("route() where %s", item.Condition)
120+
}
121+
}
122+
}
123+
112124
// registerRouteConsumers registers a consumer for the pipelines configured
113125
// for each route
114126
func (r *router[C]) registerRouteConsumers() error {
115127
for _, item := range r.table {
116-
statement, err := r.getStatementFrom(item)
128+
statement, err := r.parser.ParseStatement(item.Statement)
117129
if err != nil {
118130
return err
119131
}
@@ -144,24 +156,6 @@ func (r *router[C]) registerRouteConsumers() error {
144156
return nil
145157
}
146158

147-
// getStatementFrom builds a routing OTTL statement from the provided
148-
// routing table entry configuration. If the routing table entry configuration
149-
// does not contain a valid OTTL statement then nil is returned.
150-
func (r *router[C]) getStatementFrom(item RoutingTableItem) (*ottl.Statement[ottlresource.TransformContext], error) {
151-
var statement *ottl.Statement[ottlresource.TransformContext]
152-
if item.Condition != "" {
153-
item.Statement = fmt.Sprintf("route() where %s", item.Condition)
154-
}
155-
if item.Statement != "" {
156-
var err error
157-
statement, err = r.parser.ParseStatement(item.Statement)
158-
if err != nil {
159-
return statement, err
160-
}
161-
}
162-
return statement, nil
163-
}
164-
165159
func key(entry RoutingTableItem) string {
166160
return entry.Statement
167161
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
routing:
2+
default_pipelines:
3+
- logs/default
4+
table:
5+
- condition: attributes["resourceName"] != nil
6+
pipelines:
7+
- logs/0
8+
- condition: attributes["resourceName"] == "resourceY"
9+
pipelines:
10+
- logs/1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
resourceLogs:
2+
- resource:
3+
attributes:
4+
- key: resourceName
5+
value:
6+
stringValue: resourceA
7+
- key: resourceNameAgain
8+
value:
9+
stringValue: resourceA
10+
schemaUrl: https://opentelemetry.io/schemas/1.6.1
11+
scopeLogs:
12+
- attributes:
13+
- key: scopeName
14+
value:
15+
stringValue: scopeA
16+
- key: scopeNameAgain
17+
value:
18+
stringValue: scopeA
19+
logRecords:
20+
- attributes:
21+
- key: logName
22+
value:
23+
stringValue: logA
24+
- key: logNameAgain
25+
value:
26+
stringValue: logA
27+
body:
28+
stringValue: logA
29+
- attributes:
30+
- key: logName
31+
value:
32+
stringValue: logB
33+
- key: logNameAgain
34+
value:
35+
stringValue: logB
36+
body:
37+
stringValue: logB
38+
schemaUrl: https://opentelemetry.io/schemas/1.6.1
39+
scope:
40+
name: scopeA
41+
version: v0.1.0
42+
- attributes:
43+
- key: scopeName
44+
value:
45+
stringValue: scopeB
46+
- key: scopeNameAgain
47+
value:
48+
stringValue: scopeB
49+
logRecords:
50+
- attributes:
51+
- key: logName
52+
value:
53+
stringValue: logA
54+
- key: logNameAgain
55+
value:
56+
stringValue: logA
57+
body:
58+
stringValue: logA
59+
- attributes:
60+
- key: logName
61+
value:
62+
stringValue: logB
63+
- key: logNameAgain
64+
value:
65+
stringValue: logB
66+
body:
67+
stringValue: logB
68+
schemaUrl: https://opentelemetry.io/schemas/1.6.1
69+
scope:
70+
name: scopeB
71+
version: v0.1.0
72+
- resource:
73+
attributes:
74+
- key: resourceName
75+
value:
76+
stringValue: resourceB
77+
- key: resourceNameAgain
78+
value:
79+
stringValue: resourceB
80+
schemaUrl: https://opentelemetry.io/schemas/1.6.1
81+
scopeLogs:
82+
- attributes:
83+
- key: scopeName
84+
value:
85+
stringValue: scopeA
86+
- key: scopeNameAgain
87+
value:
88+
stringValue: scopeA
89+
logRecords:
90+
- attributes:
91+
- key: logName
92+
value:
93+
stringValue: logA
94+
- key: logNameAgain
95+
value:
96+
stringValue: logA
97+
body:
98+
stringValue: logA
99+
- attributes:
100+
- key: logName
101+
value:
102+
stringValue: logB
103+
- key: logNameAgain
104+
value:
105+
stringValue: logB
106+
body:
107+
stringValue: logB
108+
schemaUrl: https://opentelemetry.io/schemas/1.6.1
109+
scope:
110+
name: scopeA
111+
version: v0.1.0
112+
- attributes:
113+
- key: scopeName
114+
value:
115+
stringValue: scopeB
116+
- key: scopeNameAgain
117+
value:
118+
stringValue: scopeB
119+
logRecords:
120+
- attributes:
121+
- key: logName
122+
value:
123+
stringValue: logA
124+
- key: logNameAgain
125+
value:
126+
stringValue: logA
127+
body:
128+
stringValue: logA
129+
- attributes:
130+
- key: logName
131+
value:
132+
stringValue: logB
133+
- key: logNameAgain
134+
value:
135+
stringValue: logB
136+
body:
137+
stringValue: logB
138+
schemaUrl: https://opentelemetry.io/schemas/1.6.1
139+
scope:
140+
name: scopeB
141+
version: v0.1.0

0 commit comments

Comments
 (0)