Skip to content

Commit 741d6f4

Browse files
Feature / enable&disable duration (#81)
* feat: naming change
* feat: duration validation disabled
* feat: integration test added
* feat: NonStopWork var defined for duration controls
* feat: modify README
* feat: renaming and goroutine leak fix
* feat: integration test topic name changed
* feat: linter fix
1 parent 6066a2b commit 741d6f4

File tree

9 files changed

+195
-138
lines changed

9 files changed

+195
-138
lines changed

README.md

+36-36
Large diffs are not rendered by default.

internal/cron.go

-83
This file was deleted.

internal/cronsumer.go

+12-12
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
88
)
99

10-
type kafkaCronsumer struct {
10+
type cronsumer struct {
1111
messageChannel chan MessageWrapper
1212

1313
kafkaConsumer Consumer
@@ -23,11 +23,11 @@ type kafkaCronsumer struct {
2323
cfg *kafka.Config
2424
}
2525

26-
func newKafkaCronsumer(cfg *kafka.Config, c func(message kafka.Message) error) *kafkaCronsumer {
26+
func newCronsumer(cfg *kafka.Config, c func(message kafka.Message) error) *cronsumer {
2727
cfg.SetDefaults()
2828
cfg.Validate()
2929

30-
return &kafkaCronsumer{
30+
return &cronsumer{
3131
cfg: cfg,
3232
messageChannel: make(chan MessageWrapper),
3333
kafkaConsumer: newConsumer(cfg),
@@ -40,13 +40,13 @@ func newKafkaCronsumer(cfg *kafka.Config, c func(message kafka.Message) error) *
4040
}
4141
}
4242

43-
func (k *kafkaCronsumer) SetupConcurrentWorkers(concurrency int) {
43+
func (k *cronsumer) SetupConcurrentWorkers(concurrency int) {
4444
for i := 0; i < concurrency; i++ {
4545
go k.processMessage()
4646
}
4747
}
4848

49-
func (k *kafkaCronsumer) Listen(ctx context.Context, strategyName string, cancelFuncWrapper *func()) {
49+
func (k *cronsumer) Listen(ctx context.Context, strategyName string, cancelFuncWrapper *func()) {
5050
startTime := time.Now()
5151
startTimeUnixNano := startTime.UnixNano()
5252

@@ -102,17 +102,17 @@ func (k *kafkaCronsumer) Listen(ctx context.Context, strategyName string, cancel
102102
}
103103
}
104104

105-
func (k *kafkaCronsumer) Stop() {
105+
func (k *cronsumer) Stop() {
106106
close(k.messageChannel)
107107
k.kafkaConsumer.Stop()
108108
k.kafkaProducer.Close()
109109
}
110110

111-
func (k *kafkaCronsumer) GetMetric() *CronsumerMetric {
111+
func (k *cronsumer) GetMetric() *CronsumerMetric {
112112
return k.metric
113113
}
114114

115-
func (k *kafkaCronsumer) processMessage() {
115+
func (k *cronsumer) processMessage() {
116116
for msg := range k.messageChannel {
117117
if err := k.consumeFn(msg.Message); err != nil {
118118
msg.AddHeader(createErrHeader(err))
@@ -121,20 +121,20 @@ func (k *kafkaCronsumer) processMessage() {
121121
}
122122
}
123123

124-
func (k *kafkaCronsumer) sendToMessageChannel(msg MessageWrapper) {
124+
func (k *cronsumer) sendToMessageChannel(msg MessageWrapper) {
125125
defer k.recoverMessage(msg)
126126
k.messageChannel <- msg
127127
}
128128

129-
func (k *kafkaCronsumer) recoverMessage(msg MessageWrapper) {
129+
func (k *cronsumer) recoverMessage(msg MessageWrapper) {
130130
// sending MessageWrapper to closed channel panic could be occurred cause of concurrency for exception topic listeners
131131
if r := recover(); r != nil {
132132
k.cfg.Logger.Warnf("Recovered MessageWrapper: %s", string(msg.Value))
133133
k.produce(msg)
134134
}
135135
}
136136

137-
func (k *kafkaCronsumer) produce(msg MessageWrapper) {
137+
func (k *cronsumer) produce(msg MessageWrapper) {
138138
if msg.IsGteMaxRetryCount(k.maxRetry) {
139139
k.cfg.Logger.Infof("Message from %s exceeds to retry limit %d. KafkaMessage: %s", k.cfg.Consumer.Topic, k.maxRetry, msg.Value)
140140

@@ -157,6 +157,6 @@ func (k *kafkaCronsumer) produce(msg MessageWrapper) {
157157
}
158158
}
159159

160-
func (k *kafkaCronsumer) isDeadLetterTopicFeatureEnabled() bool {
160+
func (k *cronsumer) isDeadLetterTopicFeatureEnabled() bool {
161161
return k.deadLetterTopic != ""
162162
}

internal/cronsumer_client.go

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
9+
"github.com/Trendyol/kafka-cronsumer/pkg/kafka"
10+
11+
"github.com/Trendyol/kafka-cronsumer/pkg/logger"
12+
13+
gocron "github.com/robfig/cron/v3"
14+
)
15+
16+
type cronsumerClient struct {
17+
cfg *kafka.Config
18+
cron *gocron.Cron
19+
consumer *cronsumer
20+
metricCollectors []prometheus.Collector
21+
}
22+
23+
func NewCronsumer(cfg *kafka.Config, fn kafka.ConsumeFn) kafka.Cronsumer {
24+
c := newCronsumer(cfg, fn)
25+
26+
return &cronsumerClient{
27+
cron: gocron.New(),
28+
consumer: c,
29+
cfg: cfg,
30+
metricCollectors: []prometheus.Collector{NewCollector(cfg.MetricPrefix, c.metric)},
31+
}
32+
}
33+
34+
func (s *cronsumerClient) WithLogger(logger logger.Interface) {
35+
s.cfg.Logger = logger
36+
}
37+
38+
func (s *cronsumerClient) Start() {
39+
s.setup()
40+
s.cron.Start()
41+
}
42+
43+
func (s *cronsumerClient) Run() {
44+
s.setup()
45+
s.cron.Run()
46+
}
47+
48+
func (s *cronsumerClient) Stop() {
49+
s.cron.Stop()
50+
s.consumer.Stop()
51+
}
52+
53+
func (s *cronsumerClient) Produce(message kafka.Message) error {
54+
return s.consumer.kafkaProducer.Produce(message)
55+
}
56+
57+
func (s *cronsumerClient) ProduceBatch(messages []kafka.Message) error {
58+
return s.consumer.kafkaProducer.ProduceBatch(messages)
59+
}
60+
61+
func (s *cronsumerClient) GetMetricCollectors() []prometheus.Collector {
62+
return s.metricCollectors
63+
}
64+
65+
func (s *cronsumerClient) setup() {
66+
cfg := s.cfg.Consumer
67+
68+
s.consumer.SetupConcurrentWorkers(cfg.Concurrency)
69+
schedule, err := gocron.ParseStandard(cfg.Cron)
70+
if err != nil {
71+
panic("Cron parse error: " + err.Error())
72+
}
73+
74+
_, _ = s.cron.AddFunc(cfg.Cron, func() {
75+
cancelFuncWrapper := s.startListen(cfg)
76+
if cfg.Duration == kafka.NonStopWork {
77+
now := time.Now()
78+
nextRun := schedule.Next(now)
79+
duration := nextRun.Sub(now)
80+
time.AfterFunc(duration, cancelFuncWrapper)
81+
} else {
82+
time.AfterFunc(cfg.Duration, cancelFuncWrapper)
83+
}
84+
})
85+
}
86+
87+
func (s *cronsumerClient) startListen(cfg kafka.ConsumerConfig) func() {
88+
s.cfg.Logger.Debug("Consuming " + cfg.Topic + " started at time: " + time.Now().String())
89+
90+
ctx, cancel := context.WithCancel(context.Background())
91+
cancelFuncWrapper := func() {
92+
s.cfg.Logger.Debug("Consuming " + cfg.Topic + " paused at " + time.Now().String())
93+
cancel()
94+
}
95+
96+
go s.consumer.Listen(ctx, cfg.BackOffStrategy.String(), &cancelFuncWrapper)
97+
return cancelFuncWrapper
98+
}
File renamed without changes.

internal/cronsumer_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func Test_Produce_Max_Retry_Count_Reach(t *testing.T) {
2828
var firstConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
2929
return nil
3030
}
31-
c := &kafkaCronsumer{
31+
c := &cronsumer{
3232
cfg: kafkaConfig,
3333
messageChannel: make(chan MessageWrapper),
3434
kafkaConsumer: mockConsumer{},
@@ -63,7 +63,7 @@ func Test_Produce_Max_Retry_Count_Reach_Dead_Letter_Topic_Feature_Enabled(t *tes
6363
var firstConsumerFn kafka.ConsumeFn = func(message kafka.Message) error {
6464
return nil
6565
}
66-
c := &kafkaCronsumer{
66+
c := &cronsumer{
6767
cfg: &kafka.Config{
6868
Logger: logger.New("info"),
6969
},
@@ -112,7 +112,7 @@ func Test_Produce_With_Retry(t *testing.T) {
112112
return nil
113113
}
114114
producer := newMockProducer()
115-
c := &kafkaCronsumer{
115+
c := &cronsumer{
116116
cfg: kafkaConfig,
117117
messageChannel: make(chan MessageWrapper),
118118
kafkaConsumer: mockConsumer{},
@@ -159,7 +159,7 @@ func Test_Recover_Message(t *testing.T) {
159159
return nil
160160
}
161161
producer := newMockProducer()
162-
c := &kafkaCronsumer{
162+
c := &cronsumer{
163163
cfg: kafkaConfig,
164164
messageChannel: make(chan MessageWrapper),
165165
kafkaConsumer: mockConsumer{},

pkg/kafka/config.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ const (
2020
FixedBackOffStrategy = "fixed"
2121
)
2222

23+
//nolint:all
24+
var NonStopWork time.Duration = 0
25+
2326
type Config struct {
2427
Brokers []string `yaml:"brokers"`
2528
Consumer ConsumerConfig `yaml:"consumer"`
@@ -139,9 +142,6 @@ func (c *Config) Validate() {
139142
if c.Consumer.Cron == "" {
140143
panic("you have to set cron expression")
141144
}
142-
if c.Consumer.Duration == 0 {
143-
panic("you have to set panic duration")
144-
}
145145
if !isValidBackOffStrategy(c.Consumer.BackOffStrategy) {
146146
panic("you have to set valid backoff strategy")
147147
}
File renamed without changes.

test/integration/integration_test.go

+42
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,48 @@ func Test_Should_Discard_Message_When_Header_Filter_Defined(t *testing.T) {
447447
}
448448
}
449449

450+
func Test_Should_Consume_Exception_Message_Successfully_When_Duration_Zero(t *testing.T) {
451+
// Given
452+
t.Parallel()
453+
topic := "exception-no-duration"
454+
_, cleanUp := createTopic(t, topic)
455+
defer cleanUp()
456+
457+
config := &kafka.Config{
458+
Brokers: []string{"localhost:9092"},
459+
Consumer: kafka.ConsumerConfig{
460+
GroupID: "sample-consumer",
461+
Topic: topic,
462+
Cron: "*/1 * * * *",
463+
Duration: kafka.NonStopWork, // duration set as 0
464+
},
465+
LogLevel: "info",
466+
}
467+
468+
waitMessageCh := make(chan kafka.Message)
469+
470+
var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
471+
fmt.Printf("consumer > Message received: %s\n", string(message.Value))
472+
waitMessageCh <- message
473+
return nil
474+
}
475+
476+
c := cronsumer.New(config, consumeFn)
477+
c.Start()
478+
479+
// When
480+
expectedMessage := kafka.Message{Topic: topic, Value: []byte("some message")}
481+
if err := c.Produce(expectedMessage); err != nil {
482+
fmt.Println("Produce err", err.Error())
483+
}
484+
485+
// Then
486+
actualMessage := <-waitMessageCh
487+
if !bytes.Equal(actualMessage.Value, expectedMessage.Value) {
488+
t.Errorf("Expected: %s, Actual: %s", expectedMessage.Value, actualMessage.Value)
489+
}
490+
}
491+
450492
func getRetryCount(message kafka.Message) int {
451493
for _, header := range message.Headers {
452494
if header.Key == "x-retry-count" {

0 commit comments

Comments
 (0)