diff --git a/.chloggen/feat_43782.yaml b/.chloggen/feat_43782.yaml
new file mode 100644
index 0000000000000..59bcd5956d017
--- /dev/null
+++ b/.chloggen/feat_43782.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: "enhancement"
+
+# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
+component: receiver/kafka
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add support for excluding topics when consuming topics with a regex pattern
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [43782]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/internal/kafka/franz_client.go b/internal/kafka/franz_client.go
index 7eaa14a4eba11..46f50201b5b86 100644
--- a/internal/kafka/franz_client.go
+++ b/internal/kafka/franz_client.go
@@ -97,6 +97,7 @@ func NewFranzSyncProducer(ctx context.Context, clientCfg configkafka.ClientConfi
 func NewFranzConsumerGroup(ctx context.Context, clientCfg configkafka.ClientConfig,
 	consumerCfg configkafka.ConsumerConfig, topics []string,
+	excludeTopics []string,
 	logger *zap.Logger, opts ...kgo.Opt,
 ) (*kgo.Client, error) {
@@ -108,15 +109,31 @@ func NewFranzConsumerGroup(ctx context.Context, clientCfg configkafka.ClientConf
 		return nil, err
 	}
 
+	// Check if any topic uses a regex pattern.
+	isRegex := false
 	for _, t := range topics {
 		// Similar to librdkafka, if the topic starts with `^`, it is a regex topic:
 		// https://github.com/confluentinc/librdkafka/blob/b871fdabab84b2ea1be3866a2ded4def7e31b006/src/rdkafka.h#L3899-L3938
 		if strings.HasPrefix(t, "^") {
+			isRegex = true
 			opts = append(opts, kgo.ConsumeRegex())
 			break
 		}
 	}
 
+	// Apply exclude topics (ignoring empty entries) only when regex consumption is enabled.
+	var excludes []string
+	for _, t := range excludeTopics {
+		if t != "" {
+			excludes = append(excludes, t)
+		}
+	}
+	if len(excludes) > 0 && isRegex {
+		opts = append(opts, kgo.ConsumeExcludeTopics(excludes...))
+	} else if len(excludes) > 0 {
+		logger.Warn("exclude_topic is configured but will be ignored because topic pattern does not use regex (must start with '^')")
+	}
+
 	// Configure session timeout
 	if consumerCfg.SessionTimeout > 0 {
 		opts = append(opts, kgo.SessionTimeout(consumerCfg.SessionTimeout))
diff --git a/internal/kafka/franz_client_test.go b/internal/kafka/franz_client_test.go
index 98ab1d41e721c..09253e844c162 100644
--- a/internal/kafka/franz_client_test.go
+++ b/internal/kafka/franz_client_test.go
@@ -484,7 +484,7 @@ func mustNewFranzConsumerGroup(t *testing.T,
 	minAge := 10 * time.Millisecond
 	opts = append(opts, kgo.MetadataMinAge(minAge), kgo.MetadataMaxAge(minAge*2))
 	client, err := NewFranzConsumerGroup(t.Context(), clientConfig, consumerConfig,
-		topics, zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel)), opts...,
+		topics, nil, 
zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel)), opts...,
 	)
 	require.NoError(t, err)
 	t.Cleanup(client.Close)
diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md
index e9b9760ed180c..aba804a278a27 100644
--- a/receiver/kafkareceiver/README.md
+++ b/receiver/kafkareceiver/README.md
@@ -44,15 +44,19 @@ The following settings can be optionally configured:
 - `logs`
   - `topic` (default = otlp\_logs): The name of the Kafka topic from which to consume logs.
   - `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
+  - `exclude_topic` (default = ""): When `topic` is a regex pattern (prefixed with `^`), topics matching this regex pattern are excluded from consumption. Only supported by the franz-go client.
 - `metrics`
   - `topic` (default = otlp\_metrics): The name of the Kafka topic from which to consume metrics.
   - `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
+  - `exclude_topic` (default = ""): When `topic` is a regex pattern (prefixed with `^`), topics matching this regex pattern are excluded from consumption. Only supported by the franz-go client.
 - `traces`
   - `topic` (default = otlp\_spans): The name of the Kafka topic from which to consume traces.
   - `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
+  - `exclude_topic` (default = ""): When `topic` is a regex pattern (prefixed with `^`), topics matching this regex pattern are excluded from consumption. Only supported by the franz-go client.
 - `profiles`
   - `topic` (default = otlp\_profiles): The name of the Kafka topic from which to consume profiles.
   - `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
+  - `exclude_topic` (default = ""): When `topic` is a regex pattern (prefixed with `^`), topics matching this regex pattern are excluded from consumption. Only supported by the franz-go client.
 - `topic` (Deprecated [v0.124.0]: use `logs::topic`, `traces::topic`, or `metrics::topic`). If this is set, it will take precedence over the default value for those fields.
 - `encoding` (Deprecated [v0.124.0]: use `logs::encoding`, `traces::encoding`, or `metrics::encoding`).
@@ -217,3 +221,39 @@ attributes with the prefix "kafka.header.", i.e.
   }
   ...
 ```
+
+#### Regex topic patterns with exclusions
+
+When using the `franz-go` client, you can consume from multiple topics using regex patterns
+and exclude specific topics from consumption. This is useful when you want to consume from
+a dynamic set of topics but need to filter out certain ones.
+
+**Note:** Both `topic` and `exclude_topic` must be regex patterns (prefixed with `^`) for
+exclusion to take effect. This feature is only available with the franz-go client.
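+
+If the franz-go consumer is not enabled by default in your Collector build, it may need to be
+turned on via its feature gate when starting the Collector, for example (the exact gate ID may
+differ; check the feature gates documented for this receiver):
+
+```shell
+otelcol-contrib --config=config.yaml --feature-gates=receiver.kafkareceiver.UseFranzGo
+```
+
+With the gate enabled, topics can be excluded as follows: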
+ +```yaml +receivers: + kafka: + logs: + topic: "^logs-.*" # Consume from all topics matching logs-* + exclude_topic: "^logs-(test|dev)$" # Exclude logs-test and logs-dev + metrics: + topic: "^metrics-.*" + exclude_topic: "^metrics-internal-.*$" +``` + +In the example above: +- For logs: the receiver will consume from topics like `logs-prod`, `logs-staging`, `logs-app` + but will exclude `logs-test` and `logs-dev` +- For metrics: the receiver will consume from topics like `metrics-app`, `metrics-infra` + but will exclude any topics starting with `metrics-internal-` diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index df9a7bdaf78fb..6ce5890f71f35 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -135,6 +135,9 @@ type TopicEncodingConfig struct { // // Defaults to "otlp_proto". Encoding string `mapstructure:"encoding"` + + // Optional exclude topic option, used only in regex mode. + ExcludeTopic string `mapstructure:"exclude_topic"` } type MessageMarking struct { diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index 9629132c4a9e9..9c8089121ceb9 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -307,6 +307,35 @@ func TestLoadConfig(t *testing.T) { }, }, }, + { + id: component.NewIDWithName(metadata.Type, "regex_topic_with_exclusion"), + expected: &Config{ + ClientConfig: configkafka.NewDefaultClientConfig(), + ConsumerConfig: configkafka.NewDefaultConsumerConfig(), + Logs: TopicEncodingConfig{ + Topic: "^logs-.*", + ExcludeTopic: "^logs-(test|dev)$", + Encoding: "otlp_proto", + }, + Metrics: TopicEncodingConfig{ + Topic: "^metrics-.*", + ExcludeTopic: "^metrics-internal-.*$", + Encoding: "otlp_proto", + }, + Traces: TopicEncodingConfig{ + Topic: "^traces-.*", + ExcludeTopic: "^traces-debug-.*$", + Encoding: "otlp_proto", + }, + Profiles: TopicEncodingConfig{ + Topic: "otlp_profiles", + Encoding: "otlp_proto", + }, + ErrorBackOff: configretry.BackOffConfig{ + Enabled: false, + }, + }, + }, } for _, tt := range tests { diff --git a/receiver/kafkareceiver/consumer_franz.go b/receiver/kafkareceiver/consumer_franz.go index 49f60e4ef9781..81851136538ea 100644 --- a/receiver/kafkareceiver/consumer_franz.go +++ b/receiver/kafkareceiver/consumer_franz.go @@ -51,6 +51,7 @@ type topicPartition struct { type franzConsumer struct { config *Config topics []string + excludeTopics []string settings receiver.Settings telemetryBuilder *metadata.TelemetryBuilder newConsumeFn newConsumeMessageFunc @@ -123,6 +124,7 @@ func newFranzKafkaConsumer( config *Config, set receiver.Settings, topics []string, + excludeTopics []string, newConsumeFn newConsumeMessageFunc, ) (*franzConsumer, error) { telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) @@ -133,6 +135,7 @@ func newFranzKafkaConsumer( return &franzConsumer{ config: config, topics: topics, + excludeTopics: excludeTopics, newConsumeFn: newConsumeFn, settings: set, telemetryBuilder: telemetryBuilder, @@ -212,6 +215,7 @@ func (c *franzConsumer) Start(ctx context.Context, host component.Host) error { c.config.ClientConfig, c.config.ConsumerConfig, c.topics, + c.excludeTopics, c.settings.Logger, opts..., ) diff --git a/receiver/kafkareceiver/consumer_franz_test.go b/receiver/kafkareceiver/consumer_franz_test.go index 7870ba3f3351a..adc68c776e3db 100644 --- a/receiver/kafkareceiver/consumer_franz_test.go +++ b/receiver/kafkareceiver/consumer_franz_test.go @@ -24,6 +24,7 @@ import ( 
"go.opentelemetry.io/collector/pdata/testdata" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/otel/attribute" + "go.uber.org/zap/zapcore" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata" @@ -199,7 +200,7 @@ func TestConsumerShutdownConsuming(t *testing.T) { test := func(tb testing.TB, expected int64) { ctx := t.Context() consumeFn, consuming := newConsumeFunc() - consumer, e := newFranzKafkaConsumer(cfg, settings, []string{topic}, consumeFn) + consumer, e := newFranzKafkaConsumer(cfg, settings, []string{topic}, nil, consumeFn) require.NoError(tb, e) require.NoError(tb, consumer.Start(ctx, componenttest.NewNopHost())) require.NoError(tb, kafkaClient.ProduceSync(ctx, rs...).FirstErr()) @@ -239,7 +240,12 @@ func TestConsumerShutdownNotStarted(t *testing.T) { _, cfg := mustNewFakeCluster(t, kfake.SeedTopics(1, "test")) settings, _, _ := mustNewSettings(t) - c, err := newFranzKafkaConsumer(cfg, settings, []string{"test"}, nil) + consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) { + return func(_ context.Context, _ kafkaMessage, _ attribute.Set) error { + return nil + }, nil + } + c, err := newFranzKafkaConsumer(cfg, settings, []string{"test"}, nil, consumeFn) require.NoError(t, err) for range 2 { @@ -285,7 +291,7 @@ func TestRaceLostVsConsume(t *testing.T) { }, nil } - c, err := newFranzKafkaConsumer(cfg, settings, []string{topic}, consumeFn) + c, err := newFranzKafkaConsumer(cfg, settings, []string{topic}, nil, consumeFn) require.NoError(t, err) require.NoError(t, c.Start(t.Context(), componenttest.NewNopHost())) @@ -318,7 +324,7 @@ func TestLost(t *testing.T) { return nil }, nil } - c, err := newFranzKafkaConsumer(cfg, settings, []string{"test"}, consumeFn) + c, err := newFranzKafkaConsumer(cfg, settings, []string{"test"}, nil, consumeFn) require.NoError(t, err) require.NoError(t, c.Start(t.Context(), componenttest.NewNopHost())) defer func() { require.NoError(t, c.Shutdown(t.Context())) }() @@ -361,7 +367,7 @@ func TestFranzConsumer_UseLeaderEpoch_Smoke(t *testing.T) { {Topic: topic, Value: data}, } - c, err := newFranzKafkaConsumer(cfg, settings, []string{topic}, consumeFn) + c, err := newFranzKafkaConsumer(cfg, settings, []string{topic}, nil, consumeFn) require.NoError(t, err) require.NoError(t, c.Start(t.Context(), componenttest.NewNopHost())) require.NoError(t, kafkaClient.ProduceSync(t.Context(), rs...).FirstErr()) @@ -394,3 +400,132 @@ func TestMakeUseLeaderEpochAdjuster_ClearsEpoch(t *testing.T) { require.Equal(t, kgo.NewOffset().At(42).WithEpoch(-1), out["t"][0]) require.Equal(t, kgo.NewOffset().At(100).WithEpoch(-1), out["t"][1]) } + +// TestExcludeTopicWithRegex tests that exclude_topic works correctly with regex topic patterns. +// It creates three topics (logs-a, logs-b, logs-c) matching the pattern ^logs-.* +// and excludes logs-a and logs-b using ^logs-(a|b)$, expecting only logs-c to be consumed. 
+func TestExcludeTopicWithRegex(t *testing.T) { + setFranzGo(t, true) + + // Create three topics: logs-a, logs-b, logs-c + kafkaClient, cfg := mustNewFakeCluster(t, + kfake.SeedTopics(1, "logs-a"), + kfake.SeedTopics(1, "logs-b"), + kfake.SeedTopics(1, "logs-c"), + ) + + cfg.ConsumerConfig = configkafka.ConsumerConfig{ + GroupID: t.Name(), + InitialOffset: "earliest", + AutoCommit: configkafka.AutoCommitConfig{Enable: true, Interval: 100 * time.Millisecond}, + } + + // Prepare test data + traces := testdata.GenerateTraces(5) + data, err := (&ptrace.ProtoMarshaler{}).MarshalTraces(traces) + require.NoError(t, err) + + // Produce records to all three topics + rs := []*kgo.Record{ + {Topic: "logs-a", Value: data}, + {Topic: "logs-b", Value: data}, + {Topic: "logs-c", Value: data}, + } + require.NoError(t, kafkaClient.ProduceSync(t.Context(), rs...).FirstErr()) + + // Track which topics were consumed + consumedTopics := make(map[string]int) + var mu sync.Mutex + var called atomic.Int64 + + settings, _, _ := mustNewSettings(t) + consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) { + return func(_ context.Context, msg kafkaMessage, _ attribute.Set) error { + mu.Lock() + consumedTopics[msg.topic()]++ + mu.Unlock() + called.Add(1) + return nil + }, nil + } + + // Create consumer with regex topic pattern and exclude pattern + c, err := newFranzKafkaConsumer( + cfg, + settings, + []string{"^logs-.*"}, // Match all logs-* topics + []string{"^logs-(a|b)$"}, // Exclude logs-a and logs-b + consumeFn, + ) + require.NoError(t, err) + require.NoError(t, c.Start(t.Context(), componenttest.NewNopHost())) + defer func() { require.NoError(t, c.Shutdown(t.Context())) }() + + // Wait for consumption (should only consume 1 record from logs-c) + deadline := time.After(5 * time.Second) + for called.Load() < 1 { + select { + case <-deadline: + t.Fatalf("expected to consume 1 record, got %d", called.Load()) + case <-time.After(50 * time.Millisecond): + } + } + + // Give it a bit more time to ensure no other messages are consumed + time.Sleep(500 * time.Millisecond) + + // Verify results + mu.Lock() + defer mu.Unlock() + + assert.Equal(t, int64(1), called.Load(), "should consume exactly 1 record") + assert.Equal(t, 0, consumedTopics["logs-a"], "logs-a should be excluded") + assert.Equal(t, 0, consumedTopics["logs-b"], "logs-b should be excluded") + assert.Equal(t, 1, consumedTopics["logs-c"], "logs-c should be consumed") +} + +// TestExcludeTopicWithoutRegex tests that a warning is logged when exclude_topic +// is configured but the topic pattern doesn't use regex. 
+func TestExcludeTopicWithoutRegex(t *testing.T) {
+	setFranzGo(t, true)
+
+	_, cfg := mustNewFakeCluster(t, kfake.SeedTopics(1, "test-topic"))
+	cfg.ConsumerConfig = configkafka.ConsumerConfig{
+		GroupID:       t.Name(),
+		InitialOffset: "earliest",
+		AutoCommit:    configkafka.AutoCommitConfig{Enable: true, Interval: 100 * time.Millisecond},
+	}
+
+	settings, _, observedLogs := mustNewSettings(t)
+	consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) {
+		return func(_ context.Context, _ kafkaMessage, _ attribute.Set) error {
+			return nil
+		}, nil
+	}
+
+	// Create consumer with non-regex topic and exclude_topic
+	c, err := newFranzKafkaConsumer(
+		cfg,
+		settings,
+		[]string{"test-topic"},        // Not a regex pattern
+		[]string{"^exclude-pattern$"}, // Exclude pattern (won't be used)
+		consumeFn,
+	)
+	require.NoError(t, err)
+	require.NoError(t, c.Start(t.Context(), componenttest.NewNopHost()))
+	defer func() { require.NoError(t, c.Shutdown(t.Context())) }()
+
+	// Give it time to start and log the warning
+	time.Sleep(200 * time.Millisecond)
+
+	// Verify warning was logged
+	var foundWarning bool
+	for _, log := range observedLogs.All() {
+		if log.Level == zapcore.WarnLevel &&
+			log.Message == "exclude_topic is configured but will be ignored because topic pattern does not use regex (must start with '^')" {
+			foundWarning = true
+			break
+		}
+	}
+	assert.True(t, foundWarning, "expected warning about exclude_topic being ignored without regex")
+}
diff --git a/receiver/kafkareceiver/consumer_sarama.go b/receiver/kafkareceiver/consumer_sarama.go
index 6087b283cedd7..3a908e772310c 100644
--- a/receiver/kafkareceiver/consumer_sarama.go
+++ b/receiver/kafkareceiver/consumer_sarama.go
@@ -29,6 +29,7 @@ func newSaramaConsumer(
 	config *Config,
 	set receiver.Settings,
 	topics []string,
+	excludeTopics []string,
 	newConsumeFn newConsumeMessageFunc,
 ) (*saramaConsumer, error) {
 	telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
@@ -36,6 +37,14 @@ func newSaramaConsumer(
 		return nil, err
 	}
 
+	// Sarama doesn't support exclude topics; log a warning if any are configured.
+	for _, t := range excludeTopics {
+		if t != "" {
+			set.Logger.Warn("exclude_topic is configured but is not supported when using the Sarama consumer (only supported with franz-go)")
+			break
+		}
+	}
+
 	return &saramaConsumer{
 		config: config,
 		topics: topics,
diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go
index f9b1b751d091f..7a315d79d5a6b 100644
--- a/receiver/kafkareceiver/kafka_receiver.go
+++ b/receiver/kafkareceiver/kafka_receiver.go
@@ -87,7 +87,7 @@ func newLogsReceiver(config *Config, set receiver.Settings, nextConsumer consume
 			)
 		}, nil
 	}
-	return newReceiver(config, set, []string{config.Logs.Topic}, newConsumeMessageFunc)
+	return newReceiver(config, set, []string{config.Logs.Topic}, []string{config.Logs.ExcludeTopic}, newConsumeMessageFunc)
 }
 
 func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer consumer.Metrics) (receiver.Metrics, error) {
@@ -111,7 +111,7 @@ func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer cons
 			)
 		}, nil
 	}
-	return newReceiver(config, set, []string{config.Metrics.Topic}, newConsumeMessageFunc)
+	return newReceiver(config, set, []string{config.Metrics.Topic}, []string{config.Metrics.ExcludeTopic}, newConsumeMessageFunc)
 }
 
 func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consumer.Traces) (receiver.Traces, error) {
@@ -135,7 +135,7 @@ func newTracesReceiver(config 
*Config, set receiver.Settings, nextConsumer consu ) }, nil } - return newReceiver(config, set, []string{config.Traces.Topic}, consumeFn) + return newReceiver(config, set, []string{config.Traces.Topic}, []string{config.Traces.ExcludeTopic}, consumeFn) } func newProfilesReceiver(config *Config, set receiver.Settings, nextConsumer xconsumer.Profiles) (xreceiver.Profiles, error) { @@ -159,22 +159,23 @@ func newProfilesReceiver(config *Config, set receiver.Settings, nextConsumer xco ) }, nil } - return newReceiver(config, set, []string{config.Profiles.Topic}, consumeFn) + return newReceiver(config, set, []string{config.Profiles.Topic}, []string{config.Profiles.ExcludeTopic}, consumeFn) } func newReceiver( config *Config, set receiver.Settings, topics []string, + excludeTopics []string, consumeFn func(host component.Host, obsrecv *receiverhelper.ObsReport, telBldr *metadata.TelemetryBuilder, ) (consumeMessageFunc, error), ) (component.Component, error) { if franzGoConsumerFeatureGate.IsEnabled() { - return newFranzKafkaConsumer(config, set, topics, consumeFn) + return newFranzKafkaConsumer(config, set, topics, excludeTopics, consumeFn) } - return newSaramaConsumer(config, set, topics, consumeFn) + return newSaramaConsumer(config, set, topics, excludeTopics, consumeFn) } type logsHandler struct { diff --git a/receiver/kafkareceiver/testdata/config.yaml b/receiver/kafkareceiver/testdata/config.yaml index b7a07344b0710..96dc1ec54710c 100644 --- a/receiver/kafkareceiver/testdata/config.yaml +++ b/receiver/kafkareceiver/testdata/config.yaml @@ -97,4 +97,17 @@ kafka/message_marking_on_permanent_error_inherited: encoding: otlp_proto message_marking: after: true - on_error: true \ No newline at end of file + on_error: true +kafka/regex_topic_with_exclusion: + logs: + topic: "^logs-.*" + exclude_topic: "^logs-(test|dev)$" + encoding: otlp_proto + metrics: + topic: "^metrics-.*" + exclude_topic: "^metrics-internal-.*$" + encoding: otlp_proto + traces: + topic: "^traces-.*" + exclude_topic: "^traces-debug-.*$" + encoding: otlp_proto