From 3e983a201e1d75889c99cc45601a0191789bada1 Mon Sep 17 00:00:00 2001 From: Paolo Patierno Date: Sun, 12 Jan 2025 19:18:59 +0100 Subject: [PATCH] Refactored to use new Java version constructs (#961) Signed-off-by: Paolo Patierno --- .../strimzi/kafka/bridge/EmbeddedFormat.java | 15 ++--- .../bridge/http/HttpAdminBridgeEndpoint.java | 4 +- .../strimzi/kafka/bridge/http/HttpBridge.java | 11 ++-- .../bridge/http/HttpSinkBridgeEndpoint.java | 55 +++++++--------- .../bridge/http/HttpSourceBridgeEndpoint.java | 24 ++++--- .../bridge/http/model/HttpBridgeError.java | 34 ++-------- .../bridge/http/model/HttpBridgeResult.java | 22 +------ .../strimzi/kafka/bridge/http/ConsumerIT.java | 62 +++++++++---------- .../bridge/http/ConsumerSubscriptionIT.java | 28 ++++----- .../http/DisablingConsumerProducerIT.java | 8 +-- .../kafka/bridge/http/OtherServicesIT.java | 6 +- .../strimzi/kafka/bridge/http/ProducerIT.java | 30 ++++----- .../io/strimzi/kafka/bridge/http/SeekIT.java | 20 +++--- 13 files changed, 130 insertions(+), 189 deletions(-) diff --git a/src/main/java/io/strimzi/kafka/bridge/EmbeddedFormat.java b/src/main/java/io/strimzi/kafka/bridge/EmbeddedFormat.java index 16de02de2..a8b53c904 100644 --- a/src/main/java/io/strimzi/kafka/bridge/EmbeddedFormat.java +++ b/src/main/java/io/strimzi/kafka/bridge/EmbeddedFormat.java @@ -26,14 +26,11 @@ public enum EmbeddedFormat { * @return corresponding enum */ public static EmbeddedFormat from(String value) { - switch (value) { - case "json": - return JSON; - case "binary": - return BINARY; - case "text": - return TEXT; - } - throw new IllegalEmbeddedFormatException("Invalid format type."); + return switch (value) { + case "json" -> JSON; + case "binary" -> BINARY; + case "text" -> TEXT; + default -> throw new IllegalEmbeddedFormatException("Invalid format type."); + }; } } diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java index 8b0763e3d..f795efa52 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java @@ -44,7 +44,7 @@ public class HttpAdminBridgeEndpoint extends HttpBridgeEndpoint { private static final Logger LOGGER = LogManager.getLogger(HttpAdminBridgeEndpoint.class); - private final HttpBridgeContext httpBridgeContext; + private final HttpBridgeContext httpBridgeContext; private final KafkaBridgeAdmin kafkaBridgeAdmin; /** @@ -53,7 +53,7 @@ public class HttpAdminBridgeEndpoint extends HttpBridgeEndpoint { * @param bridgeConfig the bridge configuration * @param context the HTTP bridge context */ - public HttpAdminBridgeEndpoint(BridgeConfig bridgeConfig, HttpBridgeContext context) { + public HttpAdminBridgeEndpoint(BridgeConfig bridgeConfig, HttpBridgeContext context) { super(bridgeConfig); this.name = "kafka-bridge-admin"; this.httpBridgeContext = context; diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java index fbebb7147..68511a5d2 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java @@ -329,7 +329,7 @@ private void createConsumer(RoutingContext routingContext) { responseStatus.code(), ex.getMessage() ); - HttpUtils.sendResponse(routingContext, error.getCode(), + HttpUtils.sendResponse(routingContext, error.code(), BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson())); } } @@ -586,20 +586,17 @@ private void errorHandler(RoutingContext routingContext) { // in case of validation exception, building a meaningful error message if (routingContext.failure() != null) { StringBuilder sb = new StringBuilder(); - if (routingContext.failure().getCause() instanceof ValidationException) { - ValidationException validationException = (ValidationException) routingContext.failure().getCause(); + if (routingContext.failure().getCause() instanceof ValidationException validationException) { if (validationException.inputScope() != null) { sb.append("Validation error on: ").append(validationException.inputScope()).append(" - "); } sb.append(validationException.getMessage()); - } else if (routingContext.failure() instanceof ParameterProcessorException) { - ParameterProcessorException parameterException = (ParameterProcessorException) routingContext.failure(); + } else if (routingContext.failure() instanceof ParameterProcessorException parameterException) { if (parameterException.getParameterName() != null) { sb.append("Parameter error on: ").append(parameterException.getParameterName()).append(" - "); } sb.append(parameterException.getMessage()); - } else if (routingContext.failure() instanceof BodyProcessorException) { - BodyProcessorException bodyProcessorException = (BodyProcessorException) routingContext.failure(); + } else if (routingContext.failure() instanceof BodyProcessorException bodyProcessorException) { sb.append(bodyProcessorException.getMessage()); } message = sb.toString(); diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java index b54cb0eaa..dc51e1fc0 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java @@ -197,7 +197,7 @@ private void doSeek(RoutingContext routingContext, JsonNode bodyAsJson) { HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null); } else { HttpBridgeError error = handleError(ex); - HttpUtils.sendResponse(routingContext, error.getCode(), + HttpUtils.sendResponse(routingContext, error.code(), BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson())); } }); @@ -223,7 +223,7 @@ private void doSeekTo(RoutingContext routingContext, JsonNode bodyAsJson, HttpOp HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null); } else { HttpBridgeError error = handleError(ex); - HttpUtils.sendResponse(routingContext, error.getCode(), + HttpUtils.sendResponse(routingContext, error.code(), BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson())); } }); @@ -388,7 +388,7 @@ private void doAssign(RoutingContext routingContext, JsonNode bodyAsJson) { StreamSupport.stream(partitionsList.spliterator(), false) .map(JsonNode.class::cast) .map(json -> new SinkTopicSubscription(JsonUtils.getString(json, "topic"), JsonUtils.getInt(json, "partition"))) - .collect(Collectors.toList()) + .toList() ); // fulfilling the request in a separate thread to free the Vert.x event loop still in place @@ -441,7 +441,7 @@ private void doSubscribe(RoutingContext routingContext, JsonNode bodyAsJson) { StreamSupport.stream(topicsList.spliterator(), false) .map(TextNode.class::cast) .map(topic -> new SinkTopicSubscription(topic.asText())) - .collect(Collectors.toList()) + .toList() ); this.kafkaBridgeConsumer.subscribe(topicSubscriptions); } else if (bodyAsJson.has("topic_pattern")) { @@ -554,7 +554,7 @@ public void handle(RoutingContext routingContext, Handler ha LOGGER.debug("[{}] Request: body = {}", routingContext.get("request-id"), bodyAsJson); } catch (JsonDecodeException ex) { HttpBridgeError error = handleError(ex); - HttpUtils.sendResponse(routingContext, error.getCode(), + HttpUtils.sendResponse(routingContext, error.code(), BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson())); return; } @@ -609,39 +609,30 @@ public void handle(RoutingContext routingContext, Handler ha @SuppressWarnings("unchecked") private MessageConverter buildMessageConverter() { - switch (this.format) { - case JSON: - return (MessageConverter) new HttpJsonMessageConverter(); - case BINARY: - return (MessageConverter) new HttpBinaryMessageConverter(); - case TEXT: - return (MessageConverter) new HttpTextMessageConverter(); - } - return null; + return switch (this.format) { + case JSON -> (MessageConverter) new HttpJsonMessageConverter(); + case BINARY -> (MessageConverter) new HttpBinaryMessageConverter(); + case TEXT -> (MessageConverter) new HttpTextMessageConverter(); + default -> null; + }; } private String getContentType() { - switch (this.format) { - case JSON: - return BridgeContentType.KAFKA_JSON_JSON; - case BINARY: - return BridgeContentType.KAFKA_JSON_BINARY; - case TEXT: - return BridgeContentType.KAFKA_JSON_TEXT; - } - throw new IllegalArgumentException(); + return switch (this.format) { + case JSON -> BridgeContentType.KAFKA_JSON_JSON; + case BINARY -> BridgeContentType.KAFKA_JSON_BINARY; + case TEXT -> BridgeContentType.KAFKA_JSON_TEXT; + default -> throw new IllegalArgumentException(); + }; } private boolean checkAcceptedBody(String accept) { - switch (accept) { - case BridgeContentType.KAFKA_JSON_JSON: - return format == EmbeddedFormat.JSON; - case BridgeContentType.KAFKA_JSON_BINARY: - return format == EmbeddedFormat.BINARY; - case BridgeContentType.KAFKA_JSON_TEXT: - return format == EmbeddedFormat.TEXT; - } - return false; + return switch (accept) { + case BridgeContentType.KAFKA_JSON_JSON -> format == EmbeddedFormat.JSON; + case BridgeContentType.KAFKA_JSON_BINARY -> format == EmbeddedFormat.BINARY; + case BridgeContentType.KAFKA_JSON_TEXT -> format == EmbeddedFormat.TEXT; + default -> false; + }; } /** diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java index 76a81577c..57d0897b6 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java @@ -191,13 +191,11 @@ private ObjectNode buildOffsets(List> results) { for (HttpBridgeResult result : results) { ObjectNode offset = null; - if (result.getResult() instanceof RecordMetadata) { - RecordMetadata metadata = (RecordMetadata) result.getResult(); + if (result.result() instanceof RecordMetadata metadata) { offset = JsonUtils.createObjectNode() .put("partition", metadata.partition()) .put("offset", metadata.offset()); - } else if (result.getResult() instanceof HttpBridgeError) { - HttpBridgeError error = (HttpBridgeError) result.getResult(); + } else if (result.result() instanceof HttpBridgeError error) { offset = error.toJson(); } offsets.add(offset); @@ -218,14 +216,14 @@ private int handleError(Throwable ex) { @SuppressWarnings("unchecked") private MessageConverter buildMessageConverter(String contentType) { - switch (contentType) { - case BridgeContentType.KAFKA_JSON_JSON: - return (MessageConverter) new HttpJsonMessageConverter(); - case BridgeContentType.KAFKA_JSON_BINARY: - return (MessageConverter) new HttpBinaryMessageConverter(); - case BridgeContentType.KAFKA_JSON_TEXT: - return (MessageConverter) new HttpTextMessageConverter(); - } - return null; + return switch (contentType) { + case BridgeContentType.KAFKA_JSON_JSON -> + (MessageConverter) new HttpJsonMessageConverter(); + case BridgeContentType.KAFKA_JSON_BINARY -> + (MessageConverter) new HttpBinaryMessageConverter(); + case BridgeContentType.KAFKA_JSON_TEXT -> + (MessageConverter) new HttpTextMessageConverter(); + default -> null; + }; } } diff --git a/src/main/java/io/strimzi/kafka/bridge/http/model/HttpBridgeError.java b/src/main/java/io/strimzi/kafka/bridge/http/model/HttpBridgeError.java index 409a0bbef..87aed9177 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/model/HttpBridgeError.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/model/HttpBridgeError.java @@ -11,36 +11,11 @@ /** * Represents an error related to HTTP bridging + * + * @param code code classifying the error itself + * @param message message providing more information about the error */ -public class HttpBridgeError { - - private final int code; - private final String message; - - /** - * Constructor - * - * @param code code classifying the error itself - * @param message message providing more information about the error - */ - public HttpBridgeError(int code, String message) { - this.code = code; - this.message = message; - } - - /** - * @return code classifying the error itself - */ - public int getCode() { - return code; - } - - /** - * @return message providing more information about the error - */ - public String getMessage() { - return message; - } +public record HttpBridgeError(int code, String message) { /** * @return a JSON representation of the error with code and message @@ -54,6 +29,7 @@ public ObjectNode toJson() { /** * Create an error instance from a JSON representation + * * @param json JSON representation of the error * @return error instance */ diff --git a/src/main/java/io/strimzi/kafka/bridge/http/model/HttpBridgeResult.java b/src/main/java/io/strimzi/kafka/bridge/http/model/HttpBridgeResult.java index befb2a3d5..c0d3129f5 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/model/HttpBridgeResult.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/model/HttpBridgeResult.java @@ -9,24 +9,6 @@ * This class represents a result of an HTTP bridging operation * * @param the class bringing the actual result as {@link HttpBridgeError} or {@link org.apache.kafka.clients.producer.RecordMetadata} + * @param result actual result */ -public class HttpBridgeResult { - - final T result; - - /** - * Constructor - * - * @param result actual result - */ - public HttpBridgeResult(T result) { - this.result = result; - } - - /** - * @return the actual result - */ - public T getResult() { - return result; - } -} +public record HttpBridgeResult(T result) { } diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java index f9d5776d2..a38974ea9 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerIT.java @@ -91,8 +91,8 @@ void createConsumerWrongFormat(VertxTestContext context) throws InterruptedExcep HttpResponse response = ar.result(); assertThat(response.statusCode(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); - assertThat(error.getCode(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); - assertThat(error.getMessage(), is("Invalid format type.")); + assertThat(error.code(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); + assertThat(error.message(), is("Invalid format type.")); }); create.complete(true); }); @@ -153,8 +153,8 @@ void createConsumerEnableAutoCommit(VertxTestContext context) throws Interrupted HttpResponse response = ar.result(); assertThat(response.statusCode(), is(HttpResponseStatus.BAD_REQUEST.code())); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); - assertThat(error.getCode(), is(HttpResponseStatus.BAD_REQUEST.code())); - assertThat(error.getMessage(), is("Validation error on: /enable.auto.commit - input don't match type BOOLEAN")); + assertThat(error.code(), is(HttpResponseStatus.BAD_REQUEST.code())); + assertThat(error.message(), is("Validation error on: /enable.auto.commit - input don't match type BOOLEAN")); }); createBooleanAsString.complete(true); }); @@ -174,8 +174,8 @@ void createConsumerEnableAutoCommit(VertxTestContext context) throws Interrupted HttpResponse response = ar.result(); assertThat(response.statusCode(), is(HttpResponseStatus.BAD_REQUEST.code())); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); - assertThat(error.getCode(), is(HttpResponseStatus.BAD_REQUEST.code())); - assertThat(error.getMessage(), is("Validation error on: /enable.auto.commit - input don't match type BOOLEAN")); + assertThat(error.code(), is(HttpResponseStatus.BAD_REQUEST.code())); + assertThat(error.message(), is("Validation error on: /enable.auto.commit - input don't match type BOOLEAN")); }); createGenericString.complete(true); }); @@ -218,8 +218,8 @@ private void createConsumerIntegerParam(VertxTestContext context, String param) HttpResponse response = ar.result(); assertThat(response.statusCode(), is(HttpResponseStatus.BAD_REQUEST.code())); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); - assertThat(error.getCode(), is(HttpResponseStatus.BAD_REQUEST.code())); - assertThat(error.getMessage(), is("Validation error on: /" + param + " - input don't match type INTEGER")); + assertThat(error.code(), is(HttpResponseStatus.BAD_REQUEST.code())); + assertThat(error.message(), is("Validation error on: /" + param + " - input don't match type INTEGER")); }); createIntegerAsString.complete(true); }); @@ -239,8 +239,8 @@ private void createConsumerIntegerParam(VertxTestContext context, String param) HttpResponse response = ar.result(); assertThat(response.statusCode(), is(HttpResponseStatus.BAD_REQUEST.code())); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); - assertThat(error.getCode(), is(HttpResponseStatus.BAD_REQUEST.code())); - assertThat(error.getMessage(), is("Validation error on: /" + param + " - input don't match type INTEGER")); + assertThat(error.code(), is(HttpResponseStatus.BAD_REQUEST.code())); + assertThat(error.message(), is("Validation error on: /" + param + " - input don't match type INTEGER")); }); createGenericString.complete(true); }); @@ -430,8 +430,8 @@ void createConsumerWithForwardedHeaderWrongProto(VertxTestContext context) throw HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())); - assertThat(error.getCode(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())); - assertThat(error.getMessage(), is("mqtt is not a valid schema/proto.")); + assertThat(error.code(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())); + assertThat(error.message(), is("mqtt is not a valid schema/proto.")); }); context.completeNow(); }); @@ -1196,8 +1196,8 @@ void consumerAlreadyExistsTest(VertxTestContext context) throws InterruptedExcep HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.CONFLICT.code())); - assertThat(error.getCode(), is(HttpResponseStatus.CONFLICT.code())); - assertThat(error.getMessage(), is("A consumer instance with the specified name already exists in the Kafka Bridge.")); + assertThat(error.code(), is(HttpResponseStatus.CONFLICT.code())); + assertThat(error.message(), is("A consumer instance with the specified name already exists in the Kafka Bridge.")); context.completeNow(); }); create2Again.complete(true); @@ -1244,8 +1244,8 @@ void recordsConsumerDoesNotExist(VertxTestContext context) { HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getMessage(), is("The specified consumer instance was not found.")); + assertThat(error.code(), is(HttpResponseStatus.NOT_FOUND.code())); + assertThat(error.message(), is("The specified consumer instance was not found.")); }); context.completeNow(); }); @@ -1272,8 +1272,8 @@ void offsetsConsumerDoesNotExist(VertxTestContext context) { HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getMessage(), is("The specified consumer instance was not found.")); + assertThat(error.code(), is(HttpResponseStatus.NOT_FOUND.code())); + assertThat(error.message(), is("The specified consumer instance was not found.")); }); context.completeNow(); }); @@ -1309,8 +1309,8 @@ void doNotRespondTooLongMessage(VertxTestContext context) throws InterruptedExce HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); - assertThat(error.getCode(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); - assertThat(error.getMessage(), is("Response exceeds the maximum number of bytes the consumer can receive")); + assertThat(error.code(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); + assertThat(error.message(), is("Response exceeds the maximum number of bytes the consumer can receive")); }); consume.complete(true); }); @@ -1388,8 +1388,8 @@ void doNotReceiveMessageAfterUnsubscribe(VertxTestContext context) throws Interr HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())); - assertThat(error.getCode(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())); - assertThat(error.getMessage(), is("Consumer is not subscribed to any topics or assigned any partitions")); + assertThat(error.code(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())); + assertThat(error.message(), is("Consumer is not subscribed to any topics or assigned any partitions")); }); consume2.complete(true); }); @@ -1429,8 +1429,8 @@ void formatAndAcceptMismatch(VertxTestContext context) throws InterruptedExcepti HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.NOT_ACCEPTABLE.code())); - assertThat(error.getCode(), is(HttpResponseStatus.NOT_ACCEPTABLE.code())); - assertThat(error.getMessage(), is("Consumer format does not match the embedded format requested by the Accept header.")); + assertThat(error.code(), is(HttpResponseStatus.NOT_ACCEPTABLE.code())); + assertThat(error.message(), is("Consumer format does not match the embedded format requested by the Accept header.")); }); consume.complete(true); }); @@ -1571,8 +1571,8 @@ void tryReceiveNotValidJsonMessage(VertxTestContext context) throws InterruptedE HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.NOT_ACCEPTABLE.code())); - assertThat(error.getCode(), is(HttpResponseStatus.NOT_ACCEPTABLE.code())); - assertThat(error.getMessage().startsWith("Failed to decode"), is(true)); + assertThat(error.code(), is(HttpResponseStatus.NOT_ACCEPTABLE.code())); + assertThat(error.message().startsWith("Failed to decode"), is(true)); }); consume.complete(true); }); @@ -1694,8 +1694,8 @@ private void checkCreatingConsumer(String key, String value, HttpResponseStatus HttpResponse response = ar.result(); assertThat(response.statusCode(), is(status.code())); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); - assertThat(error.getCode(), is(status.code())); - assertThat(error.getMessage(), is(message)); + assertThat(error.code(), is(status.code())); + assertThat(error.message(), is(message)); }); consumer.complete(true); }); @@ -1722,9 +1722,9 @@ void createConsumerWithInvalidFormat(VertxTestContext context) throws Interrupte HttpResponse response = ar.result(); assertThat("Response status code is not '422'", response.statusCode(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); - assertThat("Response status code is not '422'", HttpResponseStatus.UNPROCESSABLE_ENTITY.code(), is(error.getCode())); - LOGGER.info("This is message -> " + error.getMessage()); - assertThat("Body message doesn't contain 'Invalid format type.'", error.getMessage(), equalTo("Invalid format type.")); + assertThat("Response status code is not '422'", HttpResponseStatus.UNPROCESSABLE_ENTITY.code(), is(error.code())); + LOGGER.info("This is message -> " + error.message()); + assertThat("Body message doesn't contain 'Invalid format type.'", error.message(), equalTo("Invalid format type.")); }); create.complete(true); }); diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerSubscriptionIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerSubscriptionIT.java index 27b9d9e8b..4ba2c527c 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerSubscriptionIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerSubscriptionIT.java @@ -58,8 +58,8 @@ void unsubscribeConsumerNotFound(VertxTestContext context) throws InterruptedExc HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getMessage(), is("The specified consumer instance was not found.")); + assertThat(error.code(), is(HttpResponseStatus.NOT_FOUND.code())); + assertThat(error.message(), is("The specified consumer instance was not found.")); }); unsubscribe.complete(true); }); @@ -97,8 +97,8 @@ void subscribeExclusiveTopicAndPattern(VertxTestContext context) throws Throwabl HttpResponse response = ar.result(); assertThat(response.statusCode(), is(HttpResponseStatus.CONFLICT.code())); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); - assertThat(error.getCode(), is(HttpResponseStatus.CONFLICT.code())); - assertThat(error.getMessage(), is("Subscriptions to topics, partitions, and patterns are mutually exclusive.")); + assertThat(error.code(), is(HttpResponseStatus.CONFLICT.code())); + assertThat(error.message(), is("Subscriptions to topics, partitions, and patterns are mutually exclusive.")); }); subscribeConflict.complete(true); @@ -117,8 +117,8 @@ void subscribeExclusiveTopicAndPattern(VertxTestContext context) throws Throwabl HttpResponse response = ar.result(); assertThat(response.statusCode(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); - assertThat(error.getCode(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); - assertThat(error.getMessage(), is("A list (of Topics type) or a topic_pattern must be specified.")); + assertThat(error.code(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); + assertThat(error.message(), is("A list (of Topics type) or a topic_pattern must be specified.")); }); subscribeEmpty.complete(true); @@ -152,8 +152,8 @@ void subscriptionConsumerDoesNotExistBecauseNotCreated(VertxTestContext context) HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getMessage(), is("The specified consumer instance was not found.")); + assertThat(error.code(), is(HttpResponseStatus.NOT_FOUND.code())); + assertThat(error.message(), is("The specified consumer instance was not found.")); }); subscribe.complete(true); }); @@ -277,8 +277,8 @@ void subscriptionConsumerDoesNotExistBecauseAnotherGroup(VertxTestContext contex HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getMessage(), is("The specified consumer instance was not found.")); + assertThat(error.code(), is(HttpResponseStatus.NOT_FOUND.code())); + assertThat(error.message(), is("The specified consumer instance was not found.")); }); subscribe.complete(true); }); @@ -377,8 +377,8 @@ void tryToPollWithoutSubscriptionTest(VertxTestContext context) throws Interrupt HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), CoreMatchers.is(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())); - assertThat(error.getCode(), CoreMatchers.is(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())); - assertThat(error.getMessage(), CoreMatchers.is("Consumer is not subscribed to any topics or assigned any partitions")); + assertThat(error.code(), CoreMatchers.is(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())); + assertThat(error.message(), CoreMatchers.is("Consumer is not subscribed to any topics or assigned any partitions")); }); consume.complete(true); @@ -429,8 +429,8 @@ void assignAfterSubscriptionTest(VertxTestContext context) throws InterruptedExc HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.CONFLICT.code())); - assertThat(error.getCode(), is(HttpResponseStatus.CONFLICT.code())); - assertThat(error.getMessage(), is("Subscriptions to topics, partitions, and patterns are mutually exclusive.")); + assertThat(error.code(), is(HttpResponseStatus.CONFLICT.code())); + assertThat(error.message(), is("Subscriptions to topics, partitions, and patterns are mutually exclusive.")); }); assignCF.complete(true); }); diff --git a/src/test/java/io/strimzi/kafka/bridge/http/DisablingConsumerProducerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/DisablingConsumerProducerIT.java index a951f5a01..d668dd2d4 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/DisablingConsumerProducerIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/DisablingConsumerProducerIT.java @@ -111,8 +111,8 @@ void consumerDisabledTest(VertxTestContext context) throws ExecutionException, I HttpResponse response = ar.result(); assertThat(response.statusCode(), CoreMatchers.is(HttpResponseStatus.SERVICE_UNAVAILABLE.code())); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); - assertThat(error.getCode(), CoreMatchers.is(HttpResponseStatus.SERVICE_UNAVAILABLE.code())); - assertThat(error.getMessage(), CoreMatchers.is("Consumer is disabled in config. To enable consumer update http.consumer.enabled to true")); + assertThat(error.code(), CoreMatchers.is(HttpResponseStatus.SERVICE_UNAVAILABLE.code())); + assertThat(error.message(), CoreMatchers.is("Consumer is disabled in config. To enable consumer update http.consumer.enabled to true")); tryCreate.complete(true); }); }); @@ -148,8 +148,8 @@ void producerDisabledTest(VertxTestContext context) throws ExecutionException, I HttpResponse response = ar.result(); assertThat(response.statusCode(), CoreMatchers.is(HttpResponseStatus.SERVICE_UNAVAILABLE.code())); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); - assertThat(error.getCode(), CoreMatchers.is(HttpResponseStatus.SERVICE_UNAVAILABLE.code())); - assertThat(error.getMessage(), CoreMatchers.is("Producer is disabled in config. To enable producer update http.producer.enabled to true")); + assertThat(error.code(), CoreMatchers.is(HttpResponseStatus.SERVICE_UNAVAILABLE.code())); + assertThat(error.message(), CoreMatchers.is("Producer is disabled in config. To enable producer update http.producer.enabled to true")); trySend.complete(true); }); }); diff --git a/src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java b/src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java index b4b779922..b6afec195 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java @@ -92,8 +92,8 @@ void openapiv2Test(VertxTestContext context) { HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.GONE.code())); - assertThat(error.getCode(), is(HttpResponseStatus.GONE.code())); - assertThat(error.getMessage(), is("OpenAPI v2 Swagger not supported")); + assertThat(error.code(), is(HttpResponseStatus.GONE.code())); + assertThat(error.message(), is("OpenAPI v2 Swagger not supported")); }); context.completeNow(); }); @@ -193,7 +193,7 @@ void postToNonexistentEndpoint(VertxTestContext context) { HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getCode(), is(HttpResponseStatus.NOT_FOUND.code())); + assertThat(error.code(), is(HttpResponseStatus.NOT_FOUND.code())); }); context.completeNow(); }); diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java index 37fd5eb38..5004bf27a 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java @@ -706,7 +706,7 @@ void emptyRecordTest(VertxTestContext context) throws InterruptedException, Exec HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); - assertThat(error.getCode(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); + assertThat(error.code(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); }); context.completeNow(); }); @@ -737,8 +737,8 @@ void sendTextMessageWithWrongValue(VertxTestContext context) throws InterruptedE HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); - assertThat(error.getCode(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); - assertThat(error.getMessage(), is("Because the embedded format is 'text', the value must be a string")); + assertThat(error.code(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); + assertThat(error.message(), is("Because the embedded format is 'text', the value must be a string")); }); context.completeNow(); }); @@ -770,8 +770,8 @@ void sendTextMessageWithWrongKey(VertxTestContext context) throws InterruptedExc HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); - assertThat(error.getCode(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); - assertThat(error.getMessage(), is("Because the embedded format is 'text', the key must be a string")); + assertThat(error.code(), is(HttpResponseStatus.UNPROCESSABLE_ENTITY.code())); + assertThat(error.message(), is("Because the embedded format is 'text', the key must be a string")); }); context.completeNow(); }); @@ -853,10 +853,10 @@ void sendToNonExistingPartitionsTest(VertxTestContext context) throws Interrupte assertThat(offsets.size(), is(1)); HttpBridgeError error = HttpBridgeError.fromJson(offsets.getJsonObject(0)); - assertThat(error.getCode(), is(HttpResponseStatus.NOT_FOUND.code())); + assertThat(error.code(), is(HttpResponseStatus.NOT_FOUND.code())); // the message got from the Kafka producer (starting from 2.3) is misleading // this JIRA (https://issues.apache.org/jira/browse/KAFKA-8862) raises the issue - assertThat(error.getMessage(), is( + assertThat(error.message(), is( "Topic " + topic + " not present in metadata after " + config.get(KafkaProducerConfig.KAFKA_PRODUCER_CONFIG_PREFIX + ProducerConfig.MAX_BLOCK_MS_CONFIG) + " ms.")); }); @@ -1036,8 +1036,8 @@ Handler>> verifyBadRequest(VertxTestContext HttpResponse response = ar.result(); assertThat(response.statusCode(), is(HttpResponseStatus.BAD_REQUEST.code())); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); - assertThat(error.getCode(), is(HttpResponseStatus.BAD_REQUEST.code())); - assertThat(error.getMessage(), is(message)); + assertThat(error.code(), is(HttpResponseStatus.BAD_REQUEST.code())); + assertThat(error.message(), is(message)); context.completeNow(); }); } @@ -1094,8 +1094,8 @@ void sendMultipleRecordsWithOneInvalidPartitionTest(VertxTestContext context) th assertThat(metadata.getInteger("partition"), is(partition)); assertThat(metadata.getLong("offset"), is(0L)); HttpBridgeError error = HttpBridgeError.fromJson(offsets.getJsonObject(1)); - assertThat(error.getCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getMessage(), is("Topic " + topic + " not present in metadata after 10000 ms.")); + assertThat(error.code(), is(HttpResponseStatus.NOT_FOUND.code())); + assertThat(error.message(), is("Topic " + topic + " not present in metadata after 10000 ms.")); }); context.completeNow(); }); @@ -1272,8 +1272,8 @@ void sendWithWrongAsync(VertxTestContext context) throws InterruptedException, E HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.BAD_REQUEST.code())); - assertThat(error.getCode(), is(HttpResponseStatus.BAD_REQUEST.code())); - assertThat(error.getMessage(), containsString("Value wrong should be true or false")); + assertThat(error.code(), is(HttpResponseStatus.BAD_REQUEST.code())); + assertThat(error.message(), containsString("Value wrong should be true or false")); }); context.completeNow(); }); @@ -1305,8 +1305,8 @@ void sendSimpleMessageWithWrongContentType(VertxTestContext context) throws Inte HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.BAD_REQUEST.code())); - assertThat(error.getCode(), is(HttpResponseStatus.BAD_REQUEST.code())); - assertThat(error.getMessage(), containsString("Cannot find body processor for content type")); + assertThat(error.code(), is(HttpResponseStatus.BAD_REQUEST.code())); + assertThat(error.message(), containsString("Cannot find body processor for content type")); }); context.completeNow(); }); diff --git a/src/test/java/io/strimzi/kafka/bridge/http/SeekIT.java b/src/test/java/io/strimzi/kafka/bridge/http/SeekIT.java index a8f2f8817..70a5894f7 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/SeekIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/SeekIT.java @@ -52,8 +52,8 @@ void seekToNotExistingConsumer(VertxTestContext context) throws InterruptedExcep HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getMessage(), is("The specified consumer instance was not found.")); + assertThat(error.code(), is(HttpResponseStatus.NOT_FOUND.code())); + assertThat(error.message(), is("The specified consumer instance was not found.")); context.completeNow(); }); }); @@ -84,8 +84,8 @@ void seekToNotExistingPartitionInSubscribedTopic(VertxTestContext context) throw HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getMessage(), is("No current assignment for partition " + topic + "-" + notExistingPartition)); + assertThat(error.code(), is(HttpResponseStatus.NOT_FOUND.code())); + assertThat(error.message(), is("No current assignment for partition " + topic + "-" + notExistingPartition)); context.completeNow(); }); }); @@ -119,8 +119,8 @@ void seekToNotExistingTopic(VertxTestContext context) throws InterruptedExceptio HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getMessage(), is("No current assignment for partition " + notExistingTopic + "-" + 0)); + assertThat(error.code(), is(HttpResponseStatus.NOT_FOUND.code())); + assertThat(error.message(), is("No current assignment for partition " + notExistingTopic + "-" + 0)); context.completeNow(); }); consumerInstanceDontHaveTopic.complete(true); @@ -446,8 +446,8 @@ void seekToBeginningMultipleTopicsWithNotSuscribedTopic(VertxTestContext context HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getMessage(), is("No current assignment for partition " + notSubscribedTopic + "-0")); + assertThat(error.code(), is(HttpResponseStatus.NOT_FOUND.code())); + assertThat(error.message(), is("No current assignment for partition " + notSubscribedTopic + "-0")); }); seek.complete(true); }); @@ -514,8 +514,8 @@ void seekToOffsetMultipleTopicsWithNotSuscribedTopic(VertxTestContext context) t HttpResponse response = ar.result(); HttpBridgeError error = HttpBridgeError.fromJson(response.body()); assertThat(response.statusCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getCode(), is(HttpResponseStatus.NOT_FOUND.code())); - assertThat(error.getMessage(), is("No current assignment for partition " + notSubscribedTopic + "-0")); + assertThat(error.code(), is(HttpResponseStatus.NOT_FOUND.code())); + assertThat(error.message(), is("No current assignment for partition " + notSubscribedTopic + "-0")); }); seek.complete(true); });