Skip to content

Commit ac1f317

Browse files
feat: enable balancer support to the retry configuration (#123)
* feat: enable balancer support to the retry configuration * chore: add balancer tests --------- Co-authored-by: Abdulsametileri <[email protected]>
1 parent f9bc1d6 commit ac1f317

File tree

8 files changed

+108
-4
lines changed

8 files changed

+108
-4
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
258258
| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | |
259259
| `batchConfiguration.batchConsumeFn` | Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages | |
260260
| `batchConfiguration.preBatchFn` | This function enable for transforming messages before batch consuming starts | |
261+
| `batchConfiguration.balancer` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Balancer) | leastBytes |
261262
| `tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" |
262263
| `tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" |
263264
| `sasl.authType` | `SCRAM` or `PLAIN` | |

balancer.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package kafka
2+
3+
import "github.com/segmentio/kafka-go"
4+
5+
type Balancer kafka.Balancer
6+
7+
func GetBalancerCRC32() Balancer {
8+
return &kafka.CRC32Balancer{}
9+
}
10+
11+
func GetBalancerHash() Balancer {
12+
return &kafka.Hash{}
13+
}
14+
15+
func GetBalancerLeastBytes() Balancer {
16+
return &kafka.LeastBytes{}
17+
}
18+
19+
func GetBalancerMurmur2Balancer() Balancer {
20+
return &kafka.Murmur2Balancer{}
21+
}
22+
23+
func GetBalancerReferenceHash() Balancer {
24+
return &kafka.ReferenceHash{}
25+
}
26+
27+
func GetBalancerRoundRobin() Balancer {
28+
return &kafka.RoundRobin{}
29+
}

balancer_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package kafka
2+
3+
import (
4+
"reflect"
5+
"testing"
6+
)
7+
8+
func TestGetBalancerCRC32(t *testing.T) {
9+
balancer := GetBalancerCRC32()
10+
if balancer == nil {
11+
t.Error("Expected non-nil balancer, got nil")
12+
}
13+
if reflect.TypeOf(balancer).String() != "*kafka.CRC32Balancer" {
14+
t.Errorf("Expected *kafka.CRC32Balancer, got %s", reflect.TypeOf(balancer).String())
15+
}
16+
}
17+
18+
func TestGetBalancerHash(t *testing.T) {
19+
balancer := GetBalancerHash()
20+
if balancer == nil {
21+
t.Error("Expected non-nil balancer, got nil")
22+
}
23+
if reflect.TypeOf(balancer).String() != "*kafka.Hash" {
24+
t.Errorf("Expected *kafka.Hash, got %s", reflect.TypeOf(balancer).String())
25+
}
26+
}
27+
28+
func TestGetBalancerLeastBytes(t *testing.T) {
29+
balancer := GetBalancerLeastBytes()
30+
if balancer == nil {
31+
t.Error("Expected non-nil balancer, got nil")
32+
}
33+
if reflect.TypeOf(balancer).String() != "*kafka.LeastBytes" {
34+
t.Errorf("Expected *kafka.LeastBytes, got %s", reflect.TypeOf(balancer).String())
35+
}
36+
}
37+
38+
func TestGetBalancerMurmur2Balancer(t *testing.T) {
39+
balancer := GetBalancerMurmur2Balancer()
40+
if balancer == nil {
41+
t.Error("Expected non-nil balancer, got nil")
42+
}
43+
if reflect.TypeOf(balancer).String() != "*kafka.Murmur2Balancer" {
44+
t.Errorf("Expected *kafka.Murmur2Balancer, got %s", reflect.TypeOf(balancer).String())
45+
}
46+
}
47+
48+
func TestGetBalancerReferenceHash(t *testing.T) {
49+
balancer := GetBalancerReferenceHash()
50+
if balancer == nil {
51+
t.Error("Expected non-nil balancer, got nil")
52+
}
53+
if reflect.TypeOf(balancer).String() != "*kafka.ReferenceHash" {
54+
t.Errorf("Expected *kafka.ReferenceHash, got %s", reflect.TypeOf(balancer).String())
55+
}
56+
}
57+
58+
func TestGetBalancerRoundRobinh(t *testing.T) {
59+
balancer := GetBalancerRoundRobin()
60+
if balancer == nil {
61+
t.Error("Expected non-nil balancer, got nil")
62+
}
63+
if reflect.TypeOf(balancer).String() != "*kafka.RoundRobin" {
64+
t.Errorf("Expected *kafka.RoundRobin, got %s", reflect.TypeOf(balancer).String())
65+
}
66+
}

