Skip to content

Commit b3a4d8e

Browse files
authored
kakfa: properly start at last offset when using a consumer group (#3629)
1 parent d7d7e67 commit b3a4d8e

File tree

3 files changed

+12
-5
lines changed

3 files changed

+12
-5
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ require (
6868
github.com/prometheus/prom2json v1.3.0
6969
github.com/r3labs/diff/v2 v2.14.1
7070
github.com/sanity-io/litter v1.5.5
71-
github.com/segmentio/kafka-go v0.4.45
71+
github.com/segmentio/kafka-go v0.4.48
7272
github.com/shirou/gopsutil/v3 v3.23.5
7373
github.com/sirupsen/logrus v1.9.3
7474
github.com/slack-go/slack v0.16.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -632,8 +632,8 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
632632
github.com/sanity-io/litter v1.5.5 h1:iE+sBxPBzoK6uaEP5Lt3fHNgpKcHXc/A2HGETy0uJQo=
633633
github.com/sanity-io/litter v1.5.5/go.mod h1:9gzJgR2i4ZpjZHsKvUXIRQVk7P+yM3e+jAF7bU2UI5U=
634634
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
635-
github.com/segmentio/kafka-go v0.4.45 h1:prqrZp1mMId4kI6pyPolkLsH6sWOUmDxmmucbL4WS6E=
636-
github.com/segmentio/kafka-go v0.4.45/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
635+
github.com/segmentio/kafka-go v0.4.48 h1:9jyu9CWK4W5W+SroCe8EffbrRZVqAOkuaLd/ApID4Vs=
636+
github.com/segmentio/kafka-go v0.4.48/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
637637
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
638638
github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8=
639639
github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I=

pkg/acquisition/modules/kafka/kafka.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,12 @@ func (k *KafkaSource) Dump() any {
157157
}
158158

159159
func (k *KafkaSource) ReadMessage(ctx context.Context, out chan types.Event) error {
160-
// Start processing from latest Offset
161-
k.Reader.SetOffsetAt(ctx, time.Now())
160+
if k.Config.GroupID == "" {
161+
err := k.Reader.SetOffset(kafka.LastOffset)
162+
if err != nil {
163+
return fmt.Errorf("while setting offset for reader on topic '%s': %w", k.Config.Topic, err)
164+
}
165+
}
162166

163167
for {
164168
k.logger.Tracef("reading message from topic '%s'", k.Config.Topic)
@@ -279,6 +283,7 @@ func (kc *KafkaConfiguration) NewDialer() (*kafka.Dialer, error) {
279283
}
280284
dialer.TLS = tlsConfig
281285
}
286+
282287
return dialer, nil
283288
}
284289

@@ -297,6 +302,8 @@ func (kc *KafkaConfiguration) NewReader(dialer *kafka.Dialer, logger *log.Entry)
297302

298303
if kc.GroupID != "" {
299304
rConf.GroupID = kc.GroupID
305+
// kafka-go does not support calling SetOffset while using a consumer group
306+
rConf.StartOffset = kafka.LastOffset
300307
} else if kc.Partition != 0 {
301308
rConf.Partition = kc.Partition
302309
} else {

0 commit comments

Comments
 (0)