Skip to content

Commit 5b48d80

Browse files
authored
fix: use topic attribute function in manager (#614)
## ❓ Why is this being changed kafka manager is not using topic attribute function causing some labels to be missing when reporting consumer lag ## 🧑‍💻 What is being changed use the topic attribute function in the config if available ## ✅ How to validate the change tests have been updated to ensure the label is there
1 parent 28ffad4 commit 5b48d80

File tree

2 files changed

+50
-20
lines changed

2 files changed

+50
-20
lines changed

kafka/manager.go

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -136,22 +136,30 @@ func (m *Manager) DeleteTopics(ctx context.Context, topics ...apmqueue.Topic) er
136136
deleteErrors = append(deleteErrors,
137137
fmt.Errorf("failed to delete topic %q: %w", topic, err),
138138
)
139+
attrs := []attribute.KeyValue{
140+
semconv.MessagingSystemKey.String("kafka"),
141+
attribute.String("outcome", "failure"),
142+
attribute.String("topic", topic),
143+
}
144+
if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) {
145+
attrs = append(attrs, kv)
146+
}
139147
m.deleted.Add(context.Background(), 1, metric.WithAttributeSet(
140-
attribute.NewSet(
141-
semconv.MessagingSystemKey.String("kafka"),
142-
attribute.String("outcome", "failure"),
143-
attribute.String("topic", topic),
144-
),
148+
attribute.NewSet(attrs...),
145149
))
146150
}
147151
continue
148152
}
153+
attrs := []attribute.KeyValue{
154+
semconv.MessagingSystemKey.String("kafka"),
155+
attribute.String("outcome", "success"),
156+
attribute.String("topic", topic),
157+
}
158+
if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) {
159+
attrs = append(attrs, kv)
160+
}
149161
m.deleted.Add(context.Background(), 1, metric.WithAttributeSet(
150-
attribute.NewSet(
151-
semconv.MessagingSystemKey.String("kafka"),
152-
attribute.String("outcome", "success"),
153-
attribute.String("topic", topic),
154-
),
162+
attribute.NewSet(attrs...),
155163
))
156164
logger.Info("deleted kafka topic")
157165
}
@@ -284,23 +292,31 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m
284292
count := memberAssignments[key]
285293
count++
286294
memberAssignments[key] = count
295+
attrs := []attribute.KeyValue{
296+
attribute.String("group", l.Group),
297+
attribute.String("topic", topic),
298+
attribute.Int("partition", int(partition)),
299+
}
300+
if kv := m.cfg.TopicAttributeFunc(topic); kv != (attribute.KeyValue{}) {
301+
attrs = append(attrs, kv)
302+
}
287303
o.ObserveInt64(
288304
consumerGroupLagMetric, lag.Lag,
289-
metric.WithAttributeSet(attribute.NewSet(
290-
attribute.String("group", l.Group),
291-
attribute.String("topic", topic),
292-
attribute.Int("partition", int(partition)),
293-
)),
305+
metric.WithAttributeSet(attribute.NewSet(attrs...)),
294306
)
295307
}
296308
}
297309
for key, count := range memberAssignments {
310+
attrs := []attribute.KeyValue{
311+
attribute.String("group", l.Group),
312+
attribute.String("topic", key.topic),
313+
attribute.String("client_id", key.clientID),
314+
}
315+
if kv := m.cfg.TopicAttributeFunc(key.topic); kv != (attribute.KeyValue{}) {
316+
attrs = append(attrs, kv)
317+
}
298318
o.ObserveInt64(assignmentMetric, count, metric.WithAttributeSet(
299-
attribute.NewSet(
300-
attribute.String("group", l.Group),
301-
attribute.String("topic", key.topic),
302-
attribute.String("client_id", key.clientID),
303-
),
319+
attribute.NewSet(attrs...),
304320
))
305321
}
306322
})

