Skip to content

Commit 218a516

Browse files
authored
feat: Verify topic by default (#84)
1 parent 51636c8 commit 218a516

File tree

3 files changed

+15
-17
lines changed

3 files changed

+15
-17
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,6 @@ func StartAPI(cfg kafka.Config, metricCollectors ...prometheus.Collector) {
141141
| `consumer.groupId` | Exception consumer group id | | exception-consumer-group |
142142
| `consumer.maxRetry` | Maximum retry value for attempting to retry a message | 3 | |
143143
| `consumer.concurrency` | Number of goroutines used at listeners | 1 | |
144-
| `consumer.verifyTopicOnStartup` | it checks existence of the given retry topic on the kafka cluster. | false | |
145144
| `consumer.minBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#ReaderConfig.MinBytes) | 1 | |
146145
| `consumer.maxBytes` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#ReaderConfig.MaxBytes) | 1 MB | |
147146
| `consumer.maxWait` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#ReaderConfig.MaxWait) | 10s | |

cronsumer.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,21 @@ import (
1313
// ConsumeFn describes how to consume messages from specified topic.
1414
func New(cfg *kafka.Config, c kafka.ConsumeFn) kafka.Cronsumer {
1515
cfg.Logger = logger.New(cfg.LogLevel)
16+
verifyTopicOnStartup(cfg)
17+
cfg.Logger.Infof("Topic [%s] verified successfully!", cfg.Consumer.Topic)
18+
return internal.NewCronsumer(cfg, c)
19+
}
1620

17-
if cfg.Consumer.VerifyTopicOnStartup {
18-
kclient, err := internal.NewKafkaClient(cfg)
19-
if err != nil {
20-
panic("panic when initializing kafka client for verify topic error: " + err.Error())
21-
}
22-
exist, err := internal.VerifyTopics(kclient, cfg.Consumer.Topic)
23-
if err != nil {
24-
panic("panic " + err.Error())
25-
}
26-
if !exist {
27-
panic("topic: " + cfg.Consumer.Topic + " does not exist, please check cluster authority etc.")
28-
}
29-
cfg.Logger.Infof("Topic [%s] verified successfully!", cfg.Consumer.Topic)
21+
func verifyTopicOnStartup(cfg *kafka.Config) {
22+
kclient, err := internal.NewKafkaClient(cfg)
23+
if err != nil {
24+
panic("panic when initializing kafka client for verify topic error: " + err.Error())
25+
}
26+
exist, err := internal.VerifyTopics(kclient, cfg.Consumer.Topic)
27+
if err != nil {
28+
panic("panic " + err.Error())
29+
}
30+
if !exist {
31+
panic("topic: " + cfg.Consumer.Topic + " does not exist, please check cluster authority etc.")
3032
}
31-
32-
return internal.NewCronsumer(cfg, c)
3333
}

pkg/kafka/config.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ type ConsumerConfig struct {
7878
Cron string `yaml:"cron"`
7979
BackOffStrategy BackoffStrategyInterface `yaml:"backOffStrategy"`
8080
SkipMessageByHeaderFn SkipMessageByHeaderFn `yaml:"skipMessageByHeaderFn"`
81-
VerifyTopicOnStartup bool `yaml:"verifyTopicOnStartup"`
8281
QueueCapacity int `yaml:"queueCapacity"`
8382
}
8483

0 commit comments

Comments
 (0)