consumer_config.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ package kafka
33
import (
44
"time"
55

6+
"github.com/segmentio/kafka-go"
7+
68
"go.opentelemetry.io/otel"
79
"go.opentelemetry.io/otel/propagation"
810
"go.opentelemetry.io/otel/trace"
911

1012
kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
1113
lcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/logger"
12-
13-
"github.com/segmentio/kafka-go"
1414
)
1515

1616
type ReaderConfig kafka.ReaderConfig
@@ -84,6 +84,9 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
8484
StartOffset: kcronsumer.ToStringOffset(cfg.Reader.StartOffset),
8585
RetentionTime: cfg.Reader.RetentionTime,
8686
},
87+
Producer: kcronsumer.ProducerConfig{
88+
Balancer: cfg.RetryConfiguration.Balancer,
89+
},
8790
LogLevel: lcronsumer.Level(cfg.RetryConfiguration.LogLevel),
8891
}
8992

@@ -155,6 +158,7 @@ type RetryConfiguration struct {
155158
Rack string
156159
LogLevel LogLevel
157160
Brokers []string
161+
Balancer Balancer
158162
MaxRetry int
159163
WorkDuration time.Duration
160164
SkipMessageByHeaderFn SkipMessageByHeaderFn

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/Trendyol/kafka-konsumer/v2
33
go 1.19
44

55
require (
6-
github.com/Trendyol/kafka-cronsumer v1.5.0
6+
github.com/Trendyol/kafka-cronsumer v1.5.1
77
github.com/Trendyol/otel-kafka-konsumer v0.0.7
88
github.com/ansrivas/fiberprometheus/v2 v2.6.1
99
github.com/gofiber/fiber/v2 v2.52.1

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
github.com/Trendyol/kafka-cronsumer v1.5.0 h1:MI0/ncHrlCvOV0Ro4h9avm2izsNprBw4QfabiSnzm0U=
22
github.com/Trendyol/kafka-cronsumer v1.5.0/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
3+
github.com/Trendyol/kafka-cronsumer v1.5.1 h1:L8RLxo8zSGOfVpjtXLUqL3PsJLZdeoFcOvN1yCY/GyQ=
4+
github.com/Trendyol/kafka-cronsumer v1.5.1/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
35
github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
46
github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
57
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=

test/integration/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
)
1111

1212
require (
13-
github.com/Trendyol/kafka-cronsumer v1.5.0 // indirect
13+
github.com/Trendyol/kafka-cronsumer v1.5.1 // indirect
1414
github.com/Trendyol/otel-kafka-konsumer v0.0.7 // indirect
1515
github.com/andybalholm/brotli v1.0.5 // indirect
1616
github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect

test/integration/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
github.com/Trendyol/kafka-cronsumer v1.5.0 h1:MI0/ncHrlCvOV0Ro4h9avm2izsNprBw4QfabiSnzm0U=
22
github.com/Trendyol/kafka-cronsumer v1.5.0/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
3+
github.com/Trendyol/kafka-cronsumer v1.5.1 h1:L8RLxo8zSGOfVpjtXLUqL3PsJLZdeoFcOvN1yCY/GyQ=
4+
github.com/Trendyol/kafka-cronsumer v1.5.1/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
35
github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
46
github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
57
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=

0 commit comments

Comments
 (0)