diff --git a/CHANGELOG.md b/CHANGELOG.md
index c827f6b2..d270d060 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,7 @@
### Added
+- Added support for callbacks to mark requests as failed.
- Added support for OIDC Bearer tokens.
## [0.15.0] - 2024-07-30
diff --git a/README.md b/README.md
index 3374f642..ee28bbe8 100644
--- a/README.md
+++ b/README.md
@@ -378,6 +378,18 @@ and then reference identifier `rest-lookup-logger` in the HTTP lookup DDL proper
is provided.
+- Callback Errors:
+
+ Throw a [FailedRequestException](src/main/java/com/getindata/connectors/http/FailedRequestException.java) to indicate a
+ failed request.
+
+ This allows control over the connector's behavior when an HTTP response does not meet your expectations
+ whether based on the response body or headers.
+
+ Currently, the only side effect is to incremenet the [numRecordsSendErrors counter](https://github.com/getindata/flink-http-connector?tab=readme-ov-file#http-sink-2), as the connector does not
+ support retries yet. However, once retry functionality is implemented, this will allow users to specify if requests should be retried.
+
+
## HTTP status code handler
Http Sink and Lookup Source connectors allow defining list of HTTP status codes that should be treated as errors.
By default all 400s and 500s response codes will be interpreted as error code.
diff --git a/src/main/java/com/getindata/connectors/http/FailedRequestException.java b/src/main/java/com/getindata/connectors/http/FailedRequestException.java
new file mode 100644
index 00000000..55a5bd21
--- /dev/null
+++ b/src/main/java/com/getindata/connectors/http/FailedRequestException.java
@@ -0,0 +1,19 @@
+package com.getindata.connectors.http;
+
+/**
+ * Exception thrown from a {@link HttpPostRequestCallback}
+ * when a request should be considered as failed.
+ *
+ *
This exception is caught by the
+ * {@link com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient}
+ * and {@link com.getindata.connectors.http.internal.table.lookup.JavaNetHttpPollingClient}
+ */
+public class FailedRequestException extends Exception {
+ public FailedRequestException(String message) {
+ super(message);
+ }
+
+ public FailedRequestException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java b/src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java
index 3f9975ef..2e4b0467 100644
--- a/src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java
+++ b/src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java
@@ -25,5 +25,5 @@ void call(
RequestT requestEntry,
String endpointUrl,
Map headerMap
- );
+ ) throws FailedRequestException;
}
diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java
index 7e4c19ff..cd287140 100644
--- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java
+++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java
@@ -12,6 +12,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
+import com.getindata.connectors.http.FailedRequestException;
import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.SinkHttpClient;
@@ -98,13 +99,20 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse(
for (var response : responses) {
var sinkRequestEntry = response.getHttpRequest();
var optResponse = response.getResponse();
-
- httpPostRequestCallback.call(
- optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap);
+ var failedCallback = false;
+
+ try {
+ httpPostRequestCallback.call(
+ optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap);
+ } catch (FailedRequestException e) {
+ failedCallback = true;
+ log.debug("FailedRequestException thrown by httpPostRequestCallback", e);
+ }
// TODO Add response processor here and orchestrate it with statusCodeChecker.
if (optResponse.isEmpty() ||
- statusCodeChecker.isErrorCode(optResponse.get().statusCode())) {
+ statusCodeChecker.isErrorCode(optResponse.get().statusCode()) ||
+ failedCallback) {
failedResponses.add(sinkRequestEntry);
} else {
successfulResponses.add(sinkRequestEntry);
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java
index ce3a31cc..743ce571 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java
@@ -13,6 +13,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.StringUtils;
+import com.getindata.connectors.http.FailedRequestException;
import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.internal.PollingClient;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
@@ -89,7 +90,14 @@ private Optional processHttpResponse(
HttpResponse response,
HttpLookupSourceRequestEntry request) throws IOException {
- this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap());
+ try {
+ this.httpPostRequestCallback.call(
+ response, request, "endpoint", Collections.emptyMap()
+ );
+ } catch (FailedRequestException e) {
+ log.debug("FailedRequestException thrown by httpPostRequestCallback", e);
+ return Optional.empty();
+ }
if (response == null) {
return Optional.empty();
diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java
index 345687b5..68886369 100644
--- a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java
@@ -1,8 +1,11 @@
package com.getindata.connectors.http.internal.sink.httpclient;
import java.io.File;
+import java.net.http.HttpResponse;
import java.util.Collections;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import com.github.tomakehurst.wiremock.WireMockServer;
import org.junit.jupiter.api.AfterEach;
@@ -19,6 +22,8 @@
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import com.getindata.connectors.http.FailedRequestException;
+import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.internal.HttpsConnectionTestBase;
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
@@ -62,6 +67,30 @@ public void testHttpConnection() {
batchRequestSubmitterFactory);
}
+ @Test
+ public void testHttpPostRequestCallbackWithFailedRequestException()
+ throws ExecutionException, InterruptedException {
+ wireMockServer = new WireMockServer(SERVER_PORT);
+ wireMockServer.start();
+ mockEndPoint(wireMockServer);
+
+ JavaNetSinkHttpClient client =
+ new JavaNetSinkHttpClient(
+ properties,
+ new TestPostRequestCallbackWithException(),
+ headerPreprocessor,
+ perRequestSubmitterFactory);
+ HttpSinkRequestEntry requestEntry = new HttpSinkRequestEntry("GET", new byte[0]);
+ SinkHttpClientResponse response =
+ client.putRequests(
+ Collections.singletonList(requestEntry),
+ "https://localhost:" + HTTPS_SERVER_PORT + ENDPOINT
+ ).get();
+
+ assertThat(response.getSuccessfulRequests()).isEmpty();
+ assertThat(response.getFailedRequests()).isNotEmpty();
+ }
+
@Test
public void testHttpsConnectionWithSelfSignedCert() {
@@ -366,4 +395,17 @@ private void mockEndPointWithBasicAuth(WireMockServer wireMockServer) {
.withBody("{}"))
);
}
+
+ public static class TestPostRequestCallbackWithException
+ implements HttpPostRequestCallback {
+ @Override
+ public void call(
+ HttpResponse response,
+ HttpRequest requestEntry,
+ String endpointUrl,
+ Map headerMap
+ ) throws FailedRequestException {
+ throw new FailedRequestException("Test exception");
+ }
+ }
}