diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java index 91e83a36..1b5c865b 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java @@ -644,18 +644,6 @@ private void closeConnectionEndpoint(HttpConnection connection) { } } - private EmbeddedFormat contentTypeToFormat(String contentType) { - switch (contentType) { - case BridgeContentType.KAFKA_JSON_BINARY: - return EmbeddedFormat.BINARY; - case BridgeContentType.KAFKA_JSON_JSON: - return EmbeddedFormat.JSON; - case BridgeContentType.KAFKA_JSON_TEXT: - return EmbeddedFormat.TEXT; - } - throw new IllegalArgumentException(contentType); - } - private boolean isAlive() { return this.isReady; } @@ -840,15 +828,7 @@ public void process(RoutingContext routingContext) { } }; - final HttpOpenApiOperation OPENAPIV2 = new HttpOpenApiOperation(HttpOpenApiOperations.OPENAPIV2) { - - @Override - public void process(RoutingContext routingContext) { - HttpBridgeError error = new HttpBridgeError(HttpResponseStatus.GONE.code(), "OpenAPI v2 Swagger not supported"); - HttpUtils.sendResponse(routingContext, HttpResponseStatus.GONE.code(), - BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson())); - } - }; + final HttpOpenApiOperation OPENAPIV2 = new NotSupportedApi(HttpOpenApiOperations.OPENAPIV2); final HttpOpenApiOperation OPENAPIV3 = new HttpOpenApiOperation(HttpOpenApiOperations.OPENAPIV3) { @@ -873,4 +853,17 @@ public void process(RoutingContext routingContext) { information(routingContext); } }; + + static class NotSupportedApi extends HttpOpenApiOperation { + public NotSupportedApi(HttpOpenApiOperations operationId) { + super(operationId); + } + + @Override + public void process(RoutingContext routingContext) { + HttpBridgeError error = new HttpBridgeError(HttpResponseStatus.GONE.code(), "OpenAPI v2 Swagger not supported"); + HttpUtils.sendResponse(routingContext, HttpResponseStatus.GONE.code(), + BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson())); + } + } } diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java index 0efc790f..3fdbde36 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java @@ -18,6 +18,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import javax.xml.bind.DatatypeConverter; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -28,7 +30,6 @@ public class HttpTextMessageConverter implements MessageConverter { @Override public ProducerRecord toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) { - Integer partitionFromBody = null; Long timestamp = null; byte[] key = null; @@ -43,14 +44,14 @@ public ProducerRecord toKafkaRecord(String kafkaTopic, Integer p if (!keyNode.isTextual()) { throw new IllegalStateException("Because the embedded format is 'text', the key must be a string"); } - key = keyNode.asText().getBytes(); + key = keyNode.asText().getBytes(Charset.forName("UTF-8")); } if (json.has("value")) { JsonNode valueNode = json.get("value"); if (!valueNode.isTextual()) { throw new IllegalStateException("Because the embedded format is 'text', the value must be a string"); } - value = valueNode.asText().getBytes(); + value = valueNode.asText().getBytes(Charset.forName("UTF-8")); } if (json.has("timestamp")) { timestamp = json.get("timestamp").asLong(); @@ -101,8 +102,8 @@ public byte[] toMessages(ConsumerRecords records) { ObjectNode jsonObject = JsonUtils.createObjectNode(); jsonObject.set("topic", new TextNode(record.topic())); - jsonObject.set("key", record.key() != null ? new TextNode(new String(record.key())) : null); - jsonObject.set("value", record.value() != null ? new TextNode(new String(record.value())) : null); + jsonObject.set("key", record.key() != null ? new TextNode(new String(record.key(), StandardCharsets.UTF_8)) : null); + jsonObject.set("value", record.value() != null ? new TextNode(new String(record.value(), StandardCharsets.UTF_8)) : null); jsonObject.put("partition", record.partition()); jsonObject.put("offset", record.offset()); jsonObject.put("timestamp", record.timestamp());