@@ -94,14 +94,20 @@ func (k *kafkaStream) Read() (*store.ReadMessageContent, error) {
9494 return nil , errKafkaClosed
9595 }
9696 k .creditSemaphore .Acquire (1 ) // TODO: Size-based credits
97- return k .kafkaConverter (m ), nil
97+ c := k .kafkaConverter (m )
98+ c .Message .Message .SequenceNumber = common .Int64Ptr (k .getNextSequenceNumber ())
99+ return c , nil
98100}
99101
100102// ResponseHeaders returns the response headers sent from the server. This will block until server headers have been received.
101103func (k * kafkaStream ) ResponseHeaders () (map [string ]string , error ) {
102104 return nil , errors .New (`unimplemented` )
103105}
104106
107+ func (k * kafkaStream ) getNextSequenceNumber () int64 {
108+ return atomic .AddInt64 (& k .seqNo , 1 )
109+ }
110+
105111/*
106112 * Setup & Utility
107113 */
@@ -114,55 +120,52 @@ func OpenKafkaStream(c <-chan *s.ConsumerMessage, kafkaMessageConverter KafkaMes
114120 kafkaConverter : kafkaMessageConverter ,
115121 }
116122 if k .kafkaConverter == nil {
117- k .kafkaConverter = GetDefaultKafkaMessageConverter (& k . seqNo , k .logger )
123+ k .kafkaConverter = GetDefaultKafkaMessageConverter (k .logger )
118124 }
119125 return k
120126}
121127
122128// GetDefaultKafkaMessageConverter returns the default kafka message converter
123- func GetDefaultKafkaMessageConverter (seqNo * int64 , logger bark.Logger ) KafkaMessageConverter {
129+ func GetDefaultKafkaMessageConverter (logger bark.Logger ) KafkaMessageConverter {
124130 return func (m * s.ConsumerMessage ) (c * store.ReadMessageContent ) {
125131 c = & store.ReadMessageContent {
126132 Type : store .ReadMessageContentTypePtr (store .ReadMessageContentType_MESSAGE ),
127- }
128-
129- c .Message = & store.ReadMessage {
130- Address : common .Int64Ptr (
131- int64 (kafkaAddresser .GetStoreAddress (
132- & TopicPartition {
133- Topic : m .Topic ,
134- Partition : m .Partition ,
135- },
136- m .Offset ,
137- func () bark.Logger {
138- return logger .WithFields (bark.Fields {
139- `module` : `kafkaStream` ,
133+ Message : & store.ReadMessage {
134+ Address : common .Int64Ptr (
135+ int64 (kafkaAddresser .GetStoreAddress (
136+ & TopicPartition {
137+ Topic : m .Topic ,
138+ Partition : m .Partition ,
139+ },
140+ m .Offset ,
141+ func () bark.Logger {
142+ return logger .WithFields (bark.Fields {
143+ `module` : `kafkaStream` ,
144+ `topic` : m .Topic ,
145+ `partition` : m .Partition ,
146+ })
147+ },
148+ ))),
149+ Message : & store.AppendMessage {
150+ Payload : & cherami.PutMessage {
151+ Data : m .Value ,
152+ UserContext : map [string ]string {
153+ `key` : string (m .Key ),
140154 `topic` : m .Topic ,
141- `partition` : m .Partition ,
142- })
155+ `partition` : strconv .Itoa (int (m .Partition )),
156+ `offset` : strconv .Itoa (int (m .Offset )),
157+ },
158+ // TODO: Checksum?
143159 },
144- ))),
145- }
146-
147- c .Message .Message = & store.AppendMessage {
148- SequenceNumber : common .Int64Ptr (atomic .AddInt64 (seqNo , 1 )),
160+ },
161+ },
149162 }
150163
151164 if ! m .Timestamp .IsZero () {
152165 // only set if kafka is version 0.10+
153166 c .Message .Message .EnqueueTimeUtc = common .Int64Ptr (m .Timestamp .UnixNano ())
154167 }
155168
156- c .Message .Message .Payload = & cherami.PutMessage {
157- Data : m .Value ,
158- UserContext : map [string ]string {
159- `key` : string (m .Key ),
160- `topic` : m .Topic ,
161- `partition` : strconv .Itoa (int (m .Partition )),
162- `offset` : strconv .Itoa (int (m .Offset )),
163- },
164- // TODO: Checksum?
165- }
166169 return c
167170 }
168171}