diff --git a/CHANGELOG.md b/CHANGELOG.md
index dbab2fd15..4c28b3db7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,7 @@
* Dependency updates (Kafka 3.7.0, Kubernetes configuration provider 1.1.2, Vert.x 4.5.4, Netty 4.1.107.Final, Jackson FasterXML 2.16.1, Micrometer 1.12.3)
* Fixed missing messaging semantic attributes to the Kafka consumer spans
+* Introduced a new `text` embedded format for sending and receiving plain strings as the record key and value

## 0.27.0
diff --git a/documentation/book/api/paths.adoc b/documentation/book/api/paths.adoc
index 501e7fa47..d6be23541 100644
--- a/documentation/book/api/paths.adoc
+++ b/documentation/book/api/paths.adoc
@@ -616,6 +616,7 @@ __optional__|The maximum amount of time, in milliseconds, that the HTTP Bridge s
* `application/vnd.kafka.json.v2+json`
* `application/vnd.kafka.binary.v2+json`
+* `application/vnd.kafka.text.v2+json`
* `application/vnd.kafka.v2+json`

@@ -1046,6 +1047,7 @@ __required__||<<_producerrecordlist,ProducerRecordList>>
* `application/vnd.kafka.json.v2+json`
* `application/vnd.kafka.binary.v2+json`
+* `application/vnd.kafka.text.v2+json`

==== Produces

@@ -1312,6 +1314,7 @@ __required__|List of records to send to a given topic partition, including a val
* `application/vnd.kafka.json.v2+json`
* `application/vnd.kafka.binary.v2+json`
+* `application/vnd.kafka.text.v2+json`

==== Produces
diff --git a/documentation/modules/con-requests-kafka-bridge.adoc b/documentation/modules/con-requests-kafka-bridge.adoc
index 40b60557d..63049007a 100644
--- a/documentation/modules/con-requests-kafka-bridge.adoc
+++ b/documentation/modules/con-requests-kafka-bridge.adoc
@@ -19,7 +19,7 @@ API request and response bodies are always encoded as JSON.
Content-Type: application/vnd.kafka.v2+json
----

-* When performing producer operations, `POST` requests must provide `Content-Type` headers specifying the _embedded data format_ of the messages produced. This can be either `json` or `binary`.
+* When performing producer operations, `POST` requests must provide `Content-Type` headers specifying the _embedded data format_ of the messages produced. This can be `json`, `binary`, or `text`.
+
[cols="35,65",options="header",stripes="none",separator=¦]
|===
@@ -33,6 +33,9 @@
m¦Content-Type: application/vnd.kafka.json.v2+json

¦Binary
m¦Content-Type: application/vnd.kafka.binary.v2+json

+¦Text
+m¦Content-Type: application/vnd.kafka.text.v2+json
+
|===

The embedded data format is set per consumer, as described in the next section.
@@ -42,9 +45,9 @@ An empty body can be used to create a consumer with the default values.

== Embedded data format

-The embedded data format is the format of the Kafka messages that are transmitted, over HTTP, from a producer to a consumer using the Kafka Bridge. Two embedded data formats are supported: JSON and binary.
+The embedded data format is the format of the Kafka messages that are transmitted, over HTTP, from a producer to a consumer using the Kafka Bridge. Three embedded data formats are supported: JSON, binary, and text.

-When creating a consumer using the `/consumers/_groupid_` endpoint, the `POST` request body must specify an embedded data format of either JSON or binary. This is specified in the `format` field, for example:
+When creating a consumer using the `/consumers/_groupid_` endpoint, the `POST` request body must specify an embedded data format of JSON, binary, or text.
This is specified in the `format` field, for example:

[source,json,subs="attributes+"]
----
@@ -109,6 +112,8 @@ curl -X POST \
----
<1> The header value in binary format and encoded as Base64.

+Note that when using the text embedded data format, the `key` and `value` fields in the `records` parameter must be strings and not JSON objects.
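+
+For example, a producer request using the text embedded data format might look like the following; the host, port, and topic name are illustrative:
+
+[source,curl,subs="attributes+"]
+----
+curl -X POST http://localhost:8080/topics/my-topic \
+  -H 'Content-Type: application/vnd.kafka.text.v2+json' \
+  -d '{
+    "records": [
+        {
+            "key": "my-key",
+            "value": "Hello from the text format"
+        }
+    ]
+}'
+----
+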
== Accept headers

After creating a consumer, all subsequent GET requests must provide an `Accept` header in the following format:

@@ -118,7 +123,7 @@ After creating a consumer, all subsequent GET requests must provide an `Accept`
Accept: application/vnd.kafka._EMBEDDED-DATA-FORMAT_.v2+json
----

-The `EMBEDDED-DATA-FORMAT` is either `json` or `binary`.
+The `EMBEDDED-DATA-FORMAT` is `json`, `binary`, or `text`.

For example, when retrieving records for a subscribed consumer using an embedded data format of JSON, include this Accept header:
diff --git a/src/main/java/io/strimzi/kafka/bridge/BridgeContentType.java b/src/main/java/io/strimzi/kafka/bridge/BridgeContentType.java
index 39fe2019f..f436c9400 100644
--- a/src/main/java/io/strimzi/kafka/bridge/BridgeContentType.java
+++ b/src/main/java/io/strimzi/kafka/bridge/BridgeContentType.java
@@ -16,6 +16,9 @@ public class BridgeContentType {
    /** JSON encoding with BINARY embedded format */
    public static final String KAFKA_JSON_BINARY = "application/vnd.kafka.binary.v2+json";

+    /** JSON encoding with TEXT embedded format */
+    public static final String KAFKA_JSON_TEXT = "application/vnd.kafka.text.v2+json";
+
    /** Specific Kafka JSON encoding */
    public static final String KAFKA_JSON = "application/vnd.kafka.v2+json";
diff --git a/src/main/java/io/strimzi/kafka/bridge/EmbeddedFormat.java b/src/main/java/io/strimzi/kafka/bridge/EmbeddedFormat.java
index 680b86d06..16de02de2 100644
--- a/src/main/java/io/strimzi/kafka/bridge/EmbeddedFormat.java
+++ b/src/main/java/io/strimzi/kafka/bridge/EmbeddedFormat.java
@@ -14,7 +14,10 @@ public enum EmbeddedFormat {
    BINARY,

    /** Define "json" data as embedded format */
-    JSON;
+    JSON,
+
+    /** Define "text" data as embedded format */
+    TEXT;

    /**
     * Convert the String value in the corresponding enum
@@ -28,6 +31,8 @@ public static EmbeddedFormat from(String value) {
            return JSON;
        case "binary":
            return BINARY;
+        case "text":
+            return TEXT;
        }
        throw new IllegalEmbeddedFormatException("Invalid format type.");
    }
diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java
index 77cbe9360..733a92a33 100644
--- a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java
+++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java
@@ -638,6 +638,8 @@ private EmbeddedFormat contentTypeToFormat(String contentType) {
            return EmbeddedFormat.BINARY;
        case BridgeContentType.KAFKA_JSON_JSON:
            return EmbeddedFormat.JSON;
+        case BridgeContentType.KAFKA_JSON_TEXT:
+            return EmbeddedFormat.TEXT;
        }
        throw new IllegalArgumentException(contentType);
    }
diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java
index 6c97ad4db..b54cb0eaa 100644
--- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java
+++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java
@@ -20,6 +20,7 @@
import io.strimzi.kafka.bridge.converter.MessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpBinaryMessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter;
+import io.strimzi.kafka.bridge.http.converter.HttpTextMessageConverter;
import io.strimzi.kafka.bridge.http.converter.JsonDecodeException;
import io.strimzi.kafka.bridge.http.converter.JsonUtils;
import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
@@ -305,9 +306,8 @@ private void pollHandler(ConsumerRecords records, Throwable ex, RoutingCon
                    BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson()));
            } else {
                responseStatus = HttpResponseStatus.OK;
-                HttpUtils.sendResponse(routingContext, responseStatus.code(),
-                        this.format == EmbeddedFormat.BINARY ? BridgeContentType.KAFKA_JSON_BINARY : BridgeContentType.KAFKA_JSON_JSON,
-                        buffer);
+
+                HttpUtils.sendResponse(routingContext, responseStatus.code(), getContentType(), buffer);
            }
        } catch (JsonDecodeException e) {
            LOGGER.error("Error decoding records as JSON", e);
@@ -614,16 +614,32 @@ private MessageConverter buildMessageConverter() {
            return (MessageConverter) new HttpJsonMessageConverter();
        case BINARY:
            return (MessageConverter) new HttpBinaryMessageConverter();
+        case TEXT:
+            return (MessageConverter) new HttpTextMessageConverter();
        }
        return null;
    }

+    private String getContentType() {
+        switch (this.format) {
+            case JSON:
+                return BridgeContentType.KAFKA_JSON_JSON;
+            case BINARY:
+                return BridgeContentType.KAFKA_JSON_BINARY;
+            case TEXT:
+                return BridgeContentType.KAFKA_JSON_TEXT;
+        }
+        throw new IllegalArgumentException("Unsupported embedded format: " + this.format);
+    }
+
    private boolean checkAcceptedBody(String accept) {
        switch (accept) {
            case BridgeContentType.KAFKA_JSON_JSON:
                return format == EmbeddedFormat.JSON;
            case BridgeContentType.KAFKA_JSON_BINARY:
                return format == EmbeddedFormat.BINARY;
+            case BridgeContentType.KAFKA_JSON_TEXT:
+                return format == EmbeddedFormat.TEXT;
        }
        return false;
    }
diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java
index 49282f36b..e75196cab 100644
--- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java
+++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java
@@ -16,6 +16,7 @@
import io.strimzi.kafka.bridge.converter.MessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpBinaryMessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter;
+import io.strimzi.kafka.bridge.http.converter.HttpTextMessageConverter;
import io.strimzi.kafka.bridge.http.converter.JsonUtils;
import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
import io.strimzi.kafka.bridge.http.model.HttpBridgeResult;
@@ -228,6 +229,8 @@ private MessageConverter buildMessageConverter(String cont
            return (MessageConverter) new HttpJsonMessageConverter();
        case BridgeContentType.KAFKA_JSON_BINARY:
            return (MessageConverter) new HttpBinaryMessageConverter();
+        case BridgeContentType.KAFKA_JSON_TEXT:
+            return (MessageConverter) new HttpTextMessageConverter();
        }
        return null;
    }
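For reference, the request and response payloads handled by the new text converter below look like the following; the topic, key, value, partition, and offset are illustrative:

    # producer request body, sent with Content-Type: application/vnd.kafka.text.v2+json
    {"records": [{"key": "my-key", "value": "Hello from the text format"}]}

    # consumer response body, returned for Accept: application/vnd.kafka.text.v2+json
    [{"topic": "my-topic", "key": "my-key", "value": "Hello from the text format", "partition": 0, "offset": 0}]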
diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java
new file mode 100644
index 000000000..704190197
--- /dev/null
+++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.bridge.http.converter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.strimzi.kafka.bridge.converter.MessageConverter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import javax.xml.bind.DatatypeConverter;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of a message converter to deal with the "text" embedded data format
+ */
+@SuppressWarnings("checkstyle:NPathComplexity")
+public class HttpTextMessageConverter implements MessageConverter<byte[], byte[], byte[], byte[]> {
+    @Override
+    public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) {
+
+        Integer partitionFromBody = null;
+        byte[] key = null;
+        byte[] value = null;
+        Headers headers = new RecordHeaders();
+
+        JsonNode json = JsonUtils.bytesToJson(message);
+
+        if (!json.isEmpty()) {
+            if (json.has("key")) {
+                JsonNode keyNode = json.get("key");
+                if (!keyNode.isTextual()) {
+                    throw new IllegalStateException("Because the embedded format is 'text', the key must be a string");
+                }
+                key = keyNode.asText().getBytes();
+            }
+            if (json.has("value")) {
+                JsonNode valueNode = json.get("value");
+                if (!valueNode.isTextual()) {
+                    throw new IllegalStateException("Because the embedded format is 'text', the value must be a string");
+                }
+                value = valueNode.asText().getBytes();
+            }
+            if (json.has("headers")) {
+                // header values are always Base64-encoded, regardless of the embedded data format
+                ArrayNode jsonArray = (ArrayNode) json.get("headers");
+                for (JsonNode jsonObject : jsonArray) {
+                    headers.add(new RecordHeader(jsonObject.get("key").asText(), DatatypeConverter.parseBase64Binary(jsonObject.get("value").asText())));
+                }
+            }
+            if (json.has("partition")) {
+                partitionFromBody = json.get("partition").asInt();
+            }
+            if (partition != null && partitionFromBody != null) {
+                throw new IllegalStateException("Partition specified in body and in request path");
+            }
+            if (partition != null) {
+                partitionFromBody = partition;
+            }
+        }
+        return new ProducerRecord<>(kafkaTopic, partitionFromBody, key, value, headers);
+    }
+
+    @Override
+    public List<ProducerRecord<byte[], byte[]>> toKafkaRecords(String kafkaTopic, Integer partition, byte[] messages) {
+
+        List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+
+        JsonNode json = JsonUtils.bytesToJson(messages);
+        ArrayNode jsonArray = (ArrayNode) json.get("records");
+
+        for (JsonNode jsonObj : jsonArray) {
+            records.add(toKafkaRecord(kafkaTopic, partition, JsonUtils.jsonToBytes(jsonObj)));
+        }
+        return records;
+    }
+
+    @Override
+    public byte[] toMessage(String address, ConsumerRecord<byte[], byte[]> record) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte[] toMessages(ConsumerRecords<byte[], byte[]> records) {
+        ArrayNode jsonArray = JsonUtils.createArrayNode();
+
+        for (ConsumerRecord<byte[], byte[]> record : records) {
+            ObjectNode jsonObject = JsonUtils.createObjectNode();
+
+            jsonObject.set("topic", new TextNode(record.topic()));
+            jsonObject.set("key", record.key() != null ? new TextNode(new String(record.key())) : null);
+            jsonObject.set("value", record.value() != null ?
new TextNode(new String(record.value())) : null); + jsonObject.put("partition", record.partition()); + jsonObject.put("offset", record.offset()); + + ArrayNode headers = JsonUtils.createArrayNode(); + + for (Header kafkaHeader : record.headers()) { + ObjectNode header = JsonUtils.createObjectNode(); + + header.set("key", new TextNode(kafkaHeader.key())); + header.put("value", DatatypeConverter.printBase64Binary(kafkaHeader.value())); + headers.add(header); + } + if (!headers.isEmpty()) { + jsonObject.set("headers", headers); + } + jsonArray.add(jsonObject); + } + return JsonUtils.jsonToBytes(jsonArray); + } +} \ No newline at end of file diff --git a/src/main/resources/openapi.json b/src/main/resources/openapi.json index f6a0862d8..696e67fb5 100644 --- a/src/main/resources/openapi.json +++ b/src/main/resources/openapi.json @@ -652,6 +652,11 @@ "schema": { "$ref": "#/components/schemas/ProducerRecordList" } + }, + "application/vnd.kafka.text.v2+json": { + "schema": { + "$ref": "#/components/schemas/ProducerRecordList" + } } }, "required": true @@ -799,6 +804,11 @@ } } }, + "application/vnd.kafka.text.v2+json": { + "schema": { + "$ref": "#/components/schemas/ConsumerRecordList" + } + }, "application/vnd.kafka.v2+json": { "schema": { "$ref": "#/components/schemas/ConsumerRecordList" @@ -819,6 +829,11 @@ "$ref": "#/components/schemas/Error" } }, + "application/vnd.kafka.text.v2+json": { + "schema": { + "$ref": "#/components/schemas/Error" + } + }, "application/vnd.kafka.v2+json": { "schema": { "$ref": "#/components/schemas/Error" @@ -847,6 +862,11 @@ "$ref": "#/components/schemas/Error" } }, + "application/vnd.kafka.text.v2+json": { + "schema": { + "$ref": "#/components/schemas/Error" + } + }, "application/vnd.kafka.v2+json": { "schema": { "$ref": "#/components/schemas/Error" @@ -875,6 +895,11 @@ "$ref": "#/components/schemas/Error" } }, + "application/vnd.kafka.text.v2+json": { + "schema": { + "$ref": "#/components/schemas/Error" + } + }, "application/vnd.kafka.v2+json": { "schema": { "$ref": "#/components/schemas/Error" @@ -1041,6 +1066,11 @@ "schema": { "$ref": "#/components/schemas/ProducerRecordToPartitionList" } + }, + "application/vnd.kafka.text.v2+json": { + "schema": { + "$ref": "#/components/schemas/ProducerRecordToPartitionList" + } } }, "required": true diff --git a/src/main/resources/openapiv2.json b/src/main/resources/openapiv2.json index 39d59ad81..f9a3e7ffd 100644 --- a/src/main/resources/openapiv2.json +++ b/src/main/resources/openapiv2.json @@ -602,7 +602,8 @@ "operationId": "send", "consumes": [ "application/vnd.kafka.json.v2+json", - "application/vnd.kafka.binary.v2+json" + "application/vnd.kafka.binary.v2+json", + "application/vnd.kafka.text.v2+json" ], "produces": [ "application/vnd.kafka.v2+json" @@ -695,6 +696,7 @@ "produces": [ "application/vnd.kafka.json.v2+json", "application/vnd.kafka.binary.v2+json", + "application/vnd.kafka.text.v2+json", "application/vnd.kafka.v2+json" ], "responses": { @@ -935,7 +937,8 @@ "operationId": "sendToPartition", "consumes": [ "application/vnd.kafka.json.v2+json", - "application/vnd.kafka.binary.v2+json" + "application/vnd.kafka.binary.v2+json", + "application/vnd.kafka.text.v2+json" ], "produces": [ "application/vnd.kafka.v2+json" diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java index 91cb14e45..1bd287e55 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java @@ -533,6 
+533,60 @@ void receiveSimpleMessage(VertxTestContext context) throws InterruptedException, assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true)); } + @Test + void receiveTextMessage(VertxTestContext context) throws InterruptedException, ExecutionException, TimeoutException { + KafkaFuture future = adminClientFacade.createTopic(topic); + + future.get(); + String sentBody = "Simple message"; + basicKafkaClient.sendStringMessagesPlain(topic, sentBody, 1, 0, true); + + JsonObject json = new JsonObject(); + json.put("name", name); + json.put("format", "text"); + + // create consumer + // subscribe to a topic + consumerService() + .createConsumer(context, groupId, json) + .subscribeConsumer(context, groupId, name, topic); + + CompletableFuture consume = new CompletableFuture<>(); + // consume records + consumerService() + .consumeRecordsRequest(groupId, name, BridgeContentType.KAFKA_JSON_TEXT) + .as(BodyCodec.jsonArray()) + .send(ar -> { + context.verify(() -> { + assertThat(ar.succeeded(), is(true)); + HttpResponse response = ar.result(); + assertThat(response.statusCode(), is(HttpResponseStatus.OK.code())); + JsonObject jsonResponse = response.body().getJsonObject(0); + + String kafkaTopic = jsonResponse.getString("topic"); + int kafkaPartition = jsonResponse.getInteger("partition"); + String key = jsonResponse.getString("key"); + String value = jsonResponse.getString("value"); + long offset = jsonResponse.getLong("offset"); + + assertThat(kafkaTopic, is(topic)); + assertThat(value, is(sentBody)); + assertThat(offset, is(0L)); + assertThat(kafkaPartition, notNullValue()); + assertThat(key, nullValue()); + }); + consume.complete(true); + }); + + consume.get(TEST_TIMEOUT, TimeUnit.SECONDS); + + // consumer deletion + consumerService() + .deleteConsumer(context, groupId, name); + context.completeNow(); + assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true)); + } + @Test void receiveSimpleMessageWithHeaders(VertxTestContext context) throws InterruptedException, ExecutionException, TimeoutException { KafkaFuture future = adminClientFacade.createTopic(topic, 1, 1); diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java index 0fb49a862..3bc22e7d0 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java @@ -244,6 +244,53 @@ void sendBinaryMessageWithKey(VertxTestContext context) throws InterruptedExcept }); } + @Test + void sendTextMessage(VertxTestContext context) throws InterruptedException, ExecutionException { + KafkaFuture future = adminClientFacade.createTopic(topic); + + String value = "message-value"; + + JsonArray records = new JsonArray(); + JsonObject json = new JsonObject(); + json.put("value", value); + records.add(json); + + JsonObject root = new JsonObject(); + root.put("records", records); + + future.get(); + + producerService() + .sendRecordsRequest(topic, root, BridgeContentType.KAFKA_JSON_TEXT) + .sendJsonObject(root, verifyOK(context)); + + Properties consumerProperties = Consumer.fillDefaultProperties(); + consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUri); + + KafkaConsumer consumer = KafkaConsumer.create(vertx, consumerProperties, + new StringDeserializer(), new StringDeserializer()); + consumer.handler(record -> { + context.verify(() -> { + assertThat(record.value(), is(value)); + assertThat(record.topic(), is(topic)); + 
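+                // the topic was just created, so the first record is expected at partition 0, offset 0, with a null key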
assertThat(record.partition(), is(0)); + assertThat(record.offset(), is(0L)); + assertThat(record.key(), nullValue()); + }); + LOGGER.info("Message consumed topic={} partition={} offset={}, key={}, value={}", + record.topic(), record.partition(), record.offset(), record.key(), record.value()); + consumer.close(); + context.completeNow(); + }); + + consumer.subscribe(topic, done -> { + if (!done.succeeded()) { + context.failNow(done.cause()); + } + }); + assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true)); + } + @Test void sendSimpleMessageWithHeaders(VertxTestContext context) throws ExecutionException, InterruptedException { KafkaFuture future = adminClientFacade.createTopic(topic, 2, 1); @@ -461,6 +508,38 @@ void emptyRecordTest(VertxTestContext context) throws InterruptedException, Exec }); } + @Test + void sendTextMessageWithWrongValue(VertxTestContext context) throws InterruptedException, ExecutionException { + KafkaFuture future = adminClientFacade.createTopic(topic); + + JsonObject value = new JsonObject().put("message", "Hi, This is kafka bridge"); + + JsonArray records = new JsonArray(); + JsonObject json = new JsonObject(); + json.put("value", value); + records.add(json); + + JsonObject root = new JsonObject(); + root.put("records", records); + + future.get(); + + // produce and check the status code + producerService() + .sendRecordsRequest(topic, root, BridgeContentType.KAFKA_JSON_TEXT) + .sendJsonObject(root, ar -> { + context.verify(() -> { + assertThat(ar.succeeded(), is(true)); + HttpResponse response = ar.result(); + HttpBridgeError error = HttpBridgeError.fromJson(response.body()); + assertThat(response.statusCode(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); + assertThat(error.getCode(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); + assertThat(error.getMessage(), is("Because the embedded format is 'text', the value must be a string")); + }); + context.completeNow(); + }); + } + @Test void sendMessageWithNullValueTest(VertxTestContext context) throws InterruptedException, ExecutionException { String topic = "sendMessageWithNullValueTest";