Skip to content

Commit 1822363

Browse files
authored
fix: better protect against nil topicAttributeFunc (#618)
## ❓ Why is this being changed systemtests are failing because of a nil topic attribute func. The topic manager does not handle nil func. See https://github.com/elastic/apm-queue/actions/runs/12741942829/job/35509307227 ## 🧑‍💻 What is being changed set the default func in the common config instead of the metrics hook ## ✅ How to validate the change run systemtests
1 parent 5b48d80 commit 1822363

File tree

4 files changed

+24
-26
lines changed

4 files changed

+24
-26
lines changed

kafka/common.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/twmb/franz-go/pkg/sasl/plain"
3535
"github.com/twmb/franz-go/plugin/kzap"
3636
"go.opentelemetry.io/otel"
37+
"go.opentelemetry.io/otel/attribute"
3738
"go.opentelemetry.io/otel/metric"
3839
"go.opentelemetry.io/otel/trace"
3940
"go.uber.org/zap"
@@ -247,6 +248,12 @@ func (cfg *CommonConfig) finalize() error {
247248
if cfg.TopicLogFieldFunc != nil {
248249
cfg.TopicLogFieldFunc = topicFieldFunc(cfg.TopicLogFieldFunc)
249250
}
251+
if cfg.TopicAttributeFunc == nil {
252+
cfg.TopicAttributeFunc = func(topic string) attribute.KeyValue {
253+
return attribute.KeyValue{}
254+
}
255+
}
256+
250257
return errors.Join(errs...)
251258
}
252259

kafka/common_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func TestCommonConfig(t *testing.T) {
4747
t.Helper()
4848
err := in.finalize()
4949
require.NoError(t, err)
50+
in.TopicAttributeFunc = nil
5051
in.TopicLogFieldFunc = nil
5152
in.hooks = nil
5253
assert.Equal(t, expected, in)

kafka/consumer_test.go

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -780,10 +780,7 @@ func TestConsumerConfigFinalizer(t *testing.T) {
780780
}
781781
err := cfg.finalize()
782782
require.NoError(t, err)
783-
assert.NotNil(t, cfg.Processor)
784-
cfg.Processor = nil
785-
assert.NotNil(t, cfg.Logger)
786-
cfg.Logger = nil
783+
assertNotNilOptions(t, &cfg)
787784

788785
assert.Equal(t, ConsumerConfig{
789786
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
@@ -805,10 +802,7 @@ func TestConsumerConfigFinalizer(t *testing.T) {
805802
}
806803
err := cfg.finalize()
807804
require.NoError(t, err)
808-
assert.NotNil(t, cfg.Processor)
809-
cfg.Processor = nil
810-
assert.NotNil(t, cfg.Logger)
811-
cfg.Logger = nil
805+
assertNotNilOptions(t, &cfg)
812806

813807
assert.Equal(t, ConsumerConfig{
814808
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
@@ -830,10 +824,7 @@ func TestConsumerConfigFinalizer(t *testing.T) {
830824
}
831825
err := cfg.finalize()
832826
require.NoError(t, err)
833-
assert.NotNil(t, cfg.Processor)
834-
cfg.Processor = nil
835-
assert.NotNil(t, cfg.Logger)
836-
cfg.Logger = nil
827+
assertNotNilOptions(t, &cfg)
837828

838829
assert.Equal(t, ConsumerConfig{
839830
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
@@ -855,10 +846,7 @@ func TestConsumerConfigFinalizer(t *testing.T) {
855846
}
856847
err := cfg.finalize()
857848
require.NoError(t, err)
858-
assert.NotNil(t, cfg.Processor)
859-
cfg.Processor = nil
860-
assert.NotNil(t, cfg.Logger)
861-
cfg.Logger = nil
849+
assertNotNilOptions(t, &cfg)
862850

863851
assert.Equal(t, ConsumerConfig{
864852
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
@@ -880,10 +868,7 @@ func TestConsumerConfigFinalizer(t *testing.T) {
880868
}
881869
err := cfg.finalize()
882870
require.NoError(t, err)
883-
assert.NotNil(t, cfg.Processor)
884-
cfg.Processor = nil
885-
assert.NotNil(t, cfg.Logger)
886-
cfg.Logger = nil
871+
assertNotNilOptions(t, &cfg)
887872

888873
assert.Equal(t, ConsumerConfig{
889874
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
@@ -909,6 +894,17 @@ func TestConsumerConfigFinalizer(t *testing.T) {
909894
})
910895
}
911896

897+
func assertNotNilOptions(t testing.TB, cfg *ConsumerConfig) {
898+
t.Helper()
899+
900+
assert.NotNil(t, cfg.Processor)
901+
cfg.Processor = nil
902+
assert.NotNil(t, cfg.Logger)
903+
cfg.Logger = nil
904+
assert.NotNil(t, cfg.TopicAttributeFunc)
905+
cfg.TopicAttributeFunc = nil
906+
}
907+
912908
func newConsumer(t testing.TB, cfg ConsumerConfig) *Consumer {
913909
if cfg.MaxPollWait <= 0 {
914910
// Lower MaxPollWait, ShutdownGracePeriod to speed up execution.

kafka/metrics.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -299,12 +299,6 @@ func newKgoHooks(mp metric.MeterProvider, namespace, topicPrefix string,
299299
return nil, formatMetricError(throttlingDurationKey, err)
300300
}
301301

302-
if topicAttributeFunc == nil {
303-
topicAttributeFunc = func(topic string) attribute.KeyValue {
304-
return attribute.KeyValue{}
305-
}
306-
}
307-
308302
return &metricHooks{
309303
namespace: namespace,
310304
topicPrefix: topicPrefix,

0 commit comments

Comments
 (0)