From 14a9d3314ee5193b9f7f4774cb7e338883072811 Mon Sep 17 00:00:00 2001 From: Antonio Pedro Date: Wed, 3 Jul 2024 03:31:13 +0530 Subject: [PATCH 01/13] feat: added support for message timestamp Signed-off-by: Antonio Pedro --- .../converter/HttpBinaryMessageConverter.java | 7 ++- .../converter/HttpJsonMessageConverter.java | 11 ++-- .../converter/HttpTextMessageConverter.java | 7 ++- src/main/resources/openapi.json | 22 ++++++-- src/main/resources/openapiv2.json | 16 +++++- .../strimzi/kafka/bridge/http/ProducerIT.java | 52 ++++++++++++++++++- 6 files changed, 103 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java index 5a6fc0d9..5937d48f 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java @@ -30,6 +30,7 @@ public class HttpBinaryMessageConverter implements MessageConverter toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) { Integer partitionFromBody = null; + Long timestamp = null; byte[] key = null; byte[] value = null; Headers headers = new RecordHeaders(); @@ -43,6 +44,9 @@ public ProducerRecord toKafkaRecord(String kafkaTopic, Integer p if (json.has("value")) { value = DatatypeConverter.parseBase64Binary(json.get("value").asText()); } + if (json.has("timestamp")) { + timestamp = json.get("timestamp").asLong(); + } if (json.has("headers")) { ArrayNode jsonArray = (ArrayNode) json.get("headers"); for (JsonNode jsonObject: jsonArray) { @@ -59,7 +63,7 @@ public ProducerRecord toKafkaRecord(String kafkaTopic, Integer p partitionFromBody = partition; } } - return new ProducerRecord<>(kafkaTopic, partitionFromBody, key, value, headers); + return new ProducerRecord<>(kafkaTopic, partitionFromBody, timestamp, key, value, headers); } @Override @@ -98,6 +102,7 @@ public byte[] toMessages(ConsumerRecords records) { DatatypeConverter.printBase64Binary(record.value()) : null); jsonObject.put("partition", record.partition()); jsonObject.put("offset", record.offset()); + jsonObject.put("timestamp", record.timestamp()); ArrayNode headers = JsonUtils.createArrayNode(); diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java index 7077d35d..5161ebf2 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java @@ -30,6 +30,7 @@ public class HttpJsonMessageConverter implements MessageConverter toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) { Integer partitionFromBody = null; + Long timestamp = null; byte[] key = null; byte[] value = null; Headers headers = new RecordHeaders(); @@ -43,9 +44,12 @@ public ProducerRecord toKafkaRecord(String kafkaTopic, Integer p if (json.has("value")) { value = JsonUtils.jsonToBytes(json.get("value")); } + if (json.has("timestamp")) { + timestamp = json.get("timestamp").asLong(); + } if (json.has("headers")) { ArrayNode jsonArray = (ArrayNode) json.get("headers"); - for (JsonNode jsonObject: jsonArray) { + for (JsonNode jsonObject : jsonArray) { headers.add(new RecordHeader(jsonObject.get("key").asText(), DatatypeConverter.parseBase64Binary(jsonObject.get("value").asText()))); } } @@ -59,7 +63,7 @@ public ProducerRecord toKafkaRecord(String kafkaTopic, Integer p partitionFromBody = partition; } } - return new ProducerRecord<>(kafkaTopic, partitionFromBody, key, value, headers); + return new ProducerRecord<>(kafkaTopic, partitionFromBody, timestamp, key, value, headers); } @Override @@ -97,10 +101,11 @@ public byte[] toMessages(ConsumerRecords records) { JsonUtils.bytesToJson(record.value()) : null); jsonObject.put("partition", record.partition()); jsonObject.put("offset", record.offset()); + jsonObject.put("timestamp", record.timestamp()); ArrayNode headers = JsonUtils.createArrayNode(); - for (Header kafkaHeader: record.headers()) { + for (Header kafkaHeader : record.headers()) { ObjectNode header = JsonUtils.createObjectNode(); header.put("key", kafkaHeader.key()); diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java index 366eab3c..0efc790f 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java @@ -30,6 +30,7 @@ public class HttpTextMessageConverter implements MessageConverter toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) { Integer partitionFromBody = null; + Long timestamp = null; byte[] key = null; byte[] value = null; Headers headers = new RecordHeaders(); @@ -51,6 +52,9 @@ public ProducerRecord toKafkaRecord(String kafkaTopic, Integer p } value = valueNode.asText().getBytes(); } + if (json.has("timestamp")) { + timestamp = json.get("timestamp").asLong(); + } if (json.has("headers")) { ArrayNode jsonArray = (ArrayNode) json.get("headers"); for (JsonNode jsonObject : jsonArray) { @@ -67,7 +71,7 @@ public ProducerRecord toKafkaRecord(String kafkaTopic, Integer p partitionFromBody = partition; } } - return new ProducerRecord<>(kafkaTopic, partitionFromBody, key, value, headers); + return new ProducerRecord<>(kafkaTopic, partitionFromBody, timestamp, key, value, headers); } @Override @@ -101,6 +105,7 @@ public byte[] toMessages(ConsumerRecords records) { jsonObject.set("value", record.value() != null ? new TextNode(new String(record.value())) : null); jsonObject.put("partition", record.partition()); jsonObject.put("offset", record.offset()); + jsonObject.put("timestamp", record.timestamp()); ArrayNode headers = JsonUtils.createArrayNode(); diff --git a/src/main/resources/openapi.json b/src/main/resources/openapi.json index 1cb570dc..a4347b38 100644 --- a/src/main/resources/openapi.json +++ b/src/main/resources/openapi.json @@ -778,7 +778,8 @@ "foo": "bar" }, "partition": 0, - "offset": 2 + "offset": 2, + "timestamp": 1591897790000 }, { "topic": "topic", @@ -788,7 +789,8 @@ "bar2" ], "partition": 1, - "offset": 3 + "offset": 3, + "timestamp": 1591897790002 } ] } @@ -1480,6 +1482,7 @@ "partition": 0, "topic": "topic", "value": "value1", + "timestamp": 1591897790000, "headers": [ { "key": "key1", @@ -1530,6 +1533,10 @@ }, "headers": { "$ref": "#/components/schemas/KafkaHeaderList" + }, + "timestamp": { + "format": "int64", + "type": "integer" } }, "title": "ConsumerRecord", @@ -1547,14 +1554,16 @@ "key": "key1", "value": "value1", "partition": 0, - "offset": 2 + "offset": 2, + "timestamp": 1591897790000 }, { "topic": "topic", "key": "key2", "value": "value2", "partition": 1, - "offset": 3 + "offset": 3, + "timestamp": 1591897790000 } ] }, @@ -1810,6 +1819,10 @@ "format": "int32", "type": "integer" }, + "timestamp": { + "format": "int64", + "type": "integer" + }, "value": { "oneOf": [ { @@ -1846,6 +1859,7 @@ "example": { "key": "key1", "partition": 0, + "timestamp": 1591897790000, "value": "value1", "headers": [ { diff --git a/src/main/resources/openapiv2.json b/src/main/resources/openapiv2.json index 44596bd0..8622b0fa 100644 --- a/src/main/resources/openapiv2.json +++ b/src/main/resources/openapiv2.json @@ -1317,6 +1317,7 @@ "partition": 0, "topic": "topic", "value": "value1", + "timestamp": 1591897790000, "headers": [ { "key": "key1", @@ -1357,6 +1358,10 @@ }, "headers": { "$ref": "#/definitions/KafkaHeaderList" + }, + "timestamp": { + "format": "int64", + "type": "integer" } }, "title": "ConsumerRecord", @@ -1374,14 +1379,16 @@ "key": "key1", "value": "value1", "partition": 0, - "offset": 2 + "offset": 2, + "timestamp": 1591897790000 }, { "topic": "topic", "key": "key2", "value": "value2", "partition": 1, - "offset": 3 + "offset": 3, + "timestamp": 1591897790002 } ] }, @@ -1626,6 +1633,10 @@ "format": "int32", "type": "integer" }, + "timestamp": { + "format": "int64", + "type": "integer" + }, "value": { "type": [ "array", @@ -1649,6 +1660,7 @@ "example": { "key": "key1", "partition": 0, + "timestamp": 1591897790000, "value": "value1", "headers": [ { diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java index dae57e90..252f9519 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java @@ -107,7 +107,6 @@ void sendSimpleMessageToPartition(VertxTestContext context) throws InterruptedEx KafkaFuture future = adminClientFacade.createTopic(topic, 2, 1); String value = "message-value"; - int partition = 1; JsonArray records = new JsonArray(); @@ -153,6 +152,57 @@ void sendSimpleMessageToPartition(VertxTestContext context) throws InterruptedEx assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true)); } + @Test + void sendSimpleMessageWithTimestamp(VertxTestContext context) throws InterruptedException, ExecutionException { + KafkaFuture future = adminClientFacade.createTopic(topic); + + String value = "message-value"; + long timestamp = System.currentTimeMillis(); + + JsonArray records = new JsonArray(); + JsonObject json = new JsonObject(); + json.put("value", value); + json.put("timestamp", timestamp); + records.add(json); + + JsonObject root = new JsonObject(); + root.put("records", records); + + future.get(); + + producerService() + .sendRecordsRequest(topic, root, BridgeContentType.KAFKA_JSON_JSON) + .sendJsonObject(root, verifyOK(context)); + + Properties consumerProperties = Consumer.fillDefaultProperties(); + consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUri); + + KafkaConsumer consumer = KafkaConsumer.create(vertx, consumerProperties, + new StringDeserializer(), new KafkaJsonDeserializer<>(String.class)); + consumer.handler(record -> { + context.verify(() -> { + assertThat(record.value(), is(value)); + assertThat(record.topic(), is(topic)); + assertThat(record.partition(), is(0)); + assertThat(record.offset(), is(0L)); + assertThat(record.key(), nullValue()); + assertThat(record.timestamp(), is(timestamp)); + }); + LOGGER.info("Message consumed topic={} partition={} offset={}, key={}, value={}, timestamp={}", + record.topic(), record.partition(), record.offset(), record.key(), record.value(), record.timestamp()); + consumer.close(); + context.completeNow(); + }); + + consumer.subscribe(topic, done -> { + if (!done.succeeded()) { + context.failNow(done.cause()); + } + }); + + assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true)); + } + @Test void sendSimpleMessageWithKey(VertxTestContext context) throws InterruptedException, ExecutionException { KafkaFuture future = adminClientFacade.createTopic(topic, 2, 1); From 170db87d1f4ca4111f7287f317ffe30012f59905 Mon Sep 17 00:00:00 2001 From: Antonio Pedro Date: Thu, 4 Jul 2024 23:29:23 +0530 Subject: [PATCH 02/13] added ConsumerIT tests Signed-off-by: Antonio Pedro --- .../converter/HttpBinaryMessageConverter.java | 1 + .../converter/HttpJsonMessageConverter.java | 1 + .../bridge/clients/BasicKafkaClient.java | 132 +++++++++++------- .../kafka/bridge/clients/Producer.java | 21 +-- .../strimzi/kafka/bridge/http/ConsumerIT.java | 43 ++++++ 5 files changed, 136 insertions(+), 62 deletions(-) diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java index 5937d48f..6a779ca1 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java @@ -24,6 +24,7 @@ /** * Implementation of a message converter to deal with the "binary" embedded data format */ +@SuppressWarnings("checkstyle:NPathComplexity") public class HttpBinaryMessageConverter implements MessageConverter { @Override diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java index 5161ebf2..589f2c2f 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java @@ -24,6 +24,7 @@ /** * Implementation of a message converter to deal with the "json" embedded data format */ +@SuppressWarnings("checkstyle:NPathComplexity") public class HttpJsonMessageConverter implements MessageConverter { @Override diff --git a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java index 5c51ab8a..ac55b1b4 100644 --- a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java +++ b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java @@ -35,17 +35,19 @@ public BasicKafkaClient(String bootstrapServer) { /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param timeoutMs timeout for the sending messages - * @param topicName topic name where messages are send - * @param messageCount message count - * @param headers kafka headers - * @param message specific message to send - * @param partition partition count, how many shards/partitions will topic have + * + * @param timeoutMs timeout for the sending messages + * @param topicName topic name where messages are send + * @param messageCount message count + * @param headers kafka headers + * @param message specific message to send + * @param partition partition count, how many shards/partitions will topic have + * @param timestamp timestamp of the message * @param withNullKeyRecord boolean, which allowing sending messages with NULL key * @return sent message count */ public int sendStringMessagesPlain(long timeoutMs, String topicName, int messageCount, List headers, - String message, int partition, boolean withNullKeyRecord) { + String message, int partition, Long timestamp, boolean withNullKeyRecord) { CompletableFuture resultPromise = new CompletableFuture<>(); IntPredicate msgCntPredicate = x -> x == messageCount; @@ -57,11 +59,11 @@ public int sendStringMessagesPlain(long timeoutMs, String topicName, int message properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "producer-sender-plain-"); properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name); - try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition) - .withProperties(properties) - .withHeaders(headers) - .withNullKeyRecord(withNullKeyRecord) - .build()) { + try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition, timestamp) + .withProperties(properties) + .withHeaders(headers) + .withNullKeyRecord(withNullKeyRecord) + .build()) { plainProducer.getVertx().deployVerticle(plainProducer); @@ -74,46 +76,50 @@ public int sendStringMessagesPlain(long timeoutMs, String topicName, int message /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param topicName topic name where messages are send + * + * @param topicName topic name where messages are send * @param messageCount message count * @return sent message count */ public int sendStringMessagesPlain(String topicName, int messageCount) { return sendStringMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, - List.of(), "\"Hello\" : \"World\"", 0, false); + List.of(), "\"Hello\" : \"World\"", 0, null, false); } /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param topicName topic name where messages are send - * @param message content to be sent + * + * @param topicName topic name where messages are send + * @param message content to be sent * @param messageCount message count - * @param partition partition, which will be selected + * @param partition partition, which will be selected * @return sent message count */ public int sendStringMessagesPlain(String topicName, String message, int messageCount, int partition) { return sendStringMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, - List.of(), message, partition, true); + List.of(), message, partition, null, true); } public int sendStringMessagesPlain(String topicName, String message, int messageCount, int partition, boolean withNullKeyRecord) { return sendStringMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, - List.of(), message, partition, withNullKeyRecord); + List.of(), message, partition, null, withNullKeyRecord); } /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param timeoutMs timeout for the sending messages - * @param topicName topic name where messages are send - * @param messageCount message count - * @param headers kafka headers - * @param message specific message to send - * @param partition partition count, how many shards/partitions will topic have + * + * @param timeoutMs timeout for the sending messages + * @param topicName topic name where messages are send + * @param messageCount message count + * @param headers kafka headers + * @param message specific message to send + * @param partition partition count, how many shards/partitions will topic have + * @param timestamp timestamp of the message * @param withNullKeyRecord boolean, which allowing sending messages with NULL key * @return sent message count */ public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCount, List headers, - String message, int partition, boolean withNullKeyRecord) { + String message, int partition, Long timestamp, boolean withNullKeyRecord) { CompletableFuture resultPromise = new CompletableFuture<>(); IntPredicate msgCntPredicate = x -> x == messageCount; @@ -124,11 +130,11 @@ public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCo properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServer); properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "producer-sender-plain-" + new Random().nextInt(Integer.MAX_VALUE)); properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name); - try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition) - .withProperties(properties) - .withHeaders(headers) - .withNullKeyRecord(withNullKeyRecord) - .build()) { + try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition, timestamp) + .withProperties(properties) + .withHeaders(headers) + .withNullKeyRecord(withNullKeyRecord) + .build()) { plainProducer.getVertx().deployVerticle(plainProducer); return plainProducer.getResultPromise().get(timeoutMs, TimeUnit.MILLISECONDS); @@ -140,73 +146,92 @@ public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCo /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param topicName topic name where messages are send + * + * @param topicName topic name where messages are send * @param messageCount message count - * @param message specific message to send - * @param partition partition count, how many shards/partitions will topic have + * @param message specific message to send + * @param timestamp timestamp of the message + * @return sent message count + */ + public int sendJsonMessagesPlain(String topicName, int messageCount, String message, Long timestamp) { + return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(), + message, 0, timestamp, false); + } + + /** + * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting + * + * @param topicName topic name where messages are send + * @param messageCount message count + * @param message specific message to send + * @param partition partition count, how many shards/partitions will topic have * @param withNullKeyRecord boolean, which allowing sending messages with NULL key * @return sent message count */ public int sendJsonMessagesPlain(String topicName, int messageCount, String message, int partition, boolean withNullKeyRecord) { return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(), - message, partition, withNullKeyRecord); + message, partition,null, withNullKeyRecord); } /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param topicName topic name where messages are send - * @param messageCount message count - * @param headers kafka headers - * @param message specific message to send + * + * @param topicName topic name where messages are send + * @param messageCount message count + * @param headers kafka headers + * @param message specific message to send * @param withNullKeyRecord boolean, which allowing sending messages with NULL key * @return sent message count */ public int sendJsonMessagesPlain(String topicName, int messageCount, List headers, String message, boolean withNullKeyRecord) { - return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, headers, message, 0, - withNullKeyRecord); + return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, headers, message, 0,null, withNullKeyRecord); } /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param topicName topic name where messages are send + * + * @param topicName topic name where messages are send * @param messageCount message count - * @param message specific message to send - * @param partition partition count, how many shards/partitions will topic have + * @param message specific message to send + * @param partition partition count, how many shards/partitions will topic have * @return sent message count */ public int sendJsonMessagesPlain(String topicName, int messageCount, String message, int partition) { return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(), - message, partition, false); + message, partition,null, false); } /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param topicName topic name where messages are send + * + * @param topicName topic name where messages are send * @param messageCount message count - * @param message specific message to send + * @param message specific message to send * @return sent message count */ public int sendJsonMessagesPlain(String topicName, int messageCount, String message) { return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(), - message, 0, false); + message, 0, null,false); } /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param topicName topic name where messages are send + * + * @param topicName topic name where messages are send * @param messageCount message count * @return sent message count */ public int sendJsonMessagesPlain(String topicName, int messageCount) { return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(), - "{\"Hello\" : \"World\"}", 0, false); + "{\"Hello\" : \"World\"}", 0, null, false); } /** * Receive messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param timeoutMs timeout for the receiving messages - * @param topicName topic name from messages are received + * + * @param timeoutMs timeout for the receiving messages + * @param topicName topic name from messages are received * @param messageCount message count * @return received message count */ @@ -240,7 +265,8 @@ public int receiveStringMessagesPlain(long timeoutMs, String topicName, int mess /** * Receive messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param topicName topic name from messages are received + * + * @param topicName topic name from messages are received * @param messageCount message count * @return received message count */ diff --git a/src/test/java/io/strimzi/kafka/bridge/clients/Producer.java b/src/test/java/io/strimzi/kafka/bridge/clients/Producer.java index 406d615b..2de92921 100644 --- a/src/test/java/io/strimzi/kafka/bridge/clients/Producer.java +++ b/src/test/java/io/strimzi/kafka/bridge/clients/Producer.java @@ -32,6 +32,7 @@ public class Producer extends ClientHandlerBase implements AutoCloseabl private final List headers; private final String message; private final int partition; + private final Long timestamp; private final boolean withNullKeyRecord; public Producer(ProducerBuilder producerBuilder) { @@ -43,6 +44,7 @@ public Producer(ProducerBuilder producerBuilder) { this.headers = producerBuilder.headers; this.message = producerBuilder.message; this.partition = producerBuilder.partition; + this.timestamp = producerBuilder.timestamp; this.withNullKeyRecord = producerBuilder.withNullKeyRecord; this.vertx = Vertx.vertx(); } @@ -62,9 +64,9 @@ protected void handleClient() { resultPromise.complete(numSent.get()); } }); - vertx.setPeriodic(1000, id -> sendNext(producer, topic, headers, message, partition, withNullKeyRecord)); + vertx.setPeriodic(1000, id -> sendNext(producer, topic, headers, message, partition, timestamp, withNullKeyRecord)); } else { - sendNext(producer, topic, headers, message, partition, withNullKeyRecord); + sendNext(producer, topic, headers, message, partition, timestamp, withNullKeyRecord); } } @@ -77,15 +79,15 @@ public void close() { } private void sendNext(KafkaProducer producer, String topic, List headers, - String message, int partition, boolean withNullKeyRecord) { + String message, int partition, Long timestamp, boolean withNullKeyRecord) { if (msgCntPredicate.negate().test(numSent.get())) { KafkaProducerRecord record; if (withNullKeyRecord) { - record = KafkaProducerRecord.create(topic, null, message, partition); + record = KafkaProducerRecord.create(topic, null, message, timestamp, partition); } else { - record = KafkaProducerRecord.create(topic, "key-" + numSent.get(), message + "-" + numSent.get(), partition); + record = KafkaProducerRecord.create(topic, "key-" + numSent.get(), message + "-" + numSent.get(), timestamp, partition); } record.addHeaders(headers); @@ -104,12 +106,12 @@ record = KafkaProducerRecord.create(topic, "key-" + numSent.get(), message + "-" } if (msgCntPredicate.negate().test(-1)) { - sendNext(producer, topic, headers, message, partition, withNullKeyRecord); + sendNext(producer, topic, headers, message, partition, timestamp, withNullKeyRecord); } } else { LOGGER.error("Producer cannot connect to topic {}", topic, done.cause()); - sendNext(producer, topic, headers, message, partition, withNullKeyRecord); + sendNext(producer, topic, headers, message, partition, timestamp, withNullKeyRecord); } }); @@ -137,18 +139,19 @@ public static class ProducerBuilder { private final String topic; private final String message; private final int partition; - + private final Long timestamp; private Properties properties; private List headers = Collections.emptyList(); private boolean withNullKeyRecord = false; public ProducerBuilder(CompletableFuture resultPromise, IntPredicate msgCntPredicate, String topic, - String message, int partition) { + String message, int partition, Long timestamp) { this.resultPromise = resultPromise; this.msgCntPredicate = msgCntPredicate; this.topic = topic; this.message = message; this.partition = partition; + this.timestamp = timestamp; } public ProducerBuilder withProperties(Properties properties) { diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java index 1bd287e5..f9d5776d 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java @@ -533,6 +533,49 @@ void receiveSimpleMessage(VertxTestContext context) throws InterruptedException, assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true)); } + @Test + void receiveMessageWithTimestamp(VertxTestContext context) throws InterruptedException, ExecutionException, TimeoutException { + KafkaFuture future = adminClientFacade.createTopic(topic); + + future.get(); + String sentBody = "Simple message"; + long timestamp = System.currentTimeMillis(); + basicKafkaClient.sendJsonMessagesPlain(topic, 1, sentBody, timestamp); + + // create consumer + // subscribe to a topic + consumerService() + .createConsumer(context, groupId, consumerJson) + .subscribeConsumer(context, groupId, name, topic); + + CompletableFuture consume = new CompletableFuture<>(); + // consume records + consumerService() + .consumeRecordsRequest(groupId, name, BridgeContentType.KAFKA_JSON_JSON) + .as(BodyCodec.jsonArray()) + .send(ar -> { + context.verify(() -> { + assertThat(ar.succeeded(), is(true)); + HttpResponse response = ar.result(); + assertThat(response.statusCode(), is(HttpResponseStatus.OK.code())); + JsonObject jsonResponse = response.body().getJsonObject(0); + + long recordTimestamp = jsonResponse.getLong("timestamp"); + + assertThat(recordTimestamp, is(timestamp)); + }); + consume.complete(true); + }); + + consume.get(TEST_TIMEOUT, TimeUnit.SECONDS); + + // consumer deletion + consumerService() + .deleteConsumer(context, groupId, name); + context.completeNow(); + assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true)); + + } @Test void receiveTextMessage(VertxTestContext context) throws InterruptedException, ExecutionException, TimeoutException { KafkaFuture future = adminClientFacade.createTopic(topic); From 3b7dbb56ef96731331a7575140f9b7076ac76d1d Mon Sep 17 00:00:00 2001 From: Antonio Pedro Date: Thu, 4 Jul 2024 23:48:29 +0530 Subject: [PATCH 03/13] SuppressWarnings checkstyle's ParameterNumber rule Signed-off-by: Antonio Pedro --- .../strimzi/kafka/bridge/clients/BasicKafkaClient.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java index ac55b1b4..9c510adb 100644 --- a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java +++ b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java @@ -46,6 +46,7 @@ public BasicKafkaClient(String bootstrapServer) { * @param withNullKeyRecord boolean, which allowing sending messages with NULL key * @return sent message count */ + @SuppressWarnings("checkstyle:ParameterNumber") public int sendStringMessagesPlain(long timeoutMs, String topicName, int messageCount, List headers, String message, int partition, Long timestamp, boolean withNullKeyRecord) { CompletableFuture resultPromise = new CompletableFuture<>(); @@ -118,6 +119,7 @@ public int sendStringMessagesPlain(String topicName, String message, int message * @param withNullKeyRecord boolean, which allowing sending messages with NULL key * @return sent message count */ + @SuppressWarnings("checkstyle:ParameterNumber") public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCount, List headers, String message, int partition, Long timestamp, boolean withNullKeyRecord) { CompletableFuture resultPromise = new CompletableFuture<>(); @@ -170,7 +172,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, String mess */ public int sendJsonMessagesPlain(String topicName, int messageCount, String message, int partition, boolean withNullKeyRecord) { return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(), - message, partition,null, withNullKeyRecord); + message, partition, null, withNullKeyRecord); } /** @@ -185,7 +187,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, String mess */ public int sendJsonMessagesPlain(String topicName, int messageCount, List headers, String message, boolean withNullKeyRecord) { - return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, headers, message, 0,null, withNullKeyRecord); + return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, headers, message, 0, null, withNullKeyRecord); } /** @@ -199,7 +201,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, List Date: Mon, 22 Jul 2024 10:17:23 +0200 Subject: [PATCH 04/13] Update generated documentation Signed-off-by: Paolo Patierno Signed-off-by: Antonio Pedro --- documentation/book/api/definitions.adoc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/documentation/book/api/definitions.adoc b/documentation/book/api/definitions.adoc index 7b6f521e..a4183f53 100644 --- a/documentation/book/api/definitions.adoc +++ b/documentation/book/api/definitions.adoc @@ -55,6 +55,8 @@ __optional__|<<_kafkaheaderlist,KafkaHeaderList>> __optional__|integer (int64) |**partition** + __optional__|integer (int32) +|**timestamp** + +__optional__|integer (int64) |**topic** + __optional__|string |=== @@ -222,6 +224,8 @@ __optional__|< <<_partition,Partition>> > array __optional__|<<_kafkaheaderlist,KafkaHeaderList>> |**partition** + __optional__|integer (int32) +|**timestamp** + +__optional__|integer (int64) |=== From 47f8fcc80ea430360be0fc5f8081a2108dcdf558 Mon Sep 17 00:00:00 2001 From: Antonio Pedro Date: Wed, 3 Jul 2024 03:31:13 +0530 Subject: [PATCH 05/13] feat: added support for message timestamp Signed-off-by: Antonio Pedro --- .../converter/HttpBinaryMessageConverter.java | 7 ++- .../converter/HttpJsonMessageConverter.java | 11 ++-- .../converter/HttpTextMessageConverter.java | 7 ++- src/main/resources/openapi.json | 22 ++++++-- src/main/resources/openapiv2.json | 16 +++++- .../strimzi/kafka/bridge/http/ProducerIT.java | 52 ++++++++++++++++++- 6 files changed, 103 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java index 5a6fc0d9..5937d48f 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java @@ -30,6 +30,7 @@ public class HttpBinaryMessageConverter implements MessageConverter toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) { Integer partitionFromBody = null; + Long timestamp = null; byte[] key = null; byte[] value = null; Headers headers = new RecordHeaders(); @@ -43,6 +44,9 @@ public ProducerRecord toKafkaRecord(String kafkaTopic, Integer p if (json.has("value")) { value = DatatypeConverter.parseBase64Binary(json.get("value").asText()); } + if (json.has("timestamp")) { + timestamp = json.get("timestamp").asLong(); + } if (json.has("headers")) { ArrayNode jsonArray = (ArrayNode) json.get("headers"); for (JsonNode jsonObject: jsonArray) { @@ -59,7 +63,7 @@ public ProducerRecord toKafkaRecord(String kafkaTopic, Integer p partitionFromBody = partition; } } - return new ProducerRecord<>(kafkaTopic, partitionFromBody, key, value, headers); + return new ProducerRecord<>(kafkaTopic, partitionFromBody, timestamp, key, value, headers); } @Override @@ -98,6 +102,7 @@ public byte[] toMessages(ConsumerRecords records) { DatatypeConverter.printBase64Binary(record.value()) : null); jsonObject.put("partition", record.partition()); jsonObject.put("offset", record.offset()); + jsonObject.put("timestamp", record.timestamp()); ArrayNode headers = JsonUtils.createArrayNode(); diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java index 7077d35d..5161ebf2 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java @@ -30,6 +30,7 @@ public class HttpJsonMessageConverter implements MessageConverter toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) { Integer partitionFromBody = null; + Long timestamp = null; byte[] key = null; byte[] value = null; Headers headers = new RecordHeaders(); @@ -43,9 +44,12 @@ public ProducerRecord toKafkaRecord(String kafkaTopic, Integer p if (json.has("value")) { value = JsonUtils.jsonToBytes(json.get("value")); } + if (json.has("timestamp")) { + timestamp = json.get("timestamp").asLong(); + } if (json.has("headers")) { ArrayNode jsonArray = (ArrayNode) json.get("headers"); - for (JsonNode jsonObject: jsonArray) { + for (JsonNode jsonObject : jsonArray) { headers.add(new RecordHeader(jsonObject.get("key").asText(), DatatypeConverter.parseBase64Binary(jsonObject.get("value").asText()))); } } @@ -59,7 +63,7 @@ public ProducerRecord toKafkaRecord(String kafkaTopic, Integer p partitionFromBody = partition; } } - return new ProducerRecord<>(kafkaTopic, partitionFromBody, key, value, headers); + return new ProducerRecord<>(kafkaTopic, partitionFromBody, timestamp, key, value, headers); } @Override @@ -97,10 +101,11 @@ public byte[] toMessages(ConsumerRecords records) { JsonUtils.bytesToJson(record.value()) : null); jsonObject.put("partition", record.partition()); jsonObject.put("offset", record.offset()); + jsonObject.put("timestamp", record.timestamp()); ArrayNode headers = JsonUtils.createArrayNode(); - for (Header kafkaHeader: record.headers()) { + for (Header kafkaHeader : record.headers()) { ObjectNode header = JsonUtils.createObjectNode(); header.put("key", kafkaHeader.key()); diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java index 366eab3c..0efc790f 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java @@ -30,6 +30,7 @@ public class HttpTextMessageConverter implements MessageConverter toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) { Integer partitionFromBody = null; + Long timestamp = null; byte[] key = null; byte[] value = null; Headers headers = new RecordHeaders(); @@ -51,6 +52,9 @@ public ProducerRecord toKafkaRecord(String kafkaTopic, Integer p } value = valueNode.asText().getBytes(); } + if (json.has("timestamp")) { + timestamp = json.get("timestamp").asLong(); + } if (json.has("headers")) { ArrayNode jsonArray = (ArrayNode) json.get("headers"); for (JsonNode jsonObject : jsonArray) { @@ -67,7 +71,7 @@ public ProducerRecord toKafkaRecord(String kafkaTopic, Integer p partitionFromBody = partition; } } - return new ProducerRecord<>(kafkaTopic, partitionFromBody, key, value, headers); + return new ProducerRecord<>(kafkaTopic, partitionFromBody, timestamp, key, value, headers); } @Override @@ -101,6 +105,7 @@ public byte[] toMessages(ConsumerRecords records) { jsonObject.set("value", record.value() != null ? new TextNode(new String(record.value())) : null); jsonObject.put("partition", record.partition()); jsonObject.put("offset", record.offset()); + jsonObject.put("timestamp", record.timestamp()); ArrayNode headers = JsonUtils.createArrayNode(); diff --git a/src/main/resources/openapi.json b/src/main/resources/openapi.json index 80a86262..e617327a 100644 --- a/src/main/resources/openapi.json +++ b/src/main/resources/openapi.json @@ -778,7 +778,8 @@ "foo": "bar" }, "partition": 0, - "offset": 2 + "offset": 2, + "timestamp": 1591897790000 }, { "topic": "topic", @@ -788,7 +789,8 @@ "bar2" ], "partition": 1, - "offset": 3 + "offset": 3, + "timestamp": 1591897790002 } ] } @@ -1516,6 +1518,7 @@ "partition": 0, "topic": "topic", "value": "value1", + "timestamp": 1591897790000, "headers": [ { "key": "key1", @@ -1566,6 +1569,10 @@ }, "headers": { "$ref": "#/components/schemas/KafkaHeaderList" + }, + "timestamp": { + "format": "int64", + "type": "integer" } }, "title": "ConsumerRecord", @@ -1583,14 +1590,16 @@ "key": "key1", "value": "value1", "partition": 0, - "offset": 2 + "offset": 2, + "timestamp": 1591897790000 }, { "topic": "topic", "key": "key2", "value": "value2", "partition": 1, - "offset": 3 + "offset": 3, + "timestamp": 1591897790000 } ] }, @@ -1846,6 +1855,10 @@ "format": "int32", "type": "integer" }, + "timestamp": { + "format": "int64", + "type": "integer" + }, "value": { "oneOf": [ { @@ -1882,6 +1895,7 @@ "example": { "key": "key1", "partition": 0, + "timestamp": 1591897790000, "value": "value1", "headers": [ { diff --git a/src/main/resources/openapiv2.json b/src/main/resources/openapiv2.json index f27fb741..c5f6430a 100644 --- a/src/main/resources/openapiv2.json +++ b/src/main/resources/openapiv2.json @@ -1351,6 +1351,7 @@ "partition": 0, "topic": "topic", "value": "value1", + "timestamp": 1591897790000, "headers": [ { "key": "key1", @@ -1391,6 +1392,10 @@ }, "headers": { "$ref": "#/definitions/KafkaHeaderList" + }, + "timestamp": { + "format": "int64", + "type": "integer" } }, "title": "ConsumerRecord", @@ -1408,14 +1413,16 @@ "key": "key1", "value": "value1", "partition": 0, - "offset": 2 + "offset": 2, + "timestamp": 1591897790000 }, { "topic": "topic", "key": "key2", "value": "value2", "partition": 1, - "offset": 3 + "offset": 3, + "timestamp": 1591897790002 } ] }, @@ -1660,6 +1667,10 @@ "format": "int32", "type": "integer" }, + "timestamp": { + "format": "int64", + "type": "integer" + }, "value": { "type": [ "array", @@ -1683,6 +1694,7 @@ "example": { "key": "key1", "partition": 0, + "timestamp": 1591897790000, "value": "value1", "headers": [ { diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java index dae57e90..252f9519 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java @@ -107,7 +107,6 @@ void sendSimpleMessageToPartition(VertxTestContext context) throws InterruptedEx KafkaFuture future = adminClientFacade.createTopic(topic, 2, 1); String value = "message-value"; - int partition = 1; JsonArray records = new JsonArray(); @@ -153,6 +152,57 @@ void sendSimpleMessageToPartition(VertxTestContext context) throws InterruptedEx assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true)); } + @Test + void sendSimpleMessageWithTimestamp(VertxTestContext context) throws InterruptedException, ExecutionException { + KafkaFuture future = adminClientFacade.createTopic(topic); + + String value = "message-value"; + long timestamp = System.currentTimeMillis(); + + JsonArray records = new JsonArray(); + JsonObject json = new JsonObject(); + json.put("value", value); + json.put("timestamp", timestamp); + records.add(json); + + JsonObject root = new JsonObject(); + root.put("records", records); + + future.get(); + + producerService() + .sendRecordsRequest(topic, root, BridgeContentType.KAFKA_JSON_JSON) + .sendJsonObject(root, verifyOK(context)); + + Properties consumerProperties = Consumer.fillDefaultProperties(); + consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUri); + + KafkaConsumer consumer = KafkaConsumer.create(vertx, consumerProperties, + new StringDeserializer(), new KafkaJsonDeserializer<>(String.class)); + consumer.handler(record -> { + context.verify(() -> { + assertThat(record.value(), is(value)); + assertThat(record.topic(), is(topic)); + assertThat(record.partition(), is(0)); + assertThat(record.offset(), is(0L)); + assertThat(record.key(), nullValue()); + assertThat(record.timestamp(), is(timestamp)); + }); + LOGGER.info("Message consumed topic={} partition={} offset={}, key={}, value={}, timestamp={}", + record.topic(), record.partition(), record.offset(), record.key(), record.value(), record.timestamp()); + consumer.close(); + context.completeNow(); + }); + + consumer.subscribe(topic, done -> { + if (!done.succeeded()) { + context.failNow(done.cause()); + } + }); + + assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true)); + } + @Test void sendSimpleMessageWithKey(VertxTestContext context) throws InterruptedException, ExecutionException { KafkaFuture future = adminClientFacade.createTopic(topic, 2, 1); From b2ef1a395039980311590e18ef305887cea5faf2 Mon Sep 17 00:00:00 2001 From: Antonio Pedro Date: Thu, 4 Jul 2024 23:29:23 +0530 Subject: [PATCH 06/13] added ConsumerIT tests Signed-off-by: Antonio Pedro --- .../converter/HttpBinaryMessageConverter.java | 1 + .../converter/HttpJsonMessageConverter.java | 1 + .../bridge/clients/BasicKafkaClient.java | 132 +++++++++++------- .../kafka/bridge/clients/Producer.java | 21 +-- .../strimzi/kafka/bridge/http/ConsumerIT.java | 43 ++++++ 5 files changed, 136 insertions(+), 62 deletions(-) diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java index 5937d48f..6a779ca1 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java @@ -24,6 +24,7 @@ /** * Implementation of a message converter to deal with the "binary" embedded data format */ +@SuppressWarnings("checkstyle:NPathComplexity") public class HttpBinaryMessageConverter implements MessageConverter { @Override diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java index 5161ebf2..589f2c2f 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java @@ -24,6 +24,7 @@ /** * Implementation of a message converter to deal with the "json" embedded data format */ +@SuppressWarnings("checkstyle:NPathComplexity") public class HttpJsonMessageConverter implements MessageConverter { @Override diff --git a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java index 5c51ab8a..ac55b1b4 100644 --- a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java +++ b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java @@ -35,17 +35,19 @@ public BasicKafkaClient(String bootstrapServer) { /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param timeoutMs timeout for the sending messages - * @param topicName topic name where messages are send - * @param messageCount message count - * @param headers kafka headers - * @param message specific message to send - * @param partition partition count, how many shards/partitions will topic have + * + * @param timeoutMs timeout for the sending messages + * @param topicName topic name where messages are send + * @param messageCount message count + * @param headers kafka headers + * @param message specific message to send + * @param partition partition count, how many shards/partitions will topic have + * @param timestamp timestamp of the message * @param withNullKeyRecord boolean, which allowing sending messages with NULL key * @return sent message count */ public int sendStringMessagesPlain(long timeoutMs, String topicName, int messageCount, List headers, - String message, int partition, boolean withNullKeyRecord) { + String message, int partition, Long timestamp, boolean withNullKeyRecord) { CompletableFuture resultPromise = new CompletableFuture<>(); IntPredicate msgCntPredicate = x -> x == messageCount; @@ -57,11 +59,11 @@ public int sendStringMessagesPlain(long timeoutMs, String topicName, int message properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "producer-sender-plain-"); properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name); - try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition) - .withProperties(properties) - .withHeaders(headers) - .withNullKeyRecord(withNullKeyRecord) - .build()) { + try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition, timestamp) + .withProperties(properties) + .withHeaders(headers) + .withNullKeyRecord(withNullKeyRecord) + .build()) { plainProducer.getVertx().deployVerticle(plainProducer); @@ -74,46 +76,50 @@ public int sendStringMessagesPlain(long timeoutMs, String topicName, int message /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param topicName topic name where messages are send + * + * @param topicName topic name where messages are send * @param messageCount message count * @return sent message count */ public int sendStringMessagesPlain(String topicName, int messageCount) { return sendStringMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, - List.of(), "\"Hello\" : \"World\"", 0, false); + List.of(), "\"Hello\" : \"World\"", 0, null, false); } /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param topicName topic name where messages are send - * @param message content to be sent + * + * @param topicName topic name where messages are send + * @param message content to be sent * @param messageCount message count - * @param partition partition, which will be selected + * @param partition partition, which will be selected * @return sent message count */ public int sendStringMessagesPlain(String topicName, String message, int messageCount, int partition) { return sendStringMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, - List.of(), message, partition, true); + List.of(), message, partition, null, true); } public int sendStringMessagesPlain(String topicName, String message, int messageCount, int partition, boolean withNullKeyRecord) { return sendStringMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, - List.of(), message, partition, withNullKeyRecord); + List.of(), message, partition, null, withNullKeyRecord); } /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param timeoutMs timeout for the sending messages - * @param topicName topic name where messages are send - * @param messageCount message count - * @param headers kafka headers - * @param message specific message to send - * @param partition partition count, how many shards/partitions will topic have + * + * @param timeoutMs timeout for the sending messages + * @param topicName topic name where messages are send + * @param messageCount message count + * @param headers kafka headers + * @param message specific message to send + * @param partition partition count, how many shards/partitions will topic have + * @param timestamp timestamp of the message * @param withNullKeyRecord boolean, which allowing sending messages with NULL key * @return sent message count */ public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCount, List headers, - String message, int partition, boolean withNullKeyRecord) { + String message, int partition, Long timestamp, boolean withNullKeyRecord) { CompletableFuture resultPromise = new CompletableFuture<>(); IntPredicate msgCntPredicate = x -> x == messageCount; @@ -124,11 +130,11 @@ public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCo properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServer); properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "producer-sender-plain-" + new Random().nextInt(Integer.MAX_VALUE)); properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name); - try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition) - .withProperties(properties) - .withHeaders(headers) - .withNullKeyRecord(withNullKeyRecord) - .build()) { + try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition, timestamp) + .withProperties(properties) + .withHeaders(headers) + .withNullKeyRecord(withNullKeyRecord) + .build()) { plainProducer.getVertx().deployVerticle(plainProducer); return plainProducer.getResultPromise().get(timeoutMs, TimeUnit.MILLISECONDS); @@ -140,73 +146,92 @@ public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCo /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param topicName topic name where messages are send + * + * @param topicName topic name where messages are send * @param messageCount message count - * @param message specific message to send - * @param partition partition count, how many shards/partitions will topic have + * @param message specific message to send + * @param timestamp timestamp of the message + * @return sent message count + */ + public int sendJsonMessagesPlain(String topicName, int messageCount, String message, Long timestamp) { + return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(), + message, 0, timestamp, false); + } + + /** + * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting + * + * @param topicName topic name where messages are send + * @param messageCount message count + * @param message specific message to send + * @param partition partition count, how many shards/partitions will topic have * @param withNullKeyRecord boolean, which allowing sending messages with NULL key * @return sent message count */ public int sendJsonMessagesPlain(String topicName, int messageCount, String message, int partition, boolean withNullKeyRecord) { return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(), - message, partition, withNullKeyRecord); + message, partition,null, withNullKeyRecord); } /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param topicName topic name where messages are send - * @param messageCount message count - * @param headers kafka headers - * @param message specific message to send + * + * @param topicName topic name where messages are send + * @param messageCount message count + * @param headers kafka headers + * @param message specific message to send * @param withNullKeyRecord boolean, which allowing sending messages with NULL key * @return sent message count */ public int sendJsonMessagesPlain(String topicName, int messageCount, List headers, String message, boolean withNullKeyRecord) { - return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, headers, message, 0, - withNullKeyRecord); + return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, headers, message, 0,null, withNullKeyRecord); } /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param topicName topic name where messages are send + * + * @param topicName topic name where messages are send * @param messageCount message count - * @param message specific message to send - * @param partition partition count, how many shards/partitions will topic have + * @param message specific message to send + * @param partition partition count, how many shards/partitions will topic have * @return sent message count */ public int sendJsonMessagesPlain(String topicName, int messageCount, String message, int partition) { return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(), - message, partition, false); + message, partition,null, false); } /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param topicName topic name where messages are send + * + * @param topicName topic name where messages are send * @param messageCount message count - * @param message specific message to send + * @param message specific message to send * @return sent message count */ public int sendJsonMessagesPlain(String topicName, int messageCount, String message) { return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(), - message, 0, false); + message, 0, null,false); } /** * Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param topicName topic name where messages are send + * + * @param topicName topic name where messages are send * @param messageCount message count * @return sent message count */ public int sendJsonMessagesPlain(String topicName, int messageCount) { return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(), - "{\"Hello\" : \"World\"}", 0, false); + "{\"Hello\" : \"World\"}", 0, null, false); } /** * Receive messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param timeoutMs timeout for the receiving messages - * @param topicName topic name from messages are received + * + * @param timeoutMs timeout for the receiving messages + * @param topicName topic name from messages are received * @param messageCount message count * @return received message count */ @@ -240,7 +265,8 @@ public int receiveStringMessagesPlain(long timeoutMs, String topicName, int mess /** * Receive messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting - * @param topicName topic name from messages are received + * + * @param topicName topic name from messages are received * @param messageCount message count * @return received message count */ diff --git a/src/test/java/io/strimzi/kafka/bridge/clients/Producer.java b/src/test/java/io/strimzi/kafka/bridge/clients/Producer.java index 406d615b..2de92921 100644 --- a/src/test/java/io/strimzi/kafka/bridge/clients/Producer.java +++ b/src/test/java/io/strimzi/kafka/bridge/clients/Producer.java @@ -32,6 +32,7 @@ public class Producer extends ClientHandlerBase implements AutoCloseabl private final List headers; private final String message; private final int partition; + private final Long timestamp; private final boolean withNullKeyRecord; public Producer(ProducerBuilder producerBuilder) { @@ -43,6 +44,7 @@ public Producer(ProducerBuilder producerBuilder) { this.headers = producerBuilder.headers; this.message = producerBuilder.message; this.partition = producerBuilder.partition; + this.timestamp = producerBuilder.timestamp; this.withNullKeyRecord = producerBuilder.withNullKeyRecord; this.vertx = Vertx.vertx(); } @@ -62,9 +64,9 @@ protected void handleClient() { resultPromise.complete(numSent.get()); } }); - vertx.setPeriodic(1000, id -> sendNext(producer, topic, headers, message, partition, withNullKeyRecord)); + vertx.setPeriodic(1000, id -> sendNext(producer, topic, headers, message, partition, timestamp, withNullKeyRecord)); } else { - sendNext(producer, topic, headers, message, partition, withNullKeyRecord); + sendNext(producer, topic, headers, message, partition, timestamp, withNullKeyRecord); } } @@ -77,15 +79,15 @@ public void close() { } private void sendNext(KafkaProducer producer, String topic, List headers, - String message, int partition, boolean withNullKeyRecord) { + String message, int partition, Long timestamp, boolean withNullKeyRecord) { if (msgCntPredicate.negate().test(numSent.get())) { KafkaProducerRecord record; if (withNullKeyRecord) { - record = KafkaProducerRecord.create(topic, null, message, partition); + record = KafkaProducerRecord.create(topic, null, message, timestamp, partition); } else { - record = KafkaProducerRecord.create(topic, "key-" + numSent.get(), message + "-" + numSent.get(), partition); + record = KafkaProducerRecord.create(topic, "key-" + numSent.get(), message + "-" + numSent.get(), timestamp, partition); } record.addHeaders(headers); @@ -104,12 +106,12 @@ record = KafkaProducerRecord.create(topic, "key-" + numSent.get(), message + "-" } if (msgCntPredicate.negate().test(-1)) { - sendNext(producer, topic, headers, message, partition, withNullKeyRecord); + sendNext(producer, topic, headers, message, partition, timestamp, withNullKeyRecord); } } else { LOGGER.error("Producer cannot connect to topic {}", topic, done.cause()); - sendNext(producer, topic, headers, message, partition, withNullKeyRecord); + sendNext(producer, topic, headers, message, partition, timestamp, withNullKeyRecord); } }); @@ -137,18 +139,19 @@ public static class ProducerBuilder { private final String topic; private final String message; private final int partition; - + private final Long timestamp; private Properties properties; private List headers = Collections.emptyList(); private boolean withNullKeyRecord = false; public ProducerBuilder(CompletableFuture resultPromise, IntPredicate msgCntPredicate, String topic, - String message, int partition) { + String message, int partition, Long timestamp) { this.resultPromise = resultPromise; this.msgCntPredicate = msgCntPredicate; this.topic = topic; this.message = message; this.partition = partition; + this.timestamp = timestamp; } public ProducerBuilder withProperties(Properties properties) { diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java index 1bd287e5..f9d5776d 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java @@ -533,6 +533,49 @@ void receiveSimpleMessage(VertxTestContext context) throws InterruptedException, assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true)); } + @Test + void receiveMessageWithTimestamp(VertxTestContext context) throws InterruptedException, ExecutionException, TimeoutException { + KafkaFuture future = adminClientFacade.createTopic(topic); + + future.get(); + String sentBody = "Simple message"; + long timestamp = System.currentTimeMillis(); + basicKafkaClient.sendJsonMessagesPlain(topic, 1, sentBody, timestamp); + + // create consumer + // subscribe to a topic + consumerService() + .createConsumer(context, groupId, consumerJson) + .subscribeConsumer(context, groupId, name, topic); + + CompletableFuture consume = new CompletableFuture<>(); + // consume records + consumerService() + .consumeRecordsRequest(groupId, name, BridgeContentType.KAFKA_JSON_JSON) + .as(BodyCodec.jsonArray()) + .send(ar -> { + context.verify(() -> { + assertThat(ar.succeeded(), is(true)); + HttpResponse response = ar.result(); + assertThat(response.statusCode(), is(HttpResponseStatus.OK.code())); + JsonObject jsonResponse = response.body().getJsonObject(0); + + long recordTimestamp = jsonResponse.getLong("timestamp"); + + assertThat(recordTimestamp, is(timestamp)); + }); + consume.complete(true); + }); + + consume.get(TEST_TIMEOUT, TimeUnit.SECONDS); + + // consumer deletion + consumerService() + .deleteConsumer(context, groupId, name); + context.completeNow(); + assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true)); + + } @Test void receiveTextMessage(VertxTestContext context) throws InterruptedException, ExecutionException, TimeoutException { KafkaFuture future = adminClientFacade.createTopic(topic); From 44f99d5fb3ade79e004a685fdf093d62e4c94cf4 Mon Sep 17 00:00:00 2001 From: Antonio Pedro Date: Thu, 4 Jul 2024 23:48:29 +0530 Subject: [PATCH 07/13] SuppressWarnings checkstyle's ParameterNumber rule Signed-off-by: Antonio Pedro --- .../strimzi/kafka/bridge/clients/BasicKafkaClient.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java index ac55b1b4..9c510adb 100644 --- a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java +++ b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java @@ -46,6 +46,7 @@ public BasicKafkaClient(String bootstrapServer) { * @param withNullKeyRecord boolean, which allowing sending messages with NULL key * @return sent message count */ + @SuppressWarnings("checkstyle:ParameterNumber") public int sendStringMessagesPlain(long timeoutMs, String topicName, int messageCount, List headers, String message, int partition, Long timestamp, boolean withNullKeyRecord) { CompletableFuture resultPromise = new CompletableFuture<>(); @@ -118,6 +119,7 @@ public int sendStringMessagesPlain(String topicName, String message, int message * @param withNullKeyRecord boolean, which allowing sending messages with NULL key * @return sent message count */ + @SuppressWarnings("checkstyle:ParameterNumber") public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCount, List headers, String message, int partition, Long timestamp, boolean withNullKeyRecord) { CompletableFuture resultPromise = new CompletableFuture<>(); @@ -170,7 +172,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, String mess */ public int sendJsonMessagesPlain(String topicName, int messageCount, String message, int partition, boolean withNullKeyRecord) { return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(), - message, partition,null, withNullKeyRecord); + message, partition, null, withNullKeyRecord); } /** @@ -185,7 +187,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, String mess */ public int sendJsonMessagesPlain(String topicName, int messageCount, List headers, String message, boolean withNullKeyRecord) { - return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, headers, message, 0,null, withNullKeyRecord); + return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, headers, message, 0, null, withNullKeyRecord); } /** @@ -199,7 +201,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, List Date: Mon, 22 Jul 2024 10:17:23 +0200 Subject: [PATCH 08/13] Update generated documentation Signed-off-by: Paolo Patierno Signed-off-by: Antonio Pedro --- documentation/book/api/definitions.adoc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/documentation/book/api/definitions.adoc b/documentation/book/api/definitions.adoc index 7b6f521e..a4183f53 100644 --- a/documentation/book/api/definitions.adoc +++ b/documentation/book/api/definitions.adoc @@ -55,6 +55,8 @@ __optional__|<<_kafkaheaderlist,KafkaHeaderList>> __optional__|integer (int64) |**partition** + __optional__|integer (int32) +|**timestamp** + +__optional__|integer (int64) |**topic** + __optional__|string |=== @@ -222,6 +224,8 @@ __optional__|< <<_partition,Partition>> > array __optional__|<<_kafkaheaderlist,KafkaHeaderList>> |**partition** + __optional__|integer (int32) +|**timestamp** + +__optional__|integer (int64) |=== From 1c5ef6e0cd27af32c18949d83a388a2c4c4b83bc Mon Sep 17 00:00:00 2001 From: Antonio Pedro Date: Tue, 23 Jul 2024 03:41:37 +0530 Subject: [PATCH 09/13] added CHANGELOG and fix nits Signed-off-by: Antonio Pedro --- CHANGELOG.md | 3 ++ .../bridge/clients/BasicKafkaClient.java | 34 +++++++++---------- .../strimzi/kafka/bridge/http/ProducerIT.java | 1 + 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2bf66932..7d9116c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ ## 0.30.0 * Dependency updates (Kafka 3.7.1, Prometheus JMX Collector 1.0.1, Prometheus Client 1.3.1) +* added support for message timestamp. + * Implemented support for interpreting the timestamp parameter in `ProducerRecord` objects sent to Kafka topics via the bridge. + * Allow users to read the `ConsumerRecord` timestamp on the request's response. ## 0.29.0 diff --git a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java index 9c510adb..cfcb691d 100644 --- a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java +++ b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java @@ -61,10 +61,10 @@ public int sendStringMessagesPlain(long timeoutMs, String topicName, int message properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name); try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition, timestamp) - .withProperties(properties) - .withHeaders(headers) - .withNullKeyRecord(withNullKeyRecord) - .build()) { + .withProperties(properties) + .withHeaders(headers) + .withNullKeyRecord(withNullKeyRecord) + .build()) { plainProducer.getVertx().deployVerticle(plainProducer); @@ -84,7 +84,7 @@ public int sendStringMessagesPlain(long timeoutMs, String topicName, int message */ public int sendStringMessagesPlain(String topicName, int messageCount) { return sendStringMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, - List.of(), "\"Hello\" : \"World\"", 0, null, false); + List.of(), "\"Hello\" : \"World\"", 0, null, false); } /** @@ -93,7 +93,7 @@ public int sendStringMessagesPlain(String topicName, int messageCount) { * @param topicName topic name where messages are send * @param message content to be sent * @param messageCount message count - * @param partition partition, which will be selected + * @param partition partition, which will be selected * @return sent message count */ public int sendStringMessagesPlain(String topicName, String message, int messageCount, int partition) { @@ -133,10 +133,10 @@ public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCo properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "producer-sender-plain-" + new Random().nextInt(Integer.MAX_VALUE)); properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name); try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition, timestamp) - .withProperties(properties) - .withHeaders(headers) - .withNullKeyRecord(withNullKeyRecord) - .build()) { + .withProperties(properties) + .withHeaders(headers) + .withNullKeyRecord(withNullKeyRecord) + .build()) { plainProducer.getVertx().deployVerticle(plainProducer); return plainProducer.getResultPromise().get(timeoutMs, TimeUnit.MILLISECONDS); @@ -172,7 +172,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, String mess */ public int sendJsonMessagesPlain(String topicName, int messageCount, String message, int partition, boolean withNullKeyRecord) { return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(), - message, partition, null, withNullKeyRecord); + message, partition, null, withNullKeyRecord); } /** @@ -195,13 +195,13 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, List future = adminClientFacade.createTopic(topic, 2, 1); String value = "message-value"; + String key = "my-key"; JsonArray records = new JsonArray(); From 5df4dff8928e0eb42e0270ac8176f9488251c9b1 Mon Sep 17 00:00:00 2001 From: Antonio Pedro Date: Thu, 4 Jul 2024 23:29:23 +0530 Subject: [PATCH 10/13] added ConsumerIT tests Signed-off-by: Antonio Pedro --- .../strimzi/kafka/bridge/clients/BasicKafkaClient.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java index 9c510adb..ac55b1b4 100644 --- a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java +++ b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java @@ -46,7 +46,6 @@ public BasicKafkaClient(String bootstrapServer) { * @param withNullKeyRecord boolean, which allowing sending messages with NULL key * @return sent message count */ - @SuppressWarnings("checkstyle:ParameterNumber") public int sendStringMessagesPlain(long timeoutMs, String topicName, int messageCount, List headers, String message, int partition, Long timestamp, boolean withNullKeyRecord) { CompletableFuture resultPromise = new CompletableFuture<>(); @@ -119,7 +118,6 @@ public int sendStringMessagesPlain(String topicName, String message, int message * @param withNullKeyRecord boolean, which allowing sending messages with NULL key * @return sent message count */ - @SuppressWarnings("checkstyle:ParameterNumber") public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCount, List headers, String message, int partition, Long timestamp, boolean withNullKeyRecord) { CompletableFuture resultPromise = new CompletableFuture<>(); @@ -172,7 +170,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, String mess */ public int sendJsonMessagesPlain(String topicName, int messageCount, String message, int partition, boolean withNullKeyRecord) { return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(), - message, partition, null, withNullKeyRecord); + message, partition,null, withNullKeyRecord); } /** @@ -187,7 +185,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, String mess */ public int sendJsonMessagesPlain(String topicName, int messageCount, List headers, String message, boolean withNullKeyRecord) { - return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, headers, message, 0, null, withNullKeyRecord); + return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, headers, message, 0,null, withNullKeyRecord); } /** @@ -201,7 +199,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, List Date: Thu, 4 Jul 2024 23:48:29 +0530 Subject: [PATCH 11/13] SuppressWarnings checkstyle's ParameterNumber rule Signed-off-by: Antonio Pedro --- .../strimzi/kafka/bridge/clients/BasicKafkaClient.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java index ac55b1b4..9c510adb 100644 --- a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java +++ b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java @@ -46,6 +46,7 @@ public BasicKafkaClient(String bootstrapServer) { * @param withNullKeyRecord boolean, which allowing sending messages with NULL key * @return sent message count */ + @SuppressWarnings("checkstyle:ParameterNumber") public int sendStringMessagesPlain(long timeoutMs, String topicName, int messageCount, List headers, String message, int partition, Long timestamp, boolean withNullKeyRecord) { CompletableFuture resultPromise = new CompletableFuture<>(); @@ -118,6 +119,7 @@ public int sendStringMessagesPlain(String topicName, String message, int message * @param withNullKeyRecord boolean, which allowing sending messages with NULL key * @return sent message count */ + @SuppressWarnings("checkstyle:ParameterNumber") public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCount, List headers, String message, int partition, Long timestamp, boolean withNullKeyRecord) { CompletableFuture resultPromise = new CompletableFuture<>(); @@ -170,7 +172,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, String mess */ public int sendJsonMessagesPlain(String topicName, int messageCount, String message, int partition, boolean withNullKeyRecord) { return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(), - message, partition,null, withNullKeyRecord); + message, partition, null, withNullKeyRecord); } /** @@ -185,7 +187,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, String mess */ public int sendJsonMessagesPlain(String topicName, int messageCount, List headers, String message, boolean withNullKeyRecord) { - return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, headers, message, 0,null, withNullKeyRecord); + return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, headers, message, 0, null, withNullKeyRecord); } /** @@ -199,7 +201,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, List Date: Tue, 23 Jul 2024 03:41:37 +0530 Subject: [PATCH 12/13] added CHANGELOG and fix nits Signed-off-by: Antonio Pedro --- CHANGELOG.md | 3 ++ .../bridge/clients/BasicKafkaClient.java | 34 +++++++++---------- .../strimzi/kafka/bridge/http/ProducerIT.java | 1 + 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2bf66932..7d9116c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ ## 0.30.0 * Dependency updates (Kafka 3.7.1, Prometheus JMX Collector 1.0.1, Prometheus Client 1.3.1) +* added support for message timestamp. + * Implemented support for interpreting the timestamp parameter in `ProducerRecord` objects sent to Kafka topics via the bridge. + * Allow users to read the `ConsumerRecord` timestamp on the request's response. ## 0.29.0 diff --git a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java index 9c510adb..cfcb691d 100644 --- a/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java +++ b/src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java @@ -61,10 +61,10 @@ public int sendStringMessagesPlain(long timeoutMs, String topicName, int message properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name); try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition, timestamp) - .withProperties(properties) - .withHeaders(headers) - .withNullKeyRecord(withNullKeyRecord) - .build()) { + .withProperties(properties) + .withHeaders(headers) + .withNullKeyRecord(withNullKeyRecord) + .build()) { plainProducer.getVertx().deployVerticle(plainProducer); @@ -84,7 +84,7 @@ public int sendStringMessagesPlain(long timeoutMs, String topicName, int message */ public int sendStringMessagesPlain(String topicName, int messageCount) { return sendStringMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, - List.of(), "\"Hello\" : \"World\"", 0, null, false); + List.of(), "\"Hello\" : \"World\"", 0, null, false); } /** @@ -93,7 +93,7 @@ public int sendStringMessagesPlain(String topicName, int messageCount) { * @param topicName topic name where messages are send * @param message content to be sent * @param messageCount message count - * @param partition partition, which will be selected + * @param partition partition, which will be selected * @return sent message count */ public int sendStringMessagesPlain(String topicName, String message, int messageCount, int partition) { @@ -133,10 +133,10 @@ public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCo properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "producer-sender-plain-" + new Random().nextInt(Integer.MAX_VALUE)); properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name); try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition, timestamp) - .withProperties(properties) - .withHeaders(headers) - .withNullKeyRecord(withNullKeyRecord) - .build()) { + .withProperties(properties) + .withHeaders(headers) + .withNullKeyRecord(withNullKeyRecord) + .build()) { plainProducer.getVertx().deployVerticle(plainProducer); return plainProducer.getResultPromise().get(timeoutMs, TimeUnit.MILLISECONDS); @@ -172,7 +172,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, String mess */ public int sendJsonMessagesPlain(String topicName, int messageCount, String message, int partition, boolean withNullKeyRecord) { return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(), - message, partition, null, withNullKeyRecord); + message, partition, null, withNullKeyRecord); } /** @@ -195,13 +195,13 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, List future = adminClientFacade.createTopic(topic, 2, 1); String value = "message-value"; + String key = "my-key"; JsonArray records = new JsonArray(); From c71d2d0d3cb0b01e2093b5e152d15ddedd993df3 Mon Sep 17 00:00:00 2001 From: Paolo Patierno Date: Tue, 23 Jul 2024 09:20:31 +0200 Subject: [PATCH 13/13] Minor refactoring on the CHANGELOG Signed-off-by: Paolo Patierno --- CHANGELOG.md | 6 +++--- src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d9116c4..07fa6da8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,9 @@ ## 0.30.0 * Dependency updates (Kafka 3.7.1, Prometheus JMX Collector 1.0.1, Prometheus Client 1.3.1) -* added support for message timestamp. - * Implemented support for interpreting the timestamp parameter in `ProducerRecord` objects sent to Kafka topics via the bridge. - * Allow users to read the `ConsumerRecord` timestamp on the request's response. +* Added support for message timestamp. + * Setting the timestamp on a message sent via the `send` API. + * Getting the timestamp on receiving a message via the `poll` API. ## 0.29.0 diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java index 7ececc1b..252f9519 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java @@ -208,7 +208,6 @@ void sendSimpleMessageWithKey(VertxTestContext context) throws InterruptedExcept KafkaFuture future = adminClientFacade.createTopic(topic, 2, 1); String value = "message-value"; - String key = "my-key"; JsonArray records = new JsonArray();