Skip to content

Commit a3af578

Browse files
add CHANGELOG and fix nits
1 parent ea04500 commit a3af578

File tree

3 files changed

+25
-22
lines changed

3 files changed

+25
-22
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
## 0.30.0
44

55
* Dependency updates (Prometheus JMX Collector 1.0.1, Prometheus Client 1.3.1)
6+
* Added support for message timestamp.
7+
1. Implemented support for interpreting the timestamp parameter in `ProducerRecord`objects sent to Kafka topics via the bridge.
8+
2. Allow users to read the `ConsumerRecord`'s timestamp on the request's response.
69

710
## 0.29.0
811

src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer p
5050
}
5151
if (json.has("headers")) {
5252
ArrayNode jsonArray = (ArrayNode) json.get("headers");
53-
for (JsonNode jsonObject : jsonArray) {
53+
for (JsonNode jsonObject: jsonArray) {
5454
headers.add(new RecordHeader(jsonObject.get("key").asText(), DatatypeConverter.parseBase64Binary(jsonObject.get("value").asText())));
5555
}
5656
}
@@ -106,7 +106,7 @@ public byte[] toMessages(ConsumerRecords<byte[], byte[]> records) {
106106

107107
ArrayNode headers = JsonUtils.createArrayNode();
108108

109-
for (Header kafkaHeader : record.headers()) {
109+
for (Header kafkaHeader: record.headers()) {
110110
ObjectNode header = JsonUtils.createObjectNode();
111111

112112
header.put("key", kafkaHeader.key());

src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java

+20-20
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,10 @@ public int sendStringMessagesPlain(long timeoutMs, String topicName, int message
6161
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name);
6262

6363
try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition, timestamp)
64-
.withProperties(properties)
65-
.withHeaders(headers)
66-
.withNullKeyRecord(withNullKeyRecord)
67-
.build()) {
64+
.withProperties(properties)
65+
.withHeaders(headers)
66+
.withNullKeyRecord(withNullKeyRecord)
67+
.build()) {
6868

6969
plainProducer.getVertx().deployVerticle(plainProducer);
7070

@@ -84,7 +84,7 @@ public int sendStringMessagesPlain(long timeoutMs, String topicName, int message
8484
*/
8585
public int sendStringMessagesPlain(String topicName, int messageCount) {
8686
return sendStringMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount,
87-
List.of(), "\"Hello\" : \"World\"", 0, null, false);
87+
List.of(), "\"Hello\" : \"World\"", 0, null, false);
8888
}
8989

9090
/**
@@ -93,17 +93,17 @@ public int sendStringMessagesPlain(String topicName, int messageCount) {
9393
* @param topicName topic name where messages are send
9494
* @param message content to be sent
9595
* @param messageCount message count
96-
* @param partition partition, which will be selected
96+
* @param partition partition, which will be selected
9797
* @return sent message count
9898
*/
9999
public int sendStringMessagesPlain(String topicName, String message, int messageCount, int partition) {
100100
return sendStringMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount,
101-
List.of(), message, partition, null, true);
101+
List.of(), message, partition, null, true);
102102
}
103103

104104
public int sendStringMessagesPlain(String topicName, String message, int messageCount, int partition, boolean withNullKeyRecord) {
105105
return sendStringMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount,
106-
List.of(), message, partition, null, withNullKeyRecord);
106+
List.of(), message, partition, null, withNullKeyRecord);
107107
}
108108

109109
/**
@@ -133,10 +133,10 @@ public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCo
133133
properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "producer-sender-plain-" + new Random().nextInt(Integer.MAX_VALUE));
134134
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name);
135135
try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition, timestamp)
136-
.withProperties(properties)
137-
.withHeaders(headers)
138-
.withNullKeyRecord(withNullKeyRecord)
139-
.build()) {
136+
.withProperties(properties)
137+
.withHeaders(headers)
138+
.withNullKeyRecord(withNullKeyRecord)
139+
.build()) {
140140
plainProducer.getVertx().deployVerticle(plainProducer);
141141

142142
return plainProducer.getResultPromise().get(timeoutMs, TimeUnit.MILLISECONDS);
@@ -157,7 +157,7 @@ public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCo
157157
*/
158158
public int sendJsonMessagesPlain(String topicName, int messageCount, String message, Long timestamp) {
159159
return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(),
160-
message, 0, timestamp, false);
160+
message, 0, timestamp, false);
161161
}
162162

163163
/**
@@ -172,7 +172,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, String mess
172172
*/
173173
public int sendJsonMessagesPlain(String topicName, int messageCount, String message, int partition, boolean withNullKeyRecord) {
174174
return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(),
175-
message, partition, null, withNullKeyRecord);
175+
message, partition, null, withNullKeyRecord);
176176
}
177177

178178
/**
@@ -195,26 +195,26 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, List<KafkaH
195195
*
196196
* @param topicName topic name where messages are send
197197
* @param messageCount message count
198-
* @param message specific message to send
199-
* @param partition partition count, how many shards/partitions will topic have
198+
* @param message specific message to send
199+
* @param partition partition count, how many shards/partitions will topic have
200200
* @return sent message count
201201
*/
202202
public int sendJsonMessagesPlain(String topicName, int messageCount, String message, int partition) {
203203
return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(),
204-
message, partition, null, false);
204+
message, partition, null, false);
205205
}
206206

207207
/**
208208
* Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting
209209
*
210210
* @param topicName topic name where messages are send
211211
* @param messageCount message count
212-
* @param message specific message to send
212+
* @param message specific message to send
213213
* @return sent message count
214214
*/
215215
public int sendJsonMessagesPlain(String topicName, int messageCount, String message) {
216216
return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(),
217-
message, 0, null, false);
217+
message, 0, null, false);
218218
}
219219

220220
/**
@@ -226,7 +226,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, String mess
226226
*/
227227
public int sendJsonMessagesPlain(String topicName, int messageCount) {
228228
return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(),
229-
"{\"Hello\" : \"World\"}", 0, null, false);
229+
"{\"Hello\" : \"World\"}", 0, null, false);
230230
}
231231

232232
/**

0 commit comments

Comments
 (0)