Skip to content

Commit 5efae2b

Browse files
[Kafka output] removed regex validation to allow dynamic topic (#40415)
* removed regex validation to allow dynamic topic * added changelog * added back validation to only allowed one field name as dynamic topic * updated changelog * fix lint * resolve dynamic topic for managed agent * removed regex validation * updated changelog * Update CHANGELOG.next.asciidoc Co-authored-by: Tiago Queiroz <[email protected]> --------- Co-authored-by: Tiago Queiroz <[email protected]>
1 parent c4c402d commit 5efae2b

File tree

5 files changed

+7
-42
lines changed

5 files changed

+7
-42
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
202202
- elasticsearch output now supports `idle_connection_timeout`. {issue}35616[35615] {pull}36843[36843]
203203
- Enable early event encoding in the Elasticsearch output, improving cpu and memory use {pull}38572[38572]
204204
- The environment variable `BEATS_ADD_CLOUD_METADATA_PROVIDERS` overrides configured/default `add_cloud_metadata` providers {pull}38669[38669]
205+
- When running under Elastic-Agent Kafka output allows dynamic topic in `topic` field {pull}40415[40415]
205206

206207
*Auditbeat*
207208

libbeat/outputs/kafka/config.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"fmt"
2323
"math"
2424
"math/rand"
25-
"regexp"
2625
"strings"
2726
"time"
2827

@@ -109,11 +108,6 @@ var compressionModes = map[string]sarama.CompressionCodec{
109108
"snappy": sarama.CompressionSnappy,
110109
}
111110

112-
// validTopicRegExp is used to validate the topic contains only valid characters
113-
// when running under Elastic-Agent. The regexp is taken from:
114-
// https://github.com/apache/kafka/blob/a126e3a622f2b7142f3543b9dbee54b6412ba9d8/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L33
115-
var validTopicRegExp = regexp.MustCompile("^[a-zA-Z0-9._-]+$")
116-
117111
func defaultConfig() kafkaConfig {
118112
return kafkaConfig{
119113
Hosts: nil,
@@ -193,10 +187,6 @@ func (c *kafkaConfig) Validate() error {
193187
if len(c.Topics) != 0 {
194188
return errors.New("'topics' is not supported when running under Elastic-Agent")
195189
}
196-
197-
if !validTopicRegExp.MatchString(c.Topic) {
198-
return fmt.Errorf("topic '%s' is invalid, it must match '[a-zA-Z0-9._-]'", c.Topic)
199-
}
200190
}
201191

202192
return nil

libbeat/outputs/kafka/config_test.go

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -142,25 +142,10 @@ func TestConfigUnderElasticAgent(t *testing.T) {
142142
expectError: true,
143143
},
144144
{
145-
name: "topic cannot contain invalid characters",
145+
name: "valid topic with dynamic topic selection",
146146
cfg: mapstr.M{
147-
"topic": "foo bar",
147+
"topic": "%{[event.field]}",
148148
},
149-
expectError: true,
150-
},
151-
{
152-
name: "topic with invalid characters",
153-
cfg: mapstr.M{
154-
"topic": "foo + bar",
155-
},
156-
expectError: true,
157-
},
158-
{
159-
name: "topic with invalid characters from dynamic topic selection",
160-
cfg: mapstr.M{
161-
"topic": "%{event.field}",
162-
},
163-
expectError: true,
164149
},
165150

166151
// The default config does not set `topic` not `topics`.

libbeat/outputs/kafka/kafka.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"github.com/Shopify/sarama"
2424

2525
"github.com/elastic/beats/v7/libbeat/beat"
26-
"github.com/elastic/beats/v7/libbeat/management"
2726
"github.com/elastic/beats/v7/libbeat/outputs"
2827
"github.com/elastic/beats/v7/libbeat/outputs/codec"
2928
"github.com/elastic/beats/v7/libbeat/outputs/outil"
@@ -91,21 +90,11 @@ func makeKafka(
9190
// running under Elastic-Agent based on cfg.
9291
//
9392
// When running standalone the topic selector works as expected and documented.
94-
// When running under Elastic-Agent, dynamic topic selection is not supported,
95-
// so a constant selector using the `topic` value is returned.
93+
// When running under Elastic-Agent, dynamic topic selection is also supported
9694
func buildTopicSelector(cfg *config.C) (outil.Selector, error) {
97-
topicCfg := struct {
98-
Topic string `config:"topic" yaml:"topic"`
99-
}{}
10095

101-
if err := cfg.Unpack(&topicCfg); err != nil {
102-
return outil.Selector{}, fmt.Errorf("cannot unpack Kafka config to read the topic: %w", err)
103-
}
104-
105-
if management.UnderAgent() {
106-
exprSelector := outil.ConstSelectorExpr(topicCfg.Topic, outil.SelectorKeepCase)
107-
selector := outil.MakeSelector(exprSelector)
108-
return selector, nil
96+
if cfg == nil {
97+
return outil.Selector{}, fmt.Errorf("Kafka config cannot be nil")
10998
}
11099

111100
return outil.BuildSelectorFromConfig(cfg, outil.Settings{

libbeat/outputs/kafka/kafka_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func TestBuildTopicSelector(t *testing.T) {
4242
{
4343
name: "dynamic topic under agent",
4444
topic: "%{[foo]}",
45-
expected: "%{[foo]}",
45+
expected: "bar",
4646
underAgent: true,
4747
},
4848
{

0 commit comments

Comments
 (0)