kafka/manager_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func TestManagerDeleteTopics(t *testing.T) {
6666
commonConfig.Logger = zap.New(core)
6767
commonConfig.TracerProvider = tp
6868
commonConfig.MeterProvider = mt.MeterProvider
69+
commonConfig.TopicAttributeFunc = func(topic string) attribute.KeyValue { return attribute.KeyValue{} }
6970
m, err := NewManager(ManagerConfig{CommonConfig: commonConfig})
7071
require.NoError(t, err)
7172
t.Cleanup(func() { m.Close() })
@@ -180,6 +181,9 @@ func TestManagerMetrics(t *testing.T) {
180181
commonConfig.Logger = zap.New(core)
181182
commonConfig.TracerProvider = tp
182183
commonConfig.MeterProvider = mp
184+
commonConfig.TopicAttributeFunc = func(topic string) attribute.KeyValue {
185+
return attribute.Bool("foo", true)
186+
}
183187
m, err := NewManager(ManagerConfig{CommonConfig: commonConfig})
184188
require.NoError(t, err)
185189
t.Cleanup(func() { m.Close() })
@@ -459,34 +463,39 @@ func TestManagerMetrics(t *testing.T) {
459463
attribute.String("group", "consumer1"),
460464
attribute.String("topic", "topic1"),
461465
attribute.Int("partition", 1),
466+
attribute.Bool("foo", true),
462467
),
463468
Value: 0, // end offset = 1, committed = 1
464469
}, {
465470
Attributes: attribute.NewSet(
466471
attribute.String("group", "consumer2"),
467472
attribute.String("topic", "topic1"),
468473
attribute.Int("partition", 2),
474+
attribute.Bool("foo", true),
469475
),
470476
Value: 1, // end offset = 2, committed = 1
471477
}, {
472478
Attributes: attribute.NewSet(
473479
attribute.String("group", "consumer2"),
474480
attribute.String("topic", "topic2"),
475481
attribute.Int("partition", 3),
482+
attribute.Bool("foo", true),
476483
),
477484
Value: 2, // end offset = 3, committed = 1
478485
}, {
479486
Attributes: attribute.NewSet(
480487
attribute.String("group", "consumer3"),
481488
attribute.String("topic", "topic3"),
482489
attribute.Int("partition", 4),
490+
attribute.Bool("foo", true),
483491
),
484492
Value: 4, // end offset = 4, nothing committed
485493
}, {
486494
Attributes: attribute.NewSet(
487495
attribute.String("group", "consumer3"),
488496
attribute.String("topic", "mytopic"),
489497
attribute.Int("partition", 1),
498+
attribute.Bool("foo", true),
490499
),
491500
Value: 1, // end offset = 1, nothing committed
492501
}},
@@ -498,34 +507,39 @@ func TestManagerMetrics(t *testing.T) {
498507
attribute.String("client_id", "client_id"),
499508
attribute.String("group", "consumer1"),
500509
attribute.String("topic", "topic1"),
510+
attribute.Bool("foo", true),
501511
),
502512
Value: 1,
503513
}, {
504514
Attributes: attribute.NewSet(
505515
attribute.String("client_id", "client_id"),
506516
attribute.String("group", "consumer2"),
507517
attribute.String("topic", "topic2"),
518+
attribute.Bool("foo", true),
508519
),
509520
Value: 1,
510521
}, {
511522
Attributes: attribute.NewSet(
512523
attribute.String("client_id", "client_id"),
513524
attribute.String("group", "consumer2"),
514525
attribute.String("topic", "topic1"),
526+
attribute.Bool("foo", true),
515527
),
516528
Value: 1,
517529
}, {
518530
Attributes: attribute.NewSet(
519531
attribute.String("client_id", "client_id"),
520532
attribute.String("group", "consumer3"),
521533
attribute.String("topic", "topic3"),
534+
attribute.Bool("foo", true),
522535
),
523536
Value: 1,
524537
}, {
525538
Attributes: attribute.NewSet(
526539
attribute.String("client_id", "client_id"),
527540
attribute.String("group", "consumer3"),
528541
attribute.String("topic", "mytopic"),
542+
attribute.Bool("foo", true),
529543
),
530544
Value: 1,
531545
}},

0 commit comments

Comments
 (0)