Skip to content

Commit b5f78fb

Browse files
authored
ignore unknown topic warning (#709)
1 parent ba12bed commit b5f78fb

File tree

2 files changed

+137
-0
lines changed

2 files changed

+137
-0
lines changed

kafka/manager.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,15 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m
277277
}
278278
for partition, lag := range partitions {
279279
if lag.Err != nil {
280+
if lag.Err == kerr.UnknownTopicOrPartition {
281+
logger.Debug("error getting consumer group lag",
282+
zap.String("group", l.Group),
283+
zap.String("topic", topic),
284+
zap.Int32("partition", partition),
285+
zap.Error(lag.Err),
286+
)
287+
continue
288+
}
280289
logger.Warn("error getting consumer group lag",
281290
zap.String("group", l.Group),
282291
zap.String("topic", topic),

kafka/manager_test.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,134 @@ func TestListTopics(t *testing.T) {
771771
assert.Equal(t, []string{"name_space-mytopic", "name_space-topic1", "name_space-topic3"}, topics)
772772
}
773773

774+
func TestUnknownTopicOrPartition(t *testing.T) {
775+
testCases := []struct {
776+
desc string
777+
logLevel zapcore.Level
778+
}{
779+
{
780+
desc: "log UNKNOWN_TOPIC_OR_PARTITION error for consumer lag",
781+
logLevel: zapcore.DebugLevel,
782+
},
783+
{
784+
desc: "ignore UNKNOWN_TOPIC_OR_PARTITION error for consumer lag",
785+
logLevel: zapcore.WarnLevel,
786+
},
787+
}
788+
for _, tc := range testCases {
789+
reader := metric.NewManualReader()
790+
mp := metric.NewMeterProvider(metric.WithReader(reader))
791+
defer mp.Shutdown(context.Background())
792+
793+
cluster, commonConfig := newFakeCluster(t)
794+
core, observedLogs := observer.New(tc.logLevel)
795+
796+
commonConfig.Logger = zap.New(core)
797+
commonConfig.MeterProvider = mp
798+
799+
m, err := NewManager(ManagerConfig{CommonConfig: commonConfig})
800+
require.NoError(t, err)
801+
t.Cleanup(func() { m.Close() })
802+
803+
registration, err := m.MonitorConsumerLag([]apmqueue.TopicConsumer{
804+
{
805+
Topic: "topic",
806+
Consumer: "consumer",
807+
},
808+
})
809+
require.NoError(t, err)
810+
t.Cleanup(func() { registration.Unregister() })
811+
812+
var describeGroupsRequest *kmsg.DescribeGroupsRequest
813+
cluster.ControlKey(kmsg.DescribeGroups.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
814+
describeGroupsRequest = req.(*kmsg.DescribeGroupsRequest)
815+
return &kmsg.DescribeGroupsResponse{
816+
Version: describeGroupsRequest.Version,
817+
Groups: []kmsg.DescribeGroupsResponseGroup{
818+
{
819+
Group: "consumer",
820+
ProtocolType: "consumer",
821+
Members: []kmsg.DescribeGroupsResponseGroupMember{{
822+
MemberAssignment: (&kmsg.ConsumerMemberAssignment{
823+
Version: 2,
824+
Topics: []kmsg.ConsumerMemberAssignmentTopic{{
825+
Topic: "name_space-topic",
826+
Partitions: []int32{1},
827+
}},
828+
}).AppendTo(nil),
829+
}},
830+
},
831+
},
832+
}, nil, true
833+
})
834+
835+
var offsetFetchRequest *kmsg.OffsetFetchRequest
836+
cluster.ControlKey(kmsg.OffsetFetch.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
837+
offsetFetchRequest = req.(*kmsg.OffsetFetchRequest)
838+
return &kmsg.OffsetFetchResponse{
839+
Version: offsetFetchRequest.Version,
840+
Groups: []kmsg.OffsetFetchResponseGroup{{
841+
Group: "consumer",
842+
Topics: []kmsg.OffsetFetchResponseGroupTopic{{
843+
Topic: "name_space-topic",
844+
Partitions: []kmsg.OffsetFetchResponseGroupTopicPartition{{
845+
Partition: 1,
846+
Offset: 100,
847+
}},
848+
}},
849+
}},
850+
}, nil, true
851+
})
852+
853+
var listOffsetsRequest *kmsg.ListOffsetsRequest
854+
cluster.ControlKey(kmsg.ListOffsets.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
855+
cluster.KeepControl()
856+
listOffsetsRequest = req.(*kmsg.ListOffsetsRequest)
857+
return &kmsg.ListOffsetsResponse{
858+
Version: listOffsetsRequest.Version,
859+
Topics: []kmsg.ListOffsetsResponseTopic{{
860+
Topic: "name_space-topic",
861+
Partitions: []kmsg.ListOffsetsResponseTopicPartition{{
862+
Partition: 1,
863+
Offset: 200,
864+
ErrorCode: kerr.UnknownTopicOrPartition.Code,
865+
}},
866+
}},
867+
}, nil, true
868+
})
869+
870+
rm := metricdata.ResourceMetrics{}
871+
err = reader.Collect(context.Background(), &rm)
872+
require.NoError(t, err)
873+
874+
matchingLogs := observedLogs.FilterFieldKey("group")
875+
actual := matchingLogs.AllUntimed()
876+
877+
if tc.logLevel == zapcore.DebugLevel {
878+
expected := []observer.LoggedEntry{
879+
{
880+
Entry: zapcore.Entry{
881+
Level: zapcore.DebugLevel,
882+
LoggerName: "kafka",
883+
Message: "error getting consumer group lag",
884+
},
885+
Context: []zapcore.Field{
886+
zap.String("namespace", "name_space"),
887+
zap.String("group", "consumer"),
888+
zap.String("topic", "topic"),
889+
zap.Int32("partition", 1),
890+
zap.Error(kerr.UnknownTopicOrPartition),
891+
},
892+
},
893+
}
894+
assert.Len(t, actual, 1)
895+
assert.Equal(t, expected, actual)
896+
} else {
897+
assert.Empty(t, actual)
898+
}
899+
}
900+
}
901+
774902
func newFakeCluster(t testing.TB) (*kfake.Cluster, CommonConfig) {
775903
cluster, err := kfake.NewCluster(
776904
// Just one broker to simplify dealing with sharded requests.

0 commit comments

Comments
 (0)