diff --git a/CHANGELOG.md b/CHANGELOG.md
index 912b7465..21fa8d54 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,9 @@
 ## 0.30.0
 
 * Dependency updates (Kafka 3.7.1, Vert.x 4.5.9, Netty 4.1.111.Final, Prometheus JMX Collector 1.0.1, Prometheus Client 1.3.1)
+* Added support for message timestamp.
+  * Setting the timestamp on a message sent via the `send` API.
+  * Getting the timestamp on receiving a message via the `poll` API.
 
 ## 0.29.0
 
diff --git a/documentation/book/api/definitions.adoc b/documentation/book/api/definitions.adoc
index 7b6f521e..a4183f53 100644
--- a/documentation/book/api/definitions.adoc
+++ b/documentation/book/api/definitions.adoc
@@ -55,6 +55,8 @@ __optional__|<<_kafkaheaderlist,KafkaHeaderList>>
 __optional__|integer (int64)
 |**partition** +
 __optional__|integer (int32)
+|**timestamp** +
+__optional__|integer (int64)
 |**topic** +
 __optional__|string
 |===
@@ -222,6 +224,8 @@ __optional__|< <<_partition,Partition>> > array
 __optional__|<<_kafkaheaderlist,KafkaHeaderList>>
 |**partition** +
 __optional__|integer (int32)
+|**timestamp** +
+__optional__|integer (int64)
 |===
 
diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java
index 5a6fc0d9..6a779ca1 100644
--- a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java
+++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java
@@ -24,12 +24,14 @@
 /**
  * Implementation of a message converter to deal with the "binary" embedded data format
  */
+@SuppressWarnings("checkstyle:NPathComplexity")
 public class HttpBinaryMessageConverter implements MessageConverter<byte[], byte[], byte[], byte[]> {
 
     @Override
     public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) {
 
         Integer partitionFromBody = null;
+        Long timestamp = null;
         byte[] key = null;
         byte[] value = null;
         Headers headers = new RecordHeaders();
@@ -43,6 +45,9 @@ public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer p
         if (json.has("value")) {
             value = DatatypeConverter.parseBase64Binary(json.get("value").asText());
         }
+        if (json.has("timestamp")) {
+            timestamp = json.get("timestamp").asLong();
+        }
         if (json.has("headers")) {
             ArrayNode jsonArray = (ArrayNode) json.get("headers");
             for (JsonNode jsonObject: jsonArray) {
@@ -59,7 +64,7 @@ public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer p
                 partitionFromBody = partition;
             }
         }
-        return new ProducerRecord<>(kafkaTopic, partitionFromBody, key, value, headers);
+        return new ProducerRecord<>(kafkaTopic, partitionFromBody, timestamp, key, value, headers);
     }
 
     @Override
@@ -98,6 +103,7 @@ public byte[] toMessages(ConsumerRecords<byte[], byte[]> records) {
                     DatatypeConverter.printBase64Binary(record.value()) : null);
             jsonObject.put("partition", record.partition());
             jsonObject.put("offset", record.offset());
+            jsonObject.put("timestamp", record.timestamp());
 
             ArrayNode headers = JsonUtils.createArrayNode();
 
diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java
index 7077d35d..589f2c2f 100644
--- a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java
+++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java
@@ -24,12 +24,14 @@
 /**
  * Implementation of a message converter to deal with the "json" embedded data format
  */
+@SuppressWarnings("checkstyle:NPathComplexity")
 public class HttpJsonMessageConverter implements MessageConverter<byte[], byte[], byte[], byte[]> {
 
     @Override
     public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) {
 
         Integer partitionFromBody = null;
+        Long timestamp = null;
         byte[] key = null;
         byte[] value = null;
         Headers headers = new RecordHeaders();
@@ -43,9 +45,12 @@ public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer p
         if (json.has("value")) {
             value = JsonUtils.jsonToBytes(json.get("value"));
         }
+        if (json.has("timestamp")) {
+            timestamp = json.get("timestamp").asLong();
+        }
         if (json.has("headers")) {
             ArrayNode jsonArray = (ArrayNode) json.get("headers");
-            for (JsonNode jsonObject: jsonArray) {
+            for (JsonNode jsonObject : jsonArray) {
                 headers.add(new RecordHeader(jsonObject.get("key").asText(), DatatypeConverter.parseBase64Binary(jsonObject.get("value").asText())));
             }
         }
@@ -59,7 +64,7 @@ public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer p
                 partitionFromBody = partition;
             }
         }
-        return new ProducerRecord<>(kafkaTopic, partitionFromBody, key, value, headers);
+        return new ProducerRecord<>(kafkaTopic, partitionFromBody, timestamp, key, value, headers);
     }
 
     @Override
@@ -97,10 +102,11 @@ public byte[] toMessages(ConsumerRecords<byte[], byte[]> records) {
                     JsonUtils.bytesToJson(record.value()) : null);
             jsonObject.put("partition", record.partition());
             jsonObject.put("offset", record.offset());
+            jsonObject.put("timestamp", record.timestamp());
 
             ArrayNode headers = JsonUtils.createArrayNode();
 
-            for (Header kafkaHeader: record.headers()) {
+            for (Header kafkaHeader : record.headers()) {
                 ObjectNode header = JsonUtils.createObjectNode();
                 header.put("key", kafkaHeader.key());
 
diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java
index 366eab3c..0efc790f 100644
--- a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java
+++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java
@@ -30,6 +30,7 @@ public class HttpTextMessageConverter implements MessageConverter<byte[], byte[
     public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) {
 
         Integer partitionFromBody = null;
+        Long timestamp = null;
         byte[] key = null;
         byte[] value = null;
         Headers headers = new RecordHeaders();
@@ -51,6 +52,9 @@ public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer p
             }
             value = valueNode.asText().getBytes();
         }
+        if (json.has("timestamp")) {
+            timestamp = json.get("timestamp").asLong();
+        }
         if (json.has("headers")) {
             ArrayNode jsonArray = (ArrayNode) json.get("headers");
             for (JsonNode jsonObject : jsonArray) {
@@ -67,7 +71,7 @@ public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer p
                 partitionFromBody = partition;
             }
         }
-        return new ProducerRecord<>(kafkaTopic, partitionFromBody, key, value, headers);
+        return new ProducerRecord<>(kafkaTopic, partitionFromBody, timestamp, key, value, headers);
     }
 
     @Override
@@ -101,6 +105,7 @@ public byte[] toMessages(ConsumerRecords<byte[], byte[]> records) {
             jsonObject.set("value", record.value() != null ?
                     new TextNode(new String(record.value())) : null);
             jsonObject.put("partition", record.partition());
             jsonObject.put("offset", record.offset());
+            jsonObject.put("timestamp", record.timestamp());
 
             ArrayNode headers = JsonUtils.createArrayNode();
 
diff --git a/src/main/resources/openapi.json b/src/main/resources/openapi.json
index 80a86262..e617327a 100644
--- a/src/main/resources/openapi.json
+++ b/src/main/resources/openapi.json
@@ -778,7 +778,8 @@
                     "foo": "bar"
                   },
                   "partition": 0,
-                  "offset": 2
+                  "offset": 2,
+                  "timestamp": 1591897790000
                 },
                 {
                   "topic": "topic",
@@ -788,7 +789,8 @@
                     "bar2"
                   ],
                   "partition": 1,
-                  "offset": 3
+                  "offset": 3,
+                  "timestamp": 1591897790002
                 }
               ]
             }
@@ -1516,6 +1518,7 @@
           "partition": 0,
           "topic": "topic",
           "value": "value1",
+          "timestamp": 1591897790000,
           "headers": [
             {
               "key": "key1",
@@ -1566,6 +1569,10 @@
         },
         "headers": {
           "$ref": "#/components/schemas/KafkaHeaderList"
+        },
+        "timestamp": {
+          "format": "int64",
+          "type": "integer"
         }
       },
       "title": "ConsumerRecord",
@@ -1583,14 +1590,16 @@
           "key": "key1",
           "value": "value1",
           "partition": 0,
-          "offset": 2
+          "offset": 2,
+          "timestamp": 1591897790000
         },
         {
           "topic": "topic",
           "key": "key2",
           "value": "value2",
           "partition": 1,
-          "offset": 3
+          "offset": 3,
+          "timestamp": 1591897790000
         }
       ]
     },
@@ -1846,6 +1855,10 @@
           "format": "int32",
           "type": "integer"
         },
+        "timestamp": {
+          "format": "int64",
+          "type": "integer"
+        },
         "value": {
           "oneOf": [
             {
@@ -1882,6 +1895,7 @@
       "example": {
         "key": "key1",
         "partition": 0,
+        "timestamp": 1591897790000,
         "value": "value1",
         "headers": [
           {
diff --git a/src/main/resources/openapiv2.json b/src/main/resources/openapiv2.json
index f27fb741..c5f6430a 100644
--- a/src/main/resources/openapiv2.json
+++ b/src/main/resources/openapiv2.json
@@ -1351,6 +1351,7 @@
           "partition": 0,
           "topic": "topic",
           "value": "value1",
+          "timestamp": 1591897790000,
           "headers": [
             {
               "key": "key1",
@@ -1391,6 +1392,10 @@
         },
         "headers": {
           "$ref": "#/definitions/KafkaHeaderList"
+        },
+        "timestamp": {
+          "format": "int64",
+          "type": "integer"
        }
       },
       "title": "ConsumerRecord",
@@ -1408,14 +1413,16 @@
           "key": "key1",
           "value": "value1",
           "partition": 0,
-          "offset": 2
+          "offset": 2,
+          "timestamp": 1591897790000
         },
         {
           "topic": "topic",
           "key": "key2",
           "value": "value2",
           "partition": 1,
-          "offset": 3
+          "offset": 3,
+          "timestamp": 1591897790002
         }
       ]
     },
@@ -1660,6 +1667,10 @@
           "format": "int32",
           "type": "integer"
         },
+        "timestamp": {
+          "format": "int64",
+          "type": "integer"
+        },
         "value": {
           "type": [
             "array",
@@ -1683,6 +1694,7 @@
       "example": {
         "key": "key1",
         "partition": 0,
+        "timestamp": 1591897790000,
         "value": "value1",
         "headers": [
           {
diff --git a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java
index 5c51ab8a..cfcb691d 100644
--- a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java
+++ b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java
@@ -35,17 +35,20 @@ public BasicKafkaClient(String bootstrapServer) {
 
     /**
      * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting
-     * @param timeoutMs timeout for the sending messages
-     * @param topicName topic name where messages are send
-     * @param messageCount message count
-     * @param headers kafka headers
-     * @param message specific message to send
-     * @param partition partition count, how many shards/partitions will topic have
+     *
+     * @param timeoutMs         timeout for the sending messages
+     * @param topicName         topic name where messages are send
+     * @param messageCount      message count
+     * @param headers           kafka headers
+     * @param message           specific message to send
+     * @param partition         partition count, how many shards/partitions will topic have
+     * @param timestamp         timestamp of the message
      * @param withNullKeyRecord boolean, which allowing sending messages with NULL key
      * @return sent message count
      */
+    @SuppressWarnings("checkstyle:ParameterNumber")
     public int sendStringMessagesPlain(long timeoutMs, String topicName, int messageCount, List<KafkaHeader> headers,
-                                       String message, int partition, boolean withNullKeyRecord) {
+                                       String message, int partition, Long timestamp, boolean withNullKeyRecord) {
         CompletableFuture<Integer> resultPromise = new CompletableFuture<>();
         IntPredicate msgCntPredicate = x -> x == messageCount;
 
@@ -57,7 +60,7 @@ public int sendStringMessagesPlain(long timeoutMs, String topicName, int message
         properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "producer-sender-plain-");
         properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name);
 
-        try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition)
+        try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition, timestamp)
             .withProperties(properties)
             .withHeaders(headers)
             .withNullKeyRecord(withNullKeyRecord)
@@ -74,46 +77,51 @@ public int sendStringMessagesPlain(long timeoutMs, String topicName, int message
 
     /**
      * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting
-     * @param topicName topic name where messages are send
+     *
+     * @param topicName    topic name where messages are send
      * @param messageCount message count
      * @return sent message count
      */
     public int sendStringMessagesPlain(String topicName, int messageCount) {
         return sendStringMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount,
-            List.of(), "\"Hello\" : \"World\"", 0, false);
+            List.of(), "\"Hello\" : \"World\"", 0, null, false);
     }
 
     /**
      * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting
-     * @param topicName topic name where messages are send
-     * @param message content to be sent
+     *
+     * @param topicName    topic name where messages are send
+     * @param message      content to be sent
      * @param messageCount message count
      * @param partition    partition, which will be selected
      * @return sent message count
      */
     public int sendStringMessagesPlain(String topicName, String message, int messageCount, int partition) {
         return sendStringMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount,
-            List.of(), message, partition, true);
+            List.of(), message, partition, null, true);
     }
 
     public int sendStringMessagesPlain(String topicName, String message, int messageCount, int partition, boolean withNullKeyRecord) {
         return sendStringMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount,
-            List.of(), message, partition, withNullKeyRecord);
+            List.of(), message, partition, null, withNullKeyRecord);
     }
 
     /**
      * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting
-     * @param timeoutMs timeout for the sending messages
-     * @param topicName topic name where messages are send
-     * @param messageCount message count
-     * @param headers kafka headers
-     * @param message specific message to send
-     * @param partition partition count, how many shards/partitions will topic have
+     *
+     * @param timeoutMs         timeout for the sending messages
+     * @param topicName         topic name where messages are send
+     * @param messageCount      message count
+     * @param headers           kafka headers
+     * @param message           specific message to send
+     * @param partition         partition count, how many shards/partitions will topic have
+     * @param timestamp         timestamp of the message
      * @param withNullKeyRecord boolean, which allowing sending messages with NULL key
      * @return sent message count
      */
+    @SuppressWarnings("checkstyle:ParameterNumber")
     public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCount, List<KafkaHeader> headers,
-                                     String message, int partition, boolean withNullKeyRecord) {
+                                     String message, int partition, Long timestamp, boolean withNullKeyRecord) {
         CompletableFuture<Integer> resultPromise = new CompletableFuture<>();
         IntPredicate msgCntPredicate = x -> x == messageCount;
 
@@ -124,7 +132,7 @@ public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCo
         properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServer);
         properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "producer-sender-plain-" + new Random().nextInt(Integer.MAX_VALUE));
         properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name);
-        try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition)
+        try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition, timestamp)
            .withProperties(properties)
            .withHeaders(headers)
            .withNullKeyRecord(withNullKeyRecord)
@@ -140,36 +148,52 @@ public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCo
 
     /**
      * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting
-     * @param topicName topic name where messages are send
+     *
+     * @param topicName    topic name where messages are send
      * @param messageCount message count
-     * @param message specific message to send
-     * @param partition partition count, how many shards/partitions will topic have
+     * @param message      specific message to send
+     * @param timestamp    timestamp of the message
+     * @return sent message count
+     */
+    public int sendJsonMessagesPlain(String topicName, int messageCount, String message, Long timestamp) {
+        return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(),
+            message, 0, timestamp, false);
+    }
+
+    /**
+     * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting
+     *
+     * @param topicName         topic name where messages are send
+     * @param messageCount      message count
+     * @param message           specific message to send
+     * @param partition         partition count, how many shards/partitions will topic have
      * @param withNullKeyRecord boolean, which allowing sending messages with NULL key
      * @return sent message count
      */
     public int sendJsonMessagesPlain(String topicName, int messageCount, String message, int partition, boolean withNullKeyRecord) {
         return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(),
-            message, partition, withNullKeyRecord);
+            message, partition, null, withNullKeyRecord);
     }
 
     /**
      * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting
-     * @param topicName topic name where messages are send
-     * @param messageCount message count
-     * @param headers kafka headers
-     * @param message specific message to send
+     *
+     * @param topicName         topic name where messages are send
+     * @param messageCount      message count
+     * @param headers           kafka headers
+     * @param message           specific message to send
      * @param withNullKeyRecord boolean, which allowing sending messages with NULL key
      * @return sent message count
      */
     public int sendJsonMessagesPlain(String topicName, int messageCount, List<KafkaHeader> headers, String message, boolean withNullKeyRecord) {
-        return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, headers, message, 0,
-            withNullKeyRecord);
+        return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, headers, message, 0, null, withNullKeyRecord);
     }
 
     /**
      * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting
-     * @param topicName topic name where messages are send
+     *
+     * @param topicName    topic name where messages are send
      * @param messageCount message count
      * @param message      specific message to send
      * @param partition    partition count, how many shards/partitions will topic have
@@ -177,36 +201,39 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, List
diff --git a/src/test/java/io/strimzi/kafka/bridge/clients/Producer.java b/src/test/java/io/strimzi/kafka/bridge/clients/Producer.java
--- a/src/test/java/io/strimzi/kafka/bridge/clients/Producer.java
+++ b/src/test/java/io/strimzi/kafka/bridge/clients/Producer.java
@@ -31,6 +31,7 @@ public class Producer extends ClientHandlerBase<Integer> implements AutoCloseabl
     private final List<KafkaHeader> headers;
     private final String message;
     private final int partition;
+    private final Long timestamp;
     private final boolean withNullKeyRecord;
 
     public Producer(ProducerBuilder producerBuilder) {
@@ -43,6 +44,7 @@ public Producer(ProducerBuilder producerBuilder) {
         this.headers = producerBuilder.headers;
         this.message = producerBuilder.message;
         this.partition = producerBuilder.partition;
+        this.timestamp = producerBuilder.timestamp;
         this.withNullKeyRecord = producerBuilder.withNullKeyRecord;
         this.vertx = Vertx.vertx();
     }
@@ -62,9 +64,9 @@ protected void handleClient() {
                     resultPromise.complete(numSent.get());
                 }
             });
-            vertx.setPeriodic(1000, id -> sendNext(producer, topic, headers, message, partition, withNullKeyRecord));
+            vertx.setPeriodic(1000, id -> sendNext(producer, topic, headers, message, partition, timestamp, withNullKeyRecord));
         } else {
-            sendNext(producer, topic, headers, message, partition, withNullKeyRecord);
+            sendNext(producer, topic, headers, message, partition, timestamp, withNullKeyRecord);
         }
     }
@@ -77,15 +79,15 @@ public void close() {
     }
 
     private void sendNext(KafkaProducer<String, String> producer, String topic, List<KafkaHeader> headers,
-                          String message, int partition, boolean withNullKeyRecord) {
+                          String message, int partition, Long timestamp, boolean withNullKeyRecord) {
         if (msgCntPredicate.negate().test(numSent.get())) {
             KafkaProducerRecord<String, String> record;
 
             if (withNullKeyRecord) {
-                record = KafkaProducerRecord.create(topic, null, message, partition);
+                record = KafkaProducerRecord.create(topic, null, message, timestamp, partition);
             } else {
-                record = KafkaProducerRecord.create(topic, "key-" + numSent.get(), message + "-" + numSent.get(), partition);
+                record = KafkaProducerRecord.create(topic, "key-" + numSent.get(), message + "-" + numSent.get(), timestamp, partition);
             }
 
             record.addHeaders(headers);
@@ -104,12 +106,12 @@ record = KafkaProducerRecord.create(topic, "key-" + numSent.get(), message + "-"
                 }
 
                 if (msgCntPredicate.negate().test(-1)) {
-                    sendNext(producer, topic, headers, message, partition, withNullKeyRecord);
+                    sendNext(producer, topic, headers, message, partition, timestamp, withNullKeyRecord);
                 }
             } else {
                 LOGGER.error("Producer cannot connect to topic {}", topic, done.cause());
-                sendNext(producer, topic, headers, message, partition, withNullKeyRecord);
+                sendNext(producer, topic, headers, message, partition, timestamp, withNullKeyRecord);
             }
         });
@@ -137,18 +139,19 @@ public static class ProducerBuilder {
         private final String topic;
         private final String message;
         private final int partition;
-
+        private final Long timestamp;
         private Properties properties;
         private List<KafkaHeader> headers = Collections.emptyList();
         private boolean withNullKeyRecord = false;
 
         public ProducerBuilder(CompletableFuture<Integer> resultPromise, IntPredicate msgCntPredicate, String topic,
-                               String message, int partition) {
+                               String message, int partition, Long timestamp) {
             this.resultPromise = resultPromise;
             this.msgCntPredicate = msgCntPredicate;
             this.topic = topic;
             this.message = message;
             this.partition = partition;
+            this.timestamp = timestamp;
         }
 
         public ProducerBuilder withProperties(Properties properties) {
diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java
index 1bd287e5..f9d5776d 100644
--- a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java
+++ b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java
@@ -533,6 +533,49 @@ void receiveSimpleMessage(VertxTestContext context) throws InterruptedException,
         assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true));
     }
 
+    @Test
+    void receiveMessageWithTimestamp(VertxTestContext context) throws InterruptedException, ExecutionException, TimeoutException {
+        KafkaFuture<Void> future = adminClientFacade.createTopic(topic);
+
+        future.get();
+        String sentBody = "Simple message";
+        long timestamp = System.currentTimeMillis();
+        basicKafkaClient.sendJsonMessagesPlain(topic, 1, sentBody, timestamp);
+
+        // create consumer
+        // subscribe to a topic
+        consumerService()
+            .createConsumer(context, groupId, consumerJson)
+            .subscribeConsumer(context, groupId, name, topic);
+
+        CompletableFuture<Boolean> consume = new CompletableFuture<>();
+        // consume records
+        consumerService()
+            .consumeRecordsRequest(groupId, name, BridgeContentType.KAFKA_JSON_JSON)
+            .as(BodyCodec.jsonArray())
+            .send(ar -> {
+                context.verify(() -> {
+                    assertThat(ar.succeeded(), is(true));
+                    HttpResponse<JsonArray> response = ar.result();
+                    assertThat(response.statusCode(), is(HttpResponseStatus.OK.code()));
+                    JsonObject jsonResponse = response.body().getJsonObject(0);
+
+                    long recordTimestamp = jsonResponse.getLong("timestamp");
+
+                    assertThat(recordTimestamp, is(timestamp));
+                });
+                consume.complete(true);
+            });
+
+        consume.get(TEST_TIMEOUT, TimeUnit.SECONDS);
+
+        // consumer deletion
+        consumerService()
+            .deleteConsumer(context, groupId, name);
+        context.completeNow();
+        assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true));
+
+    }
     @Test
     void receiveTextMessage(VertxTestContext context) throws InterruptedException, ExecutionException, TimeoutException {
         KafkaFuture<Void> future = adminClientFacade.createTopic(topic);
diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java
index dae57e90..252f9519 100644
--- a/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java
+++ b/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java
@@ -107,7 +107,6 @@ void sendSimpleMessageToPartition(VertxTestContext context) throws InterruptedEx
         KafkaFuture<Void> future = adminClientFacade.createTopic(topic, 2, 1);
 
         String value = "message-value";
-        int partition = 1;
 
         JsonArray records = new JsonArray();
@@ -153,6 +152,57 @@ void sendSimpleMessageToPartition(VertxTestContext context) throws InterruptedEx
         assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true));
     }
 
+    @Test
+    void sendSimpleMessageWithTimestamp(VertxTestContext context) throws InterruptedException, ExecutionException {
+        KafkaFuture<Void> future = adminClientFacade.createTopic(topic);
+
+        String value = "message-value";
+        long timestamp = System.currentTimeMillis();
+
+        JsonArray records = new JsonArray();
+        JsonObject json = new JsonObject();
+        json.put("value", value);
+        json.put("timestamp", timestamp);
+        records.add(json);
+
+        JsonObject root = new JsonObject();
+        root.put("records", records);
+
+        future.get();
+
+        producerService()
+            .sendRecordsRequest(topic, root, BridgeContentType.KAFKA_JSON_JSON)
+            .sendJsonObject(root, verifyOK(context));
+
+        Properties consumerProperties = Consumer.fillDefaultProperties();
+        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUri);
+
+        KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, consumerProperties,
+            new StringDeserializer(), new KafkaJsonDeserializer<>(String.class));
+        consumer.handler(record -> {
+            context.verify(() -> {
+                assertThat(record.value(), is(value));
+                assertThat(record.topic(), is(topic));
+                assertThat(record.partition(), is(0));
+                assertThat(record.offset(), is(0L));
+                assertThat(record.key(), nullValue());
+                assertThat(record.timestamp(), is(timestamp));
+            });
+            LOGGER.info("Message consumed topic={} partition={} offset={}, key={}, value={}, timestamp={}",
+                record.topic(), record.partition(), record.offset(), record.key(), record.value(), record.timestamp());
+            consumer.close();
+            context.completeNow();
+        });
+
+        consumer.subscribe(topic, done -> {
+            if (!done.succeeded()) {
+                context.failNow(done.cause());
+            }
+        });
+
+        assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true));
+    }
+
     @Test
     void sendSimpleMessageWithKey(VertxTestContext context) throws InterruptedException, ExecutionException {
         KafkaFuture<Void> future = adminClientFacade.createTopic(topic, 2, 1);
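
For context, a minimal client-side sketch of what this change enables, not part of the change set itself: it posts one record with an explicit `timestamp` (epoch milliseconds) through the bridge's `send` endpoint, using only the JDK's `java.net.http` client. The bridge address `http://localhost:8080` and topic `my-topic` are hypothetical placeholders; the content type is the bridge's standard JSON embedded-data format. Records fetched afterwards through a consumer's `poll` endpoint carry the same `timestamp` field.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TimestampSendExample {
    public static void main(String[] args) throws Exception {
        // Assumed bridge endpoint and topic; adjust to your deployment.
        URI uri = URI.create("http://localhost:8080/topics/my-topic");

        // One record carrying an explicit timestamp in epoch milliseconds.
        String payload = "{\"records\":[{\"key\":\"key1\",\"value\":\"value1\","
                + "\"timestamp\":1591897790000}]}";

        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/vnd.kafka.json.v2+json")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // On success the bridge returns the partition and offset assigned to the
        // record; consumers polling the topic now see its "timestamp" field too.
        System.out.println(response.statusCode() + " " + response.body());
    }
}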