Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/feat_43782.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: receiver/kafka

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add `exclude_topic` setting to exclude topics from consumption when `topic` is a regex pattern

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [43782]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
11 changes: 11 additions & 0 deletions internal/kafka/franz_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func NewFranzSyncProducer(ctx context.Context, clientCfg configkafka.ClientConfi
func NewFranzConsumerGroup(ctx context.Context, clientCfg configkafka.ClientConfig,
consumerCfg configkafka.ConsumerConfig,
topics []string,
excludeTopics []string,
logger *zap.Logger,
opts ...kgo.Opt,
) (*kgo.Client, error) {
Expand All @@ -108,15 +109,25 @@ func NewFranzConsumerGroup(ctx context.Context, clientCfg configkafka.ClientConf
return nil, err
}

// Detect whether any configured topic is a regex pattern.
// Similar to librdkafka, if the topic starts with `^`, it is a regex topic:
// https://github.com/confluentinc/librdkafka/blob/b871fdabab84b2ea1be3866a2ded4def7e31b006/src/rdkafka.h#L3899-L3938
isRegex := false
for _, t := range topics {
	if strings.HasPrefix(t, "^") {
		isRegex = true
		// One regex topic is enough to switch the client into regex
		// consumption, so the option is appended once and the scan stops.
		opts = append(opts, kgo.ConsumeRegex())
		break
	}
}

// Exclusions are only applied when regex consumption is enabled. The warning
// is emitted only when exclude topics were actually configured but cannot
// take effect; an empty exclude list (the default) must stay silent.
if len(excludeTopics) > 0 {
	if isRegex {
		opts = append(opts, kgo.ConsumeExcludeTopics(excludeTopics...))
	} else {
		logger.Warn("exclude_topic is configured but will be ignored because topic pattern does not use regex (must start with '^')")
	}
}

// Configure session timeout
if consumerCfg.SessionTimeout > 0 {
opts = append(opts, kgo.SessionTimeout(consumerCfg.SessionTimeout))
Expand Down
2 changes: 1 addition & 1 deletion internal/kafka/franz_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func mustNewFranzConsumerGroup(t *testing.T,
minAge := 10 * time.Millisecond
opts = append(opts, kgo.MetadataMinAge(minAge), kgo.MetadataMaxAge(minAge*2))
client, err := NewFranzConsumerGroup(t.Context(), clientConfig, consumerConfig,
topics, zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel)), opts...,
topics, nil, zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel)), opts...,
)
require.NoError(t, err)
t.Cleanup(client.Close)
Expand Down
30 changes: 30 additions & 0 deletions receiver/kafkareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,19 @@ The following settings can be optionally configured:
- `logs`
- `topic` (default = otlp\_logs): The name of the Kafka topic from which to consume logs.
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
- `exclude_topic` (default = ""): A regex pattern for topics to exclude from consumption. Only takes effect when `topic` is a regex pattern (prefixed with `^`) and the franz-go client is used; otherwise it is ignored.
- `metrics`
- `topic` (default = otlp\_metrics): The name of the Kafka topic from which to consume metrics.
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
- `exclude_topic` (default = ""): A regex pattern for topics to exclude from consumption. Only takes effect when `topic` is a regex pattern (prefixed with `^`) and the franz-go client is used; otherwise it is ignored.
- `traces`
- `topic` (default = otlp\_spans): The name of the Kafka topic from which to consume traces.
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
- `exclude_topic` (default = ""): A regex pattern for topics to exclude from consumption. Only takes effect when `topic` is a regex pattern (prefixed with `^`) and the franz-go client is used; otherwise it is ignored.
- `profiles`
- `topic` (default = otlp\_profiles): The name of the Kafka topic from which to consume profiles.
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
- `exclude_topic` (default = ""): A regex pattern for topics to exclude from consumption. Only takes effect when `topic` is a regex pattern (prefixed with `^`) and the franz-go client is used; otherwise it is ignored.
- `topic` (Deprecated [v0.124.0]: use `logs::topic`, `traces::topic`, or `metrics::topic`).
If this is set, it will take precedence over the default value for those fields.
- `encoding` (Deprecated [v0.124.0]: use `logs::encoding`, `traces::encoding`, or `metrics::encoding`).
Expand Down Expand Up @@ -217,3 +221,29 @@ attributes with the prefix "kafka.header.", i.e.
}
...
```

#### Regex topic patterns with exclusions

When using the `franz-go` client, you can consume from multiple topics using regex patterns
and exclude specific topics from consumption. This is useful when you want to consume from
a dynamic set of topics but need to filter out certain ones.

**Note:** `exclude_topic` is itself a regex pattern and only takes effect when `topic` is a
regex pattern (prefixed with `^`); otherwise it is ignored and a warning is logged. This
feature is only available with the franz-go client.

```yaml
receivers:
kafka:
logs:
topic: "^logs-.*" # Consume from all topics matching logs-*
exclude_topic: "^logs-(test|dev)$" # Exclude logs-test and logs-dev
metrics:
topic: "^metrics-.*"
exclude_topic: "^metrics-internal-.*$"
```

In the example above:
- For logs: the receiver will consume from topics like `logs-prod`, `logs-staging`, `logs-app`
but will exclude `logs-test` and `logs-dev`
- For metrics: the receiver will consume from topics like `metrics-app`, `metrics-infra`
but will exclude any topics starting with `metrics-internal-`
3 changes: 3 additions & 0 deletions receiver/kafkareceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ type TopicEncodingConfig struct {
//
// Defaults to "otlp_proto".
Encoding string `mapstructure:"encoding"`

// ExcludeTopic is an optional regex pattern for topics to exclude from
// consumption. It is only used when Topic is a regex pattern (starts with `^`).
ExcludeTopic string `mapstructure:"exclude_topic"`
}

type MessageMarking struct {
Expand Down
29 changes: 29 additions & 0 deletions receiver/kafkareceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,35 @@ func TestLoadConfig(t *testing.T) {
},
},
},
{
id: component.NewIDWithName(metadata.Type, "regex_topic_with_exclusion"),
expected: &Config{
ClientConfig: configkafka.NewDefaultClientConfig(),
ConsumerConfig: configkafka.NewDefaultConsumerConfig(),
Logs: TopicEncodingConfig{
Topic: "^logs-.*",
ExcludeTopic: "^logs-(test|dev)$",
Encoding: "otlp_proto",
},
Metrics: TopicEncodingConfig{
Topic: "^metrics-.*",
ExcludeTopic: "^metrics-internal-.*$",
Encoding: "otlp_proto",
},
Traces: TopicEncodingConfig{
Topic: "^traces-.*",
ExcludeTopic: "^traces-debug-.*$",
Encoding: "otlp_proto",
},
Profiles: TopicEncodingConfig{
Topic: "otlp_profiles",
Encoding: "otlp_proto",
},
ErrorBackOff: configretry.BackOffConfig{
Enabled: false,
},
},
},
}

for _, tt := range tests {
Expand Down
4 changes: 4 additions & 0 deletions receiver/kafkareceiver/consumer_franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type topicPartition struct {
type franzConsumer struct {
config *Config
topics []string
excludeTopics []string
settings receiver.Settings
telemetryBuilder *metadata.TelemetryBuilder
newConsumeFn newConsumeMessageFunc
Expand Down Expand Up @@ -123,6 +124,7 @@ func newFranzKafkaConsumer(
config *Config,
set receiver.Settings,
topics []string,
excludeTopics []string,
newConsumeFn newConsumeMessageFunc,
) (*franzConsumer, error) {
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
Expand All @@ -133,6 +135,7 @@ func newFranzKafkaConsumer(
return &franzConsumer{
config: config,
topics: topics,
excludeTopics: excludeTopics,
newConsumeFn: newConsumeFn,
settings: set,
telemetryBuilder: telemetryBuilder,
Expand Down Expand Up @@ -212,6 +215,7 @@ func (c *franzConsumer) Start(ctx context.Context, host component.Host) error {
c.config.ClientConfig,
c.config.ConsumerConfig,
c.topics,
c.excludeTopics,
c.settings.Logger,
opts...,
)
Expand Down
145 changes: 140 additions & 5 deletions receiver/kafkareceiver/consumer_franz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/collector/pdata/testdata"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap/zapcore"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata"
Expand Down Expand Up @@ -199,7 +200,7 @@ func TestConsumerShutdownConsuming(t *testing.T) {
test := func(tb testing.TB, expected int64) {
ctx := t.Context()
consumeFn, consuming := newConsumeFunc()
consumer, e := newFranzKafkaConsumer(cfg, settings, []string{topic}, consumeFn)
consumer, e := newFranzKafkaConsumer(cfg, settings, []string{topic}, nil, consumeFn)
require.NoError(tb, e)
require.NoError(tb, consumer.Start(ctx, componenttest.NewNopHost()))
require.NoError(tb, kafkaClient.ProduceSync(ctx, rs...).FirstErr())
Expand Down Expand Up @@ -239,7 +240,12 @@ func TestConsumerShutdownNotStarted(t *testing.T) {

_, cfg := mustNewFakeCluster(t, kfake.SeedTopics(1, "test"))
settings, _, _ := mustNewSettings(t)
c, err := newFranzKafkaConsumer(cfg, settings, []string{"test"}, nil)
consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) {
return func(_ context.Context, _ kafkaMessage, _ attribute.Set) error {
return nil
}, nil
}
c, err := newFranzKafkaConsumer(cfg, settings, []string{"test"}, nil, consumeFn)
require.NoError(t, err)

for range 2 {
Expand Down Expand Up @@ -285,7 +291,7 @@ func TestRaceLostVsConsume(t *testing.T) {
}, nil
}

c, err := newFranzKafkaConsumer(cfg, settings, []string{topic}, consumeFn)
c, err := newFranzKafkaConsumer(cfg, settings, []string{topic}, nil, consumeFn)
require.NoError(t, err)
require.NoError(t, c.Start(t.Context(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -318,7 +324,7 @@ func TestLost(t *testing.T) {
return nil
}, nil
}
c, err := newFranzKafkaConsumer(cfg, settings, []string{"test"}, consumeFn)
c, err := newFranzKafkaConsumer(cfg, settings, []string{"test"}, nil, consumeFn)
require.NoError(t, err)
require.NoError(t, c.Start(t.Context(), componenttest.NewNopHost()))
defer func() { require.NoError(t, c.Shutdown(t.Context())) }()
Expand Down Expand Up @@ -361,7 +367,7 @@ func TestFranzConsumer_UseLeaderEpoch_Smoke(t *testing.T) {
{Topic: topic, Value: data},
}

c, err := newFranzKafkaConsumer(cfg, settings, []string{topic}, consumeFn)
c, err := newFranzKafkaConsumer(cfg, settings, []string{topic}, nil, consumeFn)
require.NoError(t, err)
require.NoError(t, c.Start(t.Context(), componenttest.NewNopHost()))
require.NoError(t, kafkaClient.ProduceSync(t.Context(), rs...).FirstErr())
Expand Down Expand Up @@ -394,3 +400,132 @@ func TestMakeUseLeaderEpochAdjuster_ClearsEpoch(t *testing.T) {
require.Equal(t, kgo.NewOffset().At(42).WithEpoch(-1), out["t"][0])
require.Equal(t, kgo.NewOffset().At(100).WithEpoch(-1), out["t"][1])
}

// TestExcludeTopicWithRegex verifies that exclude patterns are honored when the
// consumer subscribes through a regex topic pattern. Three topics matching
// ^logs-.* are seeded (logs-a, logs-b, logs-c), one record is produced to each,
// and logs-a/logs-b are excluded via ^logs-(a|b)$, so only the record on
// logs-c is expected to reach the consume function.
func TestExcludeTopicWithRegex(t *testing.T) {
	setFranzGo(t, true)

	// Seed the fake cluster with the three candidate topics.
	kafkaClient, cfg := mustNewFakeCluster(t,
		kfake.SeedTopics(1, "logs-a"),
		kfake.SeedTopics(1, "logs-b"),
		kfake.SeedTopics(1, "logs-c"),
	)

	cfg.ConsumerConfig = configkafka.ConsumerConfig{
		GroupID:       t.Name(),
		InitialOffset: "earliest",
		AutoCommit:    configkafka.AutoCommitConfig{Enable: true, Interval: 100 * time.Millisecond},
	}

	// The payload content is irrelevant to the assertions; the consume
	// function below only records the topic of each message.
	payload, err := (&ptrace.ProtoMarshaler{}).MarshalTraces(testdata.GenerateTraces(5))
	require.NoError(t, err)

	// One record per topic.
	records := []*kgo.Record{
		{Topic: "logs-a", Value: payload},
		{Topic: "logs-b", Value: payload},
		{Topic: "logs-c", Value: payload},
	}
	require.NoError(t, kafkaClient.ProduceSync(t.Context(), records...).FirstErr())

	// Per-topic delivery counts (guarded by mu) plus an overall counter that
	// the wait loop below can poll without taking the lock.
	var (
		mu       sync.Mutex
		total    atomic.Int64
		perTopic = make(map[string]int)
	)

	settings, _, _ := mustNewSettings(t)
	consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) {
		record := func(_ context.Context, msg kafkaMessage, _ attribute.Set) error {
			mu.Lock()
			perTopic[msg.topic()]++
			mu.Unlock()
			total.Add(1)
			return nil
		}
		return record, nil
	}

	includePatterns := []string{"^logs-.*"}     // Match all logs-* topics
	excludePatterns := []string{"^logs-(a|b)$"} // Exclude logs-a and logs-b
	c, err := newFranzKafkaConsumer(cfg, settings, includePatterns, excludePatterns, consumeFn)
	require.NoError(t, err)
	require.NoError(t, c.Start(t.Context(), componenttest.NewNopHost()))
	defer func() { require.NoError(t, c.Shutdown(t.Context())) }()

	// Poll until the first record arrives, failing after five seconds.
	timeout := time.After(5 * time.Second)
	for {
		if total.Load() >= 1 {
			break
		}
		select {
		case <-timeout:
			t.Fatalf("expected to consume 1 record, got %d", total.Load())
		case <-time.After(50 * time.Millisecond):
		}
	}

	// Grace period so that a record from an excluded topic, if it were
	// delivered at all, has a chance to show up before asserting.
	time.Sleep(500 * time.Millisecond)

	mu.Lock()
	defer mu.Unlock()

	assert.Equal(t, int64(1), total.Load(), "should consume exactly 1 record")
	assert.Equal(t, 0, perTopic["logs-a"], "logs-a should be excluded")
	assert.Equal(t, 0, perTopic["logs-b"], "logs-b should be excluded")
	assert.Equal(t, 1, perTopic["logs-c"], "logs-c should be consumed")
}

// TestExcludeTopicWithoutRegex verifies that configuring exclude topics
// together with a plain (non-regex) topic produces a warning log entry
// stating that the exclusion is ignored.
func TestExcludeTopicWithoutRegex(t *testing.T) {
	setFranzGo(t, true)

	_, cfg := mustNewFakeCluster(t, kfake.SeedTopics(1, "test-topic"))
	cfg.ConsumerConfig = configkafka.ConsumerConfig{
		GroupID:       t.Name(),
		InitialOffset: "earliest",
		AutoCommit:    configkafka.AutoCommitConfig{Enable: true, Interval: 100 * time.Millisecond},
	}

	settings, _, observedLogs := mustNewSettings(t)

	// The consume function is a no-op: this test only inspects log output.
	consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) {
		noop := func(_ context.Context, _ kafkaMessage, _ attribute.Set) error {
			return nil
		}
		return noop, nil
	}

	plainTopics := []string{"test-topic"}           // Not a regex pattern
	unusedExcludes := []string{"^exclude-pattern$"} // Exclude pattern (won't be used)
	c, err := newFranzKafkaConsumer(cfg, settings, plainTopics, unusedExcludes, consumeFn)
	require.NoError(t, err)
	require.NoError(t, c.Start(t.Context(), componenttest.NewNopHost()))
	defer func() { require.NoError(t, c.Shutdown(t.Context())) }()

	// Give the consumer time to start and log the warning.
	time.Sleep(200 * time.Millisecond)

	// Scan the captured entries for the expected warn-level message.
	foundWarning := false
	for _, entry := range observedLogs.All() {
		if entry.Level != zapcore.WarnLevel {
			continue
		}
		if entry.Message == "exclude_topic is configured but will be ignored because topic pattern does not use regex (must start with '^')" {
			foundWarning = true
			break
		}
	}
	assert.True(t, foundWarning, "expected warning about exclude_topic being ignored without regex")
}
Loading