Skip to content

Commit d261a9a

Browse files
author
A.Samet İleri
authored
Merge pull request #76 from Trendyol/feature/verify-topic
feat: add verify topic feature
2 parents 94c756a + cbc6017 commit d261a9a

File tree

7 files changed

+205
-2
lines changed

7 files changed

+205
-2
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ func StartAPI(cfg kafka.Config, metricCollectors ...prometheus.Collector) {
141141
| `consumer.groupId` | Exception consumer group id | | exception-consumer-group |
142142
| `consumer.maxRetry` | Maximum retry value for attempting to retry a message | 3 | |
143143
| `consumer.concurrency` | Number of goroutines used at listeners | 1 | |
144+
| `consumer.verifyTopicOnStartup` | it checks existence of the given retry topic on the kafka cluster. | false | |
144145
| `consumer.minBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#ReaderConfig.MinBytes) | 1 | |
145146
| `consumer.maxBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#ReaderConfig.MaxBytes) | 1 MB | |
146147
| `consumer.maxWait` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#ReaderConfig.MaxWait) | 10s | |

cronsumer.go

+18
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,29 @@ package cronsumer
55
import (
66
"github.com/Trendyol/kafka-cronsumer/internal"
77
"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
8+
"github.com/Trendyol/kafka-cronsumer/pkg/logger"
89
)
910

1011
// New returns the newly created kafka consumer instance.
1112
// config.Config specifies cron, duration and so many parameters.
1213
// ConsumeFn describes how to consume messages from specified topic.
1314
func New(cfg *kafka.Config, c kafka.ConsumeFn) kafka.Cronsumer {
15+
cfg.Logger = logger.New(cfg.LogLevel)
16+
17+
if cfg.Consumer.VerifyTopicOnStartup {
18+
kclient, err := internal.NewKafkaClient(cfg)
19+
if err != nil {
20+
panic("panic when initializing kafka client for verify topic error: " + err.Error())
21+
}
22+
exist, err := internal.VerifyTopics(kclient, cfg.Consumer.Topic)
23+
if err != nil {
24+
panic("panic " + err.Error())
25+
}
26+
if !exist {
27+
panic("topic: " + cfg.Consumer.Topic + " does not exist, please check cluster authority etc.")
28+
}
29+
cfg.Logger.Infof("Topic [%s] verified successfully!", cfg.Consumer.Topic)
30+
}
31+
1432
return internal.NewCronsumer(cfg, c)
1533
}

examples/single-consumer/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ func main() {
1212
Brokers: []string{"localhost:29092"},
1313
Consumer: kafka.ConsumerConfig{
1414
GroupID: "sample-consumer",
15-
Topic: "exception",
15+
Topic: "exception-not-exist",
1616
Cron: "*/1 * * * *",
1717
Duration: 20 * time.Second,
1818
},

internal/cron.go

-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ type cronsumer struct {
2121
}
2222

2323
func NewCronsumer(cfg *kafka.Config, fn kafka.ConsumeFn) kafka.Cronsumer {
24-
cfg.Logger = logger.New(cfg.LogLevel)
2524
c := newKafkaCronsumer(cfg, fn)
2625

2726
return &cronsumer{

internal/verify_topic.go

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
8+
segmentio "github.com/segmentio/kafka-go"
9+
)
10+
11+
type kafkaClient interface {
12+
Metadata(ctx context.Context, req *segmentio.MetadataRequest) (*segmentio.MetadataResponse, error)
13+
GetClient() *segmentio.Client
14+
}
15+
16+
type client struct {
17+
*segmentio.Client
18+
}
19+
20+
func NewKafkaClient(cfg *kafka.Config) (kafkaClient, error) {
21+
kc := client{
22+
Client: &segmentio.Client{
23+
Addr: segmentio.TCP(cfg.Brokers...),
24+
},
25+
}
26+
27+
transport := &segmentio.Transport{
28+
MetadataTopics: []string{cfg.Consumer.Topic},
29+
}
30+
31+
if cfg.SASL.Enabled {
32+
transport.TLS = NewTLSConfig(cfg.SASL)
33+
transport.SASL = Mechanism(cfg.SASL)
34+
}
35+
36+
kc.Transport = transport
37+
return &kc, nil
38+
}
39+
40+
func (c *client) GetClient() *segmentio.Client {
41+
return c.Client
42+
}
43+
44+
func VerifyTopics(client kafkaClient, topics ...string) (bool, error) {
45+
metadata, err := client.Metadata(context.Background(), &segmentio.MetadataRequest{
46+
Topics: topics,
47+
})
48+
if err != nil {
49+
return false, fmt.Errorf("error when during verifyTopics metadata request %w", err)
50+
}
51+
return checkTopicsWithinMetadata(metadata, topics)
52+
}
53+
54+
func checkTopicsWithinMetadata(metadata *segmentio.MetadataResponse, topics []string) (bool, error) {
55+
metadataTopics := make(map[string]struct{}, len(metadata.Topics))
56+
for _, topic := range metadata.Topics {
57+
if topic.Error != nil {
58+
continue
59+
}
60+
metadataTopics[topic.Name] = struct{}{}
61+
}
62+
63+
for _, topic := range topics {
64+
if _, exist := metadataTopics[topic]; !exist {
65+
return false, nil
66+
}
67+
}
68+
return true, nil
69+
}

internal/verify_topic_test.go

+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
9+
segmentio "github.com/segmentio/kafka-go"
10+
)
11+
12+
type mockKafkaClientWrapper struct {
13+
wantErr bool
14+
wantExistTopic bool
15+
}
16+
17+
func (m mockKafkaClientWrapper) GetClient() *segmentio.Client {
18+
return &segmentio.Client{}
19+
}
20+
21+
func (m mockKafkaClientWrapper) Metadata(_ context.Context, _ *segmentio.MetadataRequest) (*segmentio.MetadataResponse, error) {
22+
if m.wantErr {
23+
return nil, errors.New("metadataReqErr")
24+
}
25+
26+
if !m.wantExistTopic {
27+
return &segmentio.MetadataResponse{
28+
Topics: []segmentio.Topic{
29+
{Name: "topic1", Error: segmentio.UnknownTopicOrPartition},
30+
{Name: "topic2", Error: nil},
31+
},
32+
}, nil
33+
}
34+
35+
return &segmentio.MetadataResponse{
36+
Topics: []segmentio.Topic{
37+
{Name: "topic1", Error: nil},
38+
{Name: "topic2", Error: nil},
39+
},
40+
}, nil
41+
}
42+
43+
func Test_kafkaClientWrapper_VerifyTopics(t *testing.T) {
44+
t.Run("Should_Return_Error_When_Metadata_Request_Has_Failed", func(t *testing.T) {
45+
// Given
46+
mockClient := mockKafkaClientWrapper{wantErr: true}
47+
48+
// When
49+
_, err := VerifyTopics(mockClient, "topic1")
50+
51+
// Then
52+
if err == nil {
53+
t.Error("metadata request must be failed!")
54+
}
55+
})
56+
t.Run("Should_Return_False_When_Given_Topic_Does_Not_Exist", func(t *testing.T) {
57+
// Given
58+
mockClient := mockKafkaClientWrapper{wantExistTopic: false}
59+
60+
// When
61+
exist, err := VerifyTopics(mockClient, "topic1")
62+
63+
// Then
64+
if exist {
65+
t.Errorf("topic %s must not exist", "topic1")
66+
}
67+
if err != nil {
68+
t.Error("err must be nil")
69+
}
70+
})
71+
t.Run("Should_Return_True_When_Given_Topic_Exist", func(t *testing.T) {
72+
// Given
73+
mockClient := mockKafkaClientWrapper{wantExistTopic: true}
74+
75+
// When
76+
exist, err := VerifyTopics(mockClient, "topic1")
77+
78+
// Then
79+
if !exist {
80+
t.Errorf("topic %s must exist", "topic1")
81+
}
82+
if err != nil {
83+
t.Error("err must be nil")
84+
}
85+
})
86+
}
87+
88+
func Test_newKafkaClient(t *testing.T) {
89+
// Given
90+
cfg := &kafka.Config{Brokers: []string{"127.0.0.1:9092"}, Consumer: kafka.ConsumerConfig{Topic: "topic"}}
91+
92+
// When
93+
sut, err := NewKafkaClient(cfg)
94+
95+
// Then
96+
if sut.GetClient().Addr.String() != "127.0.0.1:9092" {
97+
t.Errorf("broker address must be 127.0.0.1:9092")
98+
}
99+
if err != nil {
100+
t.Errorf("err must be nil")
101+
}
102+
}
103+
104+
func Test_kClient_GetClient(t *testing.T) {
105+
// Given
106+
mockClient := mockKafkaClientWrapper{}
107+
108+
// When
109+
sut := mockClient.GetClient()
110+
111+
// Then
112+
if sut == nil {
113+
t.Error("client must not be nil")
114+
}
115+
}

pkg/kafka/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ type ConsumerConfig struct {
7575
Cron string `yaml:"cron"`
7676
BackOffStrategy BackoffStrategyInterface `yaml:"backOffStrategy"`
7777
SkipMessageByHeaderFn SkipMessageByHeaderFn `yaml:"skipMessageByHeaderFn"`
78+
VerifyTopicOnStartup bool `yaml:"verifyTopicOnStartup"`
7879
}
7980

8081
type ProducerConfig struct {

0 commit comments

Comments
 (0)