Skip to content

Commit 58b3fd7

Browse files
feat: added support for message timestamp (#915)
* feat: added support for message timestamp * added ConsumerIT tests * SuppressWarnings checkstyle's ParameterNumber rule * Update generated documentation * added CHANGELOG and fix nits * Minor refactoring on the CHANGELOG --------- Signed-off-by: Antonio Pedro <[email protected]> Signed-off-by: Paolo Patierno <[email protected]> Co-authored-by: Paolo Patierno <[email protected]>
1 parent 5a995e7 commit 58b3fd7

File tree

11 files changed

+236
-62
lines changed

11 files changed

+236
-62
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
## 0.30.0
44

55
* Dependency updates (Kafka 3.7.1, Vert.x 4.5.9, Netty 4.1.111.Final, Prometheus JMX Collector 1.0.1, Prometheus Client 1.3.1)
6+
* Added support for message timestamp.
7+
* Setting the timestamp on a message sent via the `send` API.
8+
* Getting the timestamp on receiving a message via the `poll` API.
69

710
## 0.29.0
811

documentation/book/api/definitions.adoc

+4
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ __optional__|<<_kafkaheaderlist,KafkaHeaderList>>
5555
__optional__|integer (int64)
5656
|**partition** +
5757
__optional__|integer (int32)
58+
|**timestamp** +
59+
__optional__|integer (int64)
5860
|**topic** +
5961
__optional__|string
6062
|===
@@ -222,6 +224,8 @@ __optional__|< <<_partition,Partition>> > array
222224
__optional__|<<_kafkaheaderlist,KafkaHeaderList>>
223225
|**partition** +
224226
__optional__|integer (int32)
227+
|**timestamp** +
228+
__optional__|integer (int64)
225229
|===
226230

227231

src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424
/**
2525
* Implementation of a message converter to deal with the "binary" embedded data format
2626
*/
27+
@SuppressWarnings("checkstyle:NPathComplexity")
2728
public class HttpBinaryMessageConverter implements MessageConverter<byte[], byte[], byte[], byte[]> {
2829

2930
@Override
3031
public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) {
3132

3233
Integer partitionFromBody = null;
34+
Long timestamp = null;
3335
byte[] key = null;
3436
byte[] value = null;
3537
Headers headers = new RecordHeaders();
@@ -43,6 +45,9 @@ public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer p
4345
if (json.has("value")) {
4446
value = DatatypeConverter.parseBase64Binary(json.get("value").asText());
4547
}
48+
if (json.has("timestamp")) {
49+
timestamp = json.get("timestamp").asLong();
50+
}
4651
if (json.has("headers")) {
4752
ArrayNode jsonArray = (ArrayNode) json.get("headers");
4853
for (JsonNode jsonObject: jsonArray) {
@@ -59,7 +64,7 @@ public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer p
5964
partitionFromBody = partition;
6065
}
6166
}
62-
return new ProducerRecord<>(kafkaTopic, partitionFromBody, key, value, headers);
67+
return new ProducerRecord<>(kafkaTopic, partitionFromBody, timestamp, key, value, headers);
6368
}
6469

6570
@Override
@@ -98,6 +103,7 @@ public byte[] toMessages(ConsumerRecords<byte[], byte[]> records) {
98103
DatatypeConverter.printBase64Binary(record.value()) : null);
99104
jsonObject.put("partition", record.partition());
100105
jsonObject.put("offset", record.offset());
106+
jsonObject.put("timestamp", record.timestamp());
101107

102108
ArrayNode headers = JsonUtils.createArrayNode();
103109

src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424
/**
2525
* Implementation of a message converter to deal with the "json" embedded data format
2626
*/
27+
@SuppressWarnings("checkstyle:NPathComplexity")
2728
public class HttpJsonMessageConverter implements MessageConverter<byte[], byte[], byte[], byte[]> {
2829

2930
@Override
3031
public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) {
3132

3233
Integer partitionFromBody = null;
34+
Long timestamp = null;
3335
byte[] key = null;
3436
byte[] value = null;
3537
Headers headers = new RecordHeaders();
@@ -43,9 +45,12 @@ public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer p
4345
if (json.has("value")) {
4446
value = JsonUtils.jsonToBytes(json.get("value"));
4547
}
48+
if (json.has("timestamp")) {
49+
timestamp = json.get("timestamp").asLong();
50+
}
4651
if (json.has("headers")) {
4752
ArrayNode jsonArray = (ArrayNode) json.get("headers");
48-
for (JsonNode jsonObject: jsonArray) {
53+
for (JsonNode jsonObject : jsonArray) {
4954
headers.add(new RecordHeader(jsonObject.get("key").asText(), DatatypeConverter.parseBase64Binary(jsonObject.get("value").asText())));
5055
}
5156
}
@@ -59,7 +64,7 @@ public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer p
5964
partitionFromBody = partition;
6065
}
6166
}
62-
return new ProducerRecord<>(kafkaTopic, partitionFromBody, key, value, headers);
67+
return new ProducerRecord<>(kafkaTopic, partitionFromBody, timestamp, key, value, headers);
6368
}
6469

6570
@Override
@@ -97,10 +102,11 @@ public byte[] toMessages(ConsumerRecords<byte[], byte[]> records) {
97102
JsonUtils.bytesToJson(record.value()) : null);
98103
jsonObject.put("partition", record.partition());
99104
jsonObject.put("offset", record.offset());
105+
jsonObject.put("timestamp", record.timestamp());
100106

101107
ArrayNode headers = JsonUtils.createArrayNode();
102108

