Skip to content

Commit 8087ca2

Browse files
author
A.Samet İleri
authored
Merge pull request #59 from Trendyol/feature/#55
feat: add error description to message header
2 parents fd90ec8 + 02e970a commit 8087ca2

9 files changed

+125
-27
lines changed

internal/consumer.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
)
1010

1111
type Consumer interface {
12-
ReadMessage(ctx context.Context) (*MessageWrapper, error)
12+
ReadMessage(ctx context.Context) (*segmentio.Message, error)
1313
Stop()
1414
}
1515

@@ -53,7 +53,7 @@ func newConsumer(kafkaConfig *kafka.Config) *kafkaConsumer {
5353
}
5454
}
5555

56-
func (k kafkaConsumer) ReadMessage(ctx context.Context) (*MessageWrapper, error) {
56+
func (k kafkaConsumer) ReadMessage(ctx context.Context) (*segmentio.Message, error) {
5757
msg, err := k.consumer.ReadMessage(ctx)
5858
if err != nil {
5959
if isContextCancelled(err) {
@@ -63,7 +63,7 @@ func (k kafkaConsumer) ReadMessage(ctx context.Context) (*MessageWrapper, error)
6363
return nil, err
6464
}
6565

66-
return NewMessageWrapper(msg), nil
66+
return &msg, nil
6767
}
6868

6969
func isContextCancelled(err error) bool {

internal/cronsumer.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,20 @@ func (k *kafkaCronsumer) Listen(ctx context.Context, strategyName string, cancel
4848
startTime := time.Now()
4949
startTimeUnixNano := startTime.UnixNano()
5050

51+
retryStrategy := kafka.GetBackoffStrategy(strategyName)
52+
5153
for {
52-
msg, err := k.kafkaConsumer.ReadMessage(ctx)
54+
m, err := k.kafkaConsumer.ReadMessage(ctx)
5355
if err != nil {
54-
k.cfg.Logger.Errorf("Message could not read, error %v", err)
56+
k.cfg.Logger.Warnf("Message could not read, error %v", err)
5557
return
5658
}
57-
if msg == nil {
59+
if m == nil {
5860
return
5961
}
6062

63+
msg := NewMessageWrapper(*m, strategyName)
64+
6165
if msg.ProduceTime >= startTimeUnixNano {
6266
(*cancelFuncWrapper)()
6367

@@ -70,8 +74,6 @@ func (k *kafkaCronsumer) Listen(ctx context.Context, strategyName string, cancel
7074
return
7175
}
7276

73-
retryStrategy := kafka.GetBackoffStrategy(strategyName)
74-
7577
if retryStrategy.String() == kafka.FixedBackOffStrategy {
7678
k.sendToMessageChannel(*msg)
7779
continue
@@ -105,6 +107,7 @@ func (k *kafkaCronsumer) GetMetric() *CronsumerMetric {
105107
func (k *kafkaCronsumer) processMessage() {
106108
for msg := range k.messageChannel {
107109
if err := k.consumeFn(msg.Message); err != nil {
110+
msg.AddHeader(createErrHeader(err))
108111
k.produce(msg)
109112
}
110113
}

internal/cronsumer_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,8 @@ type mockConsumer struct{}
196196
func (c mockConsumer) Stop() {
197197
}
198198

199-
func (c mockConsumer) ReadMessage(ctx context.Context) (*MessageWrapper, error) {
200-
return &MessageWrapper{}, nil
199+
func (c mockConsumer) ReadMessage(ctx context.Context) (*segmentio.Message, error) {
200+
return &segmentio.Message{}, nil
201201
}
202202

203203
type mockProducer struct {

internal/message.go

+23-15
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const (
1515
RetryHeaderKey = "x-retry-count"
1616
RetryAttemptHeaderKey = "x-retry-attempt-count"
1717
MessageProduceTimeHeaderKey = "x-produce-time"
18+
MessageErrHeaderKey = "x-error-message"
1819
)
1920

2021
type MessageWrapper struct {
@@ -24,22 +25,29 @@ type MessageWrapper struct {
2425
RetryAttemptCount int
2526
}
2627

27-
func NewMessageWrapper(msg segmentio.Message) *MessageWrapper {
28-
return &MessageWrapper{
29-
RetryCount: getRetryCount(&msg),
30-
RetryAttemptCount: getRetryAttemptCount(&msg),
31-
ProduceTime: getMessageProduceTime(&msg),
32-
Message: kafka.Message{
33-
Topic: msg.Topic,
34-
Partition: msg.Partition,
35-
Offset: msg.Offset,
36-
HighWaterMark: msg.HighWaterMark,
37-
Key: msg.Key,
38-
Value: msg.Value,
39-
Headers: FromHeaders(msg.Headers),
40-
Time: msg.Time,
41-
},
28+
func NewMessageWrapper(msg segmentio.Message, strategyName string) *MessageWrapper {
29+
mw := &MessageWrapper{
30+
RetryCount: getRetryCount(&msg),
31+
ProduceTime: getMessageProduceTime(&msg),
4232
}
33+
34+
// Don't add x-retry-attempt-count for fixed strategy.
35+
if strategyName != kafka.FixedBackOffStrategy {
36+
mw.RetryAttemptCount = getRetryAttemptCount(&msg)
37+
}
38+
39+
mw.Message = kafka.Message{
40+
Topic: msg.Topic,
41+
Partition: msg.Partition,
42+
Offset: msg.Offset,
43+
HighWaterMark: msg.HighWaterMark,
44+
Key: msg.Key,
45+
Value: msg.Value,
46+
Headers: FromHeaders(msg.Headers),
47+
Time: msg.Time,
48+
}
49+
50+
return mw
4351
}
4452

4553
func (m *MessageWrapper) To(increaseRetry bool, increaseRetryAttempt bool) segmentio.Message {

internal/message_header.go

+7
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@ func FromHeaders(sh []segmentio.Header) []kafka.Header {
3333
return r
3434
}
3535

36+
func createErrHeader(consumeErr error) kafka.Header {
37+
return kafka.Header{
38+
Key: MessageErrHeaderKey,
39+
Value: []byte(consumeErr.Error()),
40+
}
41+
}
42+
3643
func getRetryCount(message *segmentio.Message) int {
3744
for i := range message.Headers {
3845
if message.Headers[i].Key != RetryHeaderKey {

internal/message_header_test.go

+18
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package internal
22

33
import (
44
"bytes"
5+
"errors"
56
"strconv"
67
"testing"
78

@@ -190,3 +191,20 @@ func Test_getRetryAttempt(t *testing.T) {
190191
}
191192
})
192193
}
194+
195+
func TestMessage_CreateErrHeader(t *testing.T) {
196+
// Given
197+
e := errors.New("err")
198+
199+
// When
200+
h := createErrHeader(e)
201+
202+
// Then
203+
if h.Key != MessageErrHeaderKey {
204+
t.Fatalf("Header key must be equal to X-ErrMessage")
205+
}
206+
207+
if h.Value == nil {
208+
t.Fatalf("Header value must be present")
209+
}
210+
}

internal/message_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,19 @@ func Test_NewMessageWrapper(t *testing.T) {
2626
WriterData: "1",
2727
Time: time.Now(),
2828
}
29+
2930
// When
30-
actual := NewMessageWrapper(expected)
31+
actual := NewMessageWrapper(expected, FixedBackOffStrategy)
3132
actualHeader := actual.Headers[0]
3233
expectedHeader := expected.Headers[0]
34+
3335
// Then
3436
if actual.Topic != expected.Topic {
3537
t.Errorf("Expected: %s, Actual: %s", expected.Topic, actual.Topic)
3638
}
3739
if actual.Partition != expected.Partition {
3840
t.Errorf("Expected: %d, Actual: %d", expected.Partition, actual.Partition)
3941
}
40-
4142
if actual.Offset != expected.Offset {
4243
t.Errorf("Expected: %d, Actual: %d", expected.Offset, actual.Offset)
4344
}

pkg/kafka/cronsumer_message.go

+12
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,18 @@ func (mb *MessageBuilder) WithHighWatermark(highWaterMark int64) *MessageBuilder
6161
return mb
6262
}
6363

64+
// AddHeader works as a idempotent function
65+
func (m *Message) AddHeader(header Header) {
66+
for i := range m.Headers {
67+
if m.Headers[i].Key == header.Key {
68+
m.Headers[i].Value = header.Value
69+
return
70+
}
71+
}
72+
73+
m.Headers = append(m.Headers, header)
74+
}
75+
6476
func (mb *MessageBuilder) Build() Message {
6577
m := Message{}
6678

pkg/kafka/cronsumer_message_test.go

+49
Original file line numberDiff line numberDiff line change
@@ -129,3 +129,52 @@ func Test_WithHighWatermark(t *testing.T) {
129129
t.Errorf("Expected: %d, Actual: %d", expected, *actual)
130130
}
131131
}
132+
133+
func TestMessage_AddHeader(t *testing.T) {
134+
t.Run("When_New_Header_Comes", func(t *testing.T) {
135+
// Given
136+
m := Message{
137+
Headers: []Header{
138+
{Key: "foo", Value: []byte("fooValue")},
139+
},
140+
}
141+
142+
// When
143+
m.AddHeader(Header{Key: "bar", Value: []byte("barValue")})
144+
145+
// Then
146+
headers := m.Headers
147+
if len(headers) != 2 {
148+
t.Fatalf("Header length must be equal to 2")
149+
}
150+
if headers[1].Key != "bar" {
151+
t.Fatalf("Header key must be equal to bar")
152+
}
153+
if !bytes.Equal(headers[1].Value, []byte("barValue")) {
154+
t.Fatalf("Header value must be equal to barValue")
155+
}
156+
})
157+
t.Run("When_Same_Header_Comes", func(t *testing.T) {
158+
// Given
159+
m := Message{
160+
Headers: []Header{
161+
{Key: "foo", Value: []byte("fooValue")},
162+
},
163+
}
164+
165+
// When
166+
m.AddHeader(Header{Key: "foo", Value: []byte("barValue")})
167+
168+
// Then
169+
headers := m.Headers
170+
if len(headers) != 1 {
171+
t.Fatalf("Header length must be equal to 1")
172+
}
173+
if headers[0].Key != "foo" {
174+
t.Fatalf("Header key must be equal to foo")
175+
}
176+
if !bytes.Equal(headers[0].Value, []byte("barValue")) {
177+
t.Fatalf("Header value must be equal to barValue")
178+
}
179+
})
180+
}

0 commit comments

Comments
 (0)