|
1 | 1 | package io.kafbat.ui.client;
|
2 | 2 |
|
| 3 | +import com.fasterxml.jackson.annotation.JsonProperty; |
3 | 4 | import io.kafbat.ui.config.ClustersProperties;
|
4 | 5 | import io.kafbat.ui.connect.ApiClient;
|
5 | 6 | import io.kafbat.ui.connect.api.KafkaConnectClientApi;
|
|
14 | 15 | import io.kafbat.ui.exception.KafkaConnectConflictReponseException;
|
15 | 16 | import io.kafbat.ui.exception.ValidationException;
|
16 | 17 | import io.kafbat.ui.util.WebClientConfigurator;
|
| 18 | +import jakarta.validation.constraints.NotNull; |
17 | 19 | import java.time.Duration;
|
18 | 20 | import java.util.List;
|
19 | 21 | import java.util.Map;
|
| 22 | +import java.util.Objects; |
20 | 23 | import javax.annotation.Nullable;
|
21 | 24 | import lombok.extern.slf4j.Slf4j;
|
22 | 25 | import org.springframework.http.ResponseEntity;
|
@@ -58,10 +61,24 @@ private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
|
58 | 61 |
|
59 | 62 | private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
|
60 | 63 | return publisher
|
61 |
| - .onErrorResume(WebClientResponseException.BadRequest.class, e -> |
62 |
| - Mono.error(new ValidationException("Invalid configuration"))) |
63 |
| - .onErrorResume(WebClientResponseException.InternalServerError.class, e -> |
64 |
| - Mono.error(new ValidationException("Invalid configuration"))); |
| 64 | + .onErrorResume(WebClientResponseException.BadRequest.class, |
| 65 | + RetryingKafkaConnectClient::parseConnectErrorMessage) |
| 66 | + .onErrorResume(WebClientResponseException.InternalServerError.class, |
| 67 | + RetryingKafkaConnectClient::parseConnectErrorMessage); |
| 68 | + } |
| 69 | + |
| 70 | + // Adapted from https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java |
| 71 | + // Adding the connect runtime dependency for this single class seems excessive |
| 72 | + private record ErrorMessage(@NotNull @JsonProperty("message") String message) { |
| 73 | + } |
| 74 | + |
| 75 | + private static <T> @NotNull Mono<T> parseConnectErrorMessage(WebClientResponseException parseException) { |
| 76 | + final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class); |
| 77 | + return Mono.error(new ValidationException( |
| 78 | + Objects.requireNonNull(errorMessage, |
| 79 | + // see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java |
| 80 | + "This should not happen according to the ConnectExceptionMapper") |
| 81 | + .message())); |
65 | 82 | }
|
66 | 83 |
|
67 | 84 | @Override
|
|
0 commit comments