feat: added a new text embedded format #858

Merged · 17 commits · Mar 4, 2024
3 changes: 3 additions & 0 deletions documentation/book/api/paths.adoc
@@ -616,6 +616,7 @@ __optional__|The maximum amount of time, in milliseconds, that the HTTP Bridge s

* `application/vnd.kafka.json.v2+json`
* `application/vnd.kafka.binary.v2+json`
* `application/vnd.kafka.text.v2+json`
* `application/vnd.kafka.v2+json`


@@ -1046,6 +1047,7 @@ __required__||<<_producerrecordlist,ProducerRecordList>>

* `application/vnd.kafka.json.v2+json`
* `application/vnd.kafka.binary.v2+json`
* `application/vnd.kafka.text.v2+json`


==== Produces
@@ -1312,6 +1314,7 @@ __required__|List of records to send to a given topic partition, including a val

* `application/vnd.kafka.json.v2+json`
* `application/vnd.kafka.binary.v2+json`
* `application/vnd.kafka.text.v2+json`


==== Produces
9 changes: 6 additions & 3 deletions documentation/modules/con-requests-kafka-bridge.adoc
@@ -19,7 +19,7 @@ API request and response bodies are always encoded as JSON.
Content-Type: application/vnd.kafka.v2+json
----

* When performing producer operations, `POST` requests must provide `Content-Type` headers specifying the _embedded data format_ of the messages produced. This can be either `json` or `binary`.
* When performing producer operations, `POST` requests must provide `Content-Type` headers specifying the _embedded data format_ of the messages produced. This can be `json`, `binary`, or `text`.
+
[cols="35,65",options="header",stripes="none",separator=¦]
|===
@@ -33,6 +33,9 @@ m¦Content-Type: application/vnd.kafka.json.v2+json
¦Binary
m¦Content-Type: application/vnd.kafka.binary.v2+json

¦Text
m¦Content-Type: application/vnd.kafka.text.v2+json

|===

The embedded data format is set per consumer, as described in the next section.
@@ -42,9 +45,9 @@ An empty body can be used to create a consumer with the default values.

== Embedded data format

The embedded data format is the format of the Kafka messages that are transmitted, over HTTP, from a producer to a consumer using the Kafka Bridge. Two embedded data formats are supported: JSON and binary.
The embedded data format is the format of the Kafka messages that are transmitted over HTTP from a producer to a consumer using the Kafka Bridge. Three embedded data formats are supported: JSON, binary, and text.

When creating a consumer using the `/consumers/_groupid_` endpoint, the `POST` request body must specify an embedded data format of either JSON or binary. This is specified in the `format` field, for example:
When creating a consumer using the `/consumers/_groupid_` endpoint, the `POST` request body must specify an embedded data format of JSON, binary, or text. This is specified in the `format` field, for example:

[source,json,subs="attributes+"]
----
{
  "name": "my-consumer",
  "format": "text"
}
----
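
As a usage sketch (not part of this PR's diff): producing a record with the new text format, assuming a bridge listening on `localhost:8080`, a topic named `my-topic`, and the bridge's `send` endpoint at `/topics/{topicname}`. With the text format, key and value travel as plain JSON strings:

[source,java]
----
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TextProduceSketch {
    public static void main(String[] args) throws Exception {
        // Records are wrapped in a "records" array (ProducerRecordList);
        // with the text embedded format the value must be a JSON string.
        String body = "{\"records\":[{\"key\":\"my-key\",\"value\":\"hello\"}]}";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/topics/my-topic"))
                .header("Content-Type", "application/vnd.kafka.text.v2+json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
----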
3 changes: 3 additions & 0 deletions src/main/java/io/strimzi/kafka/bridge/BridgeContentType.java
@@ -16,6 +16,9 @@ public class BridgeContentType {
/** JSON encoding with BINARY embedded format */
public static final String KAFKA_JSON_BINARY = "application/vnd.kafka.binary.v2+json";

/** JSON encoding with TEXT embedded format */
public static final String KAFKA_JSON_TEXT = "application/vnd.kafka.text.v2+json";

/** Specific Kafka JSON encoding */
public static final String KAFKA_JSON = "application/vnd.kafka.v2+json";

7 changes: 6 additions & 1 deletion src/main/java/io/strimzi/kafka/bridge/EmbeddedFormat.java
@@ -14,7 +14,10 @@ public enum EmbeddedFormat {
BINARY,

/** Define "json" data as embedded format */
JSON;
JSON,

/** Define "text" data as embedded format */
TEXT;

/**
* Convert the String value in the corresponding enum
@@ -28,6 +31,8 @@ public static EmbeddedFormat from(String value) {
return JSON;
case "binary":
return BINARY;
case "text":
return TEXT;
}
throw new IllegalEmbeddedFormatException("Invalid format type.");
}
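
A quick sketch of how the extended `from` method behaves (the accepted values are exactly those in the switch above; anything else falls through to the exception):

[source,java]
----
EmbeddedFormat text = EmbeddedFormat.from("text");     // EmbeddedFormat.TEXT
EmbeddedFormat json = EmbeddedFormat.from("json");     // EmbeddedFormat.JSON
EmbeddedFormat binary = EmbeddedFormat.from("binary"); // EmbeddedFormat.BINARY
EmbeddedFormat.from("avro"); // throws IllegalEmbeddedFormatException("Invalid format type.")
----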
2 changes: 2 additions & 0 deletions src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java
@@ -638,6 +638,8 @@ private EmbeddedFormat contentTypeToFormat(String contentType) {
return EmbeddedFormat.BINARY;
case BridgeContentType.KAFKA_JSON_JSON:
return EmbeddedFormat.JSON;
case BridgeContentType.KAFKA_JSON_TEXT:
return EmbeddedFormat.TEXT;
}
throw new IllegalArgumentException(contentType);
}
@@ -20,6 +20,7 @@
import io.strimzi.kafka.bridge.converter.MessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpBinaryMessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpTextMessageConverter;
import io.strimzi.kafka.bridge.http.converter.JsonDecodeException;
import io.strimzi.kafka.bridge.http.converter.JsonUtils;
import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
@@ -305,9 +306,8 @@ private void pollHandler(ConsumerRecords<K, V> records, Throwable ex, RoutingContext
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson()));
} else {
responseStatus = HttpResponseStatus.OK;
HttpUtils.sendResponse(routingContext, responseStatus.code(),
this.format == EmbeddedFormat.BINARY ? BridgeContentType.KAFKA_JSON_BINARY : BridgeContentType.KAFKA_JSON_JSON,
buffer);

HttpUtils.sendResponse(routingContext, responseStatus.code(), getContentType(), buffer);
}
} catch (JsonDecodeException e) {
LOGGER.error("Error decoding records as JSON", e);
@@ -614,16 +614,32 @@ private MessageConverter<K, V, byte[], byte[]> buildMessageConverter() {
return (MessageConverter<K, V, byte[], byte[]>) new HttpJsonMessageConverter();
case BINARY:
return (MessageConverter<K, V, byte[], byte[]>) new HttpBinaryMessageConverter();
case TEXT:
return (MessageConverter<K, V, byte[], byte[]>) new HttpTextMessageConverter();
}
return null;
}

private String getContentType() {
switch (this.format) {
case JSON:
return BridgeContentType.KAFKA_JSON_JSON;
case BINARY:
return BridgeContentType.KAFKA_JSON_BINARY;
case TEXT:
return BridgeContentType.KAFKA_JSON_TEXT;
}
throw new IllegalArgumentException();
}

private boolean checkAcceptedBody(String accept) {
switch (accept) {
case BridgeContentType.KAFKA_JSON_JSON:
return format == EmbeddedFormat.JSON;
case BridgeContentType.KAFKA_JSON_BINARY:
return format == EmbeddedFormat.BINARY;
case BridgeContentType.KAFKA_JSON_TEXT:
return format == EmbeddedFormat.TEXT;
}
return false;
}
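
To see `checkAcceptedBody` from the client side, a hedged sketch (bridge address, consumer group, and instance name are hypothetical; `java.net.http` imports as in the produce sketch earlier): the `Accept` header must match the embedded format the consumer was created with, or the bridge rejects the request.

[source,java]
----
// Poll a consumer that was created with "format": "text"; the Accept header
// must carry the matching text content type to pass checkAcceptedBody.
HttpRequest poll = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8080/consumers/my-group/instances/my-consumer/records"))
        .header("Accept", "application/vnd.kafka.text.v2+json")
        .GET()
        .build();
HttpResponse<String> response = HttpClient.newHttpClient()
        .send(poll, HttpResponse.BodyHandlers.ofString());
----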
@@ -16,6 +16,7 @@
import io.strimzi.kafka.bridge.converter.MessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpBinaryMessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpTextMessageConverter;
import io.strimzi.kafka.bridge.http.converter.JsonUtils;
import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
import io.strimzi.kafka.bridge.http.model.HttpBridgeResult;
@@ -228,6 +229,8 @@ private MessageConverter<K, V, byte[], byte[]> buildMessageConverter(String contentType) {
return (MessageConverter<K, V, byte[], byte[]>) new HttpJsonMessageConverter();
case BridgeContentType.KAFKA_JSON_BINARY:
return (MessageConverter<K, V, byte[], byte[]>) new HttpBinaryMessageConverter();
case BridgeContentType.KAFKA_JSON_TEXT:
return (MessageConverter<K, V, byte[], byte[]>) new HttpTextMessageConverter();
}
return null;
}
117 changes: 117 additions & 0 deletions src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java
@@ -0,0 +1,117 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.bridge.http.converter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import io.strimzi.kafka.bridge.converter.MessageConverter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

import javax.xml.bind.DatatypeConverter;
import java.util.ArrayList;
import java.util.List;

/**
* Implementation of a message converter to deal with the "text" embedded data format
*/
@SuppressWarnings("checkstyle:NPathComplexity")
public class HttpTextMessageConverter implements MessageConverter<byte[], byte[], byte[], byte[]> {
@Override
public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) {

Integer partitionFromBody = null;
byte[] key = null;
byte[] value = null;
Headers headers = new RecordHeaders();

JsonNode json = JsonUtils.bytesToJson(message);

if (!json.isEmpty()) {
if (json.has("key")) {
key = json.get("key").asText().getBytes();
}
if (json.has("value")) {
JsonNode valueNode = json.get("value");
if (!valueNode.isTextual()) {
throw new IllegalStateException("Because the embedded format is 'text', the value must be a string");
}
value = valueNode.asText().getBytes();
}
if (json.has("headers")) {
ArrayNode jsonArray = (ArrayNode) json.get("headers");
for (JsonNode jsonObject : jsonArray) {
headers.add(new RecordHeader(jsonObject.get("key").asText(), DatatypeConverter.parseBase64Binary(jsonObject.get("value").asText())));
}
}
if (json.has("partition")) {
partitionFromBody = json.get("partition").asInt();
}
if (partition != null && partitionFromBody != null) {
throw new IllegalStateException("Partition specified in body and in request path");
}
if (partition != null) {
partitionFromBody = partition;
}
}
return new ProducerRecord<>(kafkaTopic, partitionFromBody, key, value, headers);
}

@Override
public List<ProducerRecord<byte[], byte[]>> toKafkaRecords(String kafkaTopic, Integer partition, byte[] messages) {

List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();

JsonNode json = JsonUtils.bytesToJson(messages);
ArrayNode jsonArray = (ArrayNode) json.get("records");

for (JsonNode jsonObj : jsonArray) {
records.add(toKafkaRecord(kafkaTopic, partition, JsonUtils.jsonToBytes(jsonObj)));
}
return records;
}

@Override
public byte[] toMessage(String address, ConsumerRecord<byte[], byte[]> record) {
throw new UnsupportedOperationException();
}

@Override
public byte[] toMessages(ConsumerRecords<byte[], byte[]> records) {
ArrayNode jsonArray = JsonUtils.createArrayNode();

for (ConsumerRecord<byte[], byte[]> record : records) {
ObjectNode jsonObject = JsonUtils.createObjectNode();

jsonObject.set("topic", new TextNode(record.topic()));
jsonObject.set("key", record.key() != null ? new TextNode(new String(record.key())) : null);
jsonObject.set("value", record.value() != null ? new TextNode(new String(record.value())) : null);
jsonObject.put("partition", record.partition());
jsonObject.put("offset", record.offset());

ArrayNode headers = JsonUtils.createArrayNode();

for (Header kafkaHeader : record.headers()) {
ObjectNode header = JsonUtils.createObjectNode();

header.set("key", new TextNode(kafkaHeader.key()));
header.put("value", DatatypeConverter.printBase64Binary(kafkaHeader.value()));
headers.add(header);
}
if (!headers.isEmpty()) {
jsonObject.set("headers", headers);
}
jsonArray.add(jsonObject);
}
return JsonUtils.jsonToBytes(jsonArray);
}
}
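
A brief usage sketch of the new converter (topic and payload are illustrative; `toKafkaRecord` signature as above): with the text format, key and value arrive as plain JSON strings and are stored as raw bytes on the `ProducerRecord`.

[source,java]
----
byte[] payload = "{\"key\":\"k1\",\"value\":\"hello text\",\"partition\":0}".getBytes();
HttpTextMessageConverter converter = new HttpTextMessageConverter();

// The partition argument is null here, so the "partition" field from the
// body is used; supplying both would throw IllegalStateException.
ProducerRecord<byte[], byte[]> record = converter.toKafkaRecord("my-topic", null, payload);

// new String(record.key())   -> "k1"
// new String(record.value()) -> "hello text"
// record.partition()         -> 0
// A non-string "value" (e.g. a JSON object) throws IllegalStateException.
----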
30 changes: 30 additions & 0 deletions src/main/resources/openapi.json
@@ -652,6 +652,11 @@
"schema": {
"$ref": "#/components/schemas/ProducerRecordList"
}
},
"application/vnd.kafka.text.v2+json": {
"schema": {
"$ref": "#/components/schemas/ProducerRecordList"
}
}
},
"required": true
@@ -799,6 +804,11 @@
}
}
},
"application/vnd.kafka.text.v2+json": {
"schema": {
"$ref": "#/components/schemas/ConsumerRecordList"
}
},
"application/vnd.kafka.v2+json": {
"schema": {
"$ref": "#/components/schemas/ConsumerRecordList"
@@ -819,6 +829,11 @@
"$ref": "#/components/schemas/Error"
}
},
"application/vnd.kafka.text.v2+json": {
"schema": {
"$ref": "#/components/schemas/Error"
}
},
"application/vnd.kafka.v2+json": {
"schema": {
"$ref": "#/components/schemas/Error"
@@ -847,6 +862,11 @@
"$ref": "#/components/schemas/Error"
}
},
"application/vnd.kafka.text.v2+json": {
"schema": {
"$ref": "#/components/schemas/Error"
}
},
"application/vnd.kafka.v2+json": {
"schema": {
"$ref": "#/components/schemas/Error"
@@ -875,6 +895,11 @@
"$ref": "#/components/schemas/Error"
}
},
"application/vnd.kafka.text.v2+json": {
"schema": {
"$ref": "#/components/schemas/Error"
}
},
"application/vnd.kafka.v2+json": {
"schema": {
"$ref": "#/components/schemas/Error"
@@ -1041,6 +1066,11 @@
"schema": {
"$ref": "#/components/schemas/ProducerRecordToPartitionList"
}
},
"application/vnd.kafka.text.v2+json": {
"schema": {
"$ref": "#/components/schemas/ProducerRecordToPartitionList"
}
}
},
"required": true
7 changes: 5 additions & 2 deletions src/main/resources/openapiv2.json
@@ -602,7 +602,8 @@
"operationId": "send",
"consumes": [
"application/vnd.kafka.json.v2+json",
"application/vnd.kafka.binary.v2+json"
"application/vnd.kafka.binary.v2+json",
"application/vnd.kafka.text.v2+json"
],
"produces": [
"application/vnd.kafka.v2+json"
@@ -695,6 +696,7 @@
"produces": [
"application/vnd.kafka.json.v2+json",
"application/vnd.kafka.binary.v2+json",
"application/vnd.kafka.text.v2+json",
"application/vnd.kafka.v2+json"
],
"responses": {
@@ -935,7 +937,8 @@
"operationId": "sendToPartition",
"consumes": [
"application/vnd.kafka.json.v2+json",
"application/vnd.kafka.binary.v2+json"
"application/vnd.kafka.binary.v2+json",
"application/vnd.kafka.text.v2+json"
],
"produces": [
"application/vnd.kafka.v2+json"