103-
for (Header kafkaHeader: record.headers()) {
109+
for (Header kafkaHeader : record.headers()) {
104110
ObjectNode header = JsonUtils.createObjectNode();
105111

106112
header.put("key", kafkaHeader.key());

src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class HttpTextMessageConverter implements MessageConverter<byte[], byte[]
3030
public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) {
3131

3232
Integer partitionFromBody = null;
33+
Long timestamp = null;
3334
byte[] key = null;
3435
byte[] value = null;
3536
Headers headers = new RecordHeaders();
@@ -51,6 +52,9 @@ public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer p
5152
}
5253
value = valueNode.asText().getBytes();
5354
}
55+
if (json.has("timestamp")) {
56+
timestamp = json.get("timestamp").asLong();
57+
}
5458
if (json.has("headers")) {
5559
ArrayNode jsonArray = (ArrayNode) json.get("headers");
5660
for (JsonNode jsonObject : jsonArray) {
@@ -67,7 +71,7 @@ public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer p
6771
partitionFromBody = partition;
6872
}
6973
}
70-
return new ProducerRecord<>(kafkaTopic, partitionFromBody, key, value, headers);
74+
return new ProducerRecord<>(kafkaTopic, partitionFromBody, timestamp, key, value, headers);
7175
}
7276

7377
@Override
@@ -101,6 +105,7 @@ public byte[] toMessages(ConsumerRecords<byte[], byte[]> records) {
101105
jsonObject.set("value", record.value() != null ? new TextNode(new String(record.value())) : null);
102106
jsonObject.put("partition", record.partition());
103107
jsonObject.put("offset", record.offset());
108+
jsonObject.put("timestamp", record.timestamp());
104109

105110
ArrayNode headers = JsonUtils.createArrayNode();
106111

src/main/resources/openapi.json

+18-4
Original file line numberDiff line numberDiff line change
@@ -778,7 +778,8 @@
778778
"foo": "bar"
779779
},
780780
"partition": 0,
781-
"offset": 2
781+
"offset": 2,
782+
"timestamp": 1591897790000
782783
},
783784
{
784785
"topic": "topic",
@@ -788,7 +789,8 @@
788789
"bar2"
789790
],
790791
"partition": 1,
791-
"offset": 3
792+
"offset": 3,
793+
"timestamp": 1591897790002
792794
}
793795
]
794796
}
@@ -1516,6 +1518,7 @@
15161518
"partition": 0,
15171519
"topic": "topic",
15181520
"value": "value1",
1521+
"timestamp": 1591897790000,
15191522
"headers": [
15201523
{
15211524
"key": "key1",
@@ -1566,6 +1569,10 @@
15661569
},
15671570
"headers": {
15681571
"$ref": "#/components/schemas/KafkaHeaderList"
1572+
},
1573+
"timestamp": {
1574+
"format": "int64",
1575+
"type": "integer"
15691576
}
15701577
},
15711578
"title": "ConsumerRecord",
@@ -1583,14 +1590,16 @@
15831590
"key": "key1",
15841591
"value": "value1",
15851592
"partition": 0,
1586-
"offset": 2
1593+
"offset": 2,
1594+
"timestamp": 1591897790000
15871595
},
15881596
{
15891597
"topic": "topic",
15901598
"key": "key2",
15911599
"value": "value2",
15921600
"partition": 1,
1593-
"offset": 3
1601+
"offset": 3,
1602+
"timestamp": 1591897790002
15941603
}
15951604
]
15961605
},
@@ -1846,6 +1855,10 @@
18461855
"format": "int32",
18471856
"type": "integer"
18481857
},
1858+
"timestamp": {
1859+
"format": "int64",
1860+
"type": "integer"
1861+
},
18491862
"value": {
18501863
"oneOf": [
18511864
{
@@ -1882,6 +1895,7 @@
18821895
"example": {
18831896
"key": "key1",
18841897
"partition": 0,
1898+
"timestamp": 1591897790000,
18851899
"value": "value1",
18861900
"headers": [
18871901
{

src/main/resources/openapiv2.json

+14-2
Original file line numberDiff line numberDiff line change
@@ -1351,6 +1351,7 @@
13511351
"partition": 0,
13521352
"topic": "topic",
13531353
"value": "value1",
1354+
"timestamp": 1591897790000,
13541355
"headers": [
13551356
{
13561357
"key": "key1",
@@ -1391,6 +1392,10 @@
13911392
},
13921393
"headers": {
13931394
"$ref": "#/definitions/KafkaHeaderList"
1395+
},
1396+
"timestamp": {
1397+
"format": "int64",
1398+
"type": "integer"
13941399
}
13951400
},
13961401
"title": "ConsumerRecord",
@@ -1408,14 +1413,16 @@
14081413
"key": "key1",
14091414
"value": "value1",
14101415
"partition": 0,
1411-
"offset": 2
1416+
"offset": 2,
1417+
"timestamp": 1591897790000
14121418
},
14131419
{
14141420
"topic": "topic",
14151421
"key": "key2",
14161422
"value": "value2",
14171423
"partition": 1,
1418-
"offset": 3
1424+
"offset": 3,
1425+
"timestamp": 1591897790002
14191426
}
14201427
]
14211428
},
@@ -1660,6 +1667,10 @@
16601667
"format": "int32",
16611668
"type": "integer"
16621669
},
1670+
"timestamp": {
1671+
"format": "int64",
1672+
"type": "integer"
1673+
},
16631674
"value": {
16641675
"type": [
16651676
"array",
@@ -1683,6 +1694,7 @@
16831694
"example": {
16841695
"key": "key1",
16851696
"partition": 0,
1697+
"timestamp": 1591897790000,
16861698
"value": "value1",
16871699
"headers": [
16881700
{

0 commit comments

Comments (0)