From 9e02c271615bcfbe6fd61481e14d1fb164d7383d Mon Sep 17 00:00:00 2001 From: amstee Date: Mon, 9 Sep 2024 15:52:27 +0100 Subject: [PATCH 1/8] feat: support marking requests as failed from callback --- README.md | 6 +++ .../http/HttpPostRequestCallback.java | 2 +- .../http/PostRequestCallbackException.java | 11 +++++ .../httpclient/JavaNetSinkHttpClient.java | 16 +++++-- .../lookup/JavaNetHttpPollingClient.java | 10 ++++- .../JavaNetSinkHttpClientConnectionTest.java | 44 +++++++++++++++++++ 6 files changed, 83 insertions(+), 6 deletions(-) create mode 100644 src/main/java/com/getindata/connectors/http/PostRequestCallbackException.java diff --git a/README.md b/README.md index 3b3ce57a..f287e880 100644 --- a/README.md +++ b/README.md @@ -375,6 +375,12 @@ and then reference identifier `rest-lookup-logger` in the HTTP lookup DDL proper is provided. +- Callback Errors: + + It is also possible to declare if a request should be considered failed from the [HttpPostRequestCallback](src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java) by throwing a +[PostRequestCallbackException](src/main/java/com/getindata/connectors/http/PostRequestCallbackException.java). + + ## HTTP status code handler Http Sink and Lookup Source connectors allow defining list of HTTP status codes that should be treated as errors. By default all 400s and 500s response codes will be interpreted as error code. diff --git a/src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java b/src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java index 3f9975ef..64231ab0 100644 --- a/src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java +++ b/src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java @@ -25,5 +25,5 @@ void call( RequestT requestEntry, String endpointUrl, Map headerMap - ); + ) throws PostRequestCallbackException; } diff --git a/src/main/java/com/getindata/connectors/http/PostRequestCallbackException.java b/src/main/java/com/getindata/connectors/http/PostRequestCallbackException.java new file mode 100644 index 00000000..939cd860 --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/PostRequestCallbackException.java @@ -0,0 +1,11 @@ +package com.getindata.connectors.http; + +public class PostRequestCallbackException extends Exception { + public PostRequestCallbackException(String message) { + super(message); + } + + public PostRequestCallbackException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java index 7e4c19ff..8f0405d0 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java @@ -13,6 +13,7 @@ import org.apache.flink.annotation.VisibleForTesting; import com.getindata.connectors.http.HttpPostRequestCallback; +import com.getindata.connectors.http.PostRequestCallbackException; import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.SinkHttpClient; import com.getindata.connectors.http.internal.SinkHttpClientResponse; @@ -98,13 +99,20 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse( for (var response : responses) { var sinkRequestEntry = response.getHttpRequest(); var optResponse = response.getResponse(); - - httpPostRequestCallback.call( - optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap); + var failedCallback = false; + + try { + httpPostRequestCallback.call( + optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap); + } catch (PostRequestCallbackException e) { + failedCallback = true; + log.info("request marked as failed due to callback exception", e); + } // TODO Add response processor here and orchestrate it with statusCodeChecker. if (optResponse.isEmpty() || - statusCodeChecker.isErrorCode(optResponse.get().statusCode())) { + statusCodeChecker.isErrorCode(optResponse.get().statusCode()) || + failedCallback) { failedResponses.add(sinkRequestEntry); } else { successfulResponses.add(sinkRequestEntry); diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index ce3a31cc..35d18a55 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -14,6 +14,7 @@ import org.apache.flink.util.StringUtils; import com.getindata.connectors.http.HttpPostRequestCallback; +import com.getindata.connectors.http.PostRequestCallbackException; import com.getindata.connectors.http.internal.PollingClient; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker; @@ -89,7 +90,14 @@ private Optional processHttpResponse( HttpResponse response, HttpLookupSourceRequestEntry request) throws IOException { - this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap()); + try { + this.httpPostRequestCallback.call( + response, request, "endpoint", Collections.emptyMap() + ); + } catch (PostRequestCallbackException e) { + log.warn("Error during post request callback.", e); + return Optional.empty(); + } if (response == null) { return Optional.empty(); diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java index 345687b5..2eaf0832 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java @@ -1,7 +1,9 @@ package com.getindata.connectors.http.internal.sink.httpclient; import java.io.File; +import java.net.http.HttpResponse; import java.util.Collections; +import java.util.Map; import java.util.Properties; import com.github.tomakehurst.wiremock.WireMockServer; @@ -19,6 +21,8 @@ import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertThrows; +import com.getindata.connectors.http.HttpPostRequestCallback; +import com.getindata.connectors.http.PostRequestCallbackException; import com.getindata.connectors.http.internal.HttpsConnectionTestBase; import com.getindata.connectors.http.internal.SinkHttpClientResponse; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; @@ -62,6 +66,33 @@ public void testHttpConnection() { batchRequestSubmitterFactory); } + @Test + public void testHttpPostRequestCallbackWithException() { + wireMockServer = new WireMockServer(SERVER_PORT); + wireMockServer.start(); + mockEndPoint(wireMockServer); + + try { + JavaNetSinkHttpClient client = + new JavaNetSinkHttpClient( + properties, + new TestPostRequestCallbackWithException(), + headerPreprocessor, + perRequestSubmitterFactory); + HttpSinkRequestEntry requestEntry = new HttpSinkRequestEntry("GET", new byte[0]); + SinkHttpClientResponse response = + client.putRequests( + Collections.singletonList(requestEntry), + "https://localhost:" + HTTPS_SERVER_PORT + ENDPOINT + ).get(); + + assertThat(response.getSuccessfulRequests()).isEmpty(); + assertThat(response.getFailedRequests()).isNotEmpty(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @Test public void testHttpsConnectionWithSelfSignedCert() { @@ -366,4 +397,17 @@ private void mockEndPointWithBasicAuth(WireMockServer wireMockServer) { .withBody("{}")) ); } + + public static class TestPostRequestCallbackWithException + implements HttpPostRequestCallback { + @Override + public void call( + HttpResponse response, + HttpRequest requestEntry, + String endpointUrl, + Map headerMap + ) throws PostRequestCallbackException { + throw new PostRequestCallbackException("Test exception"); + } + } } From 29139e18e1d7d32b5b1af0a5034d9f9762529f7d Mon Sep 17 00:00:00 2001 From: amstee Date: Mon, 9 Sep 2024 15:52:27 +0100 Subject: [PATCH 2/8] feat: support marking requests as failed from callback --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2889b9d3..7e2c7c0f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## [Unreleased] +### Added + +- Added support for callbacks to mark requests as failed. + ## [0.15.0] - 2024-07-30 ### Added From 93ba8166d14be698431a15c1af85bc4407237bdf Mon Sep 17 00:00:00 2001 From: amstee Date: Mon, 9 Sep 2024 22:54:06 +0100 Subject: [PATCH 3/8] fix: callback exception logs to debug --- .../http/internal/sink/httpclient/JavaNetSinkHttpClient.java | 2 +- .../http/internal/table/lookup/JavaNetHttpPollingClient.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java index 8f0405d0..c7a18e46 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java @@ -106,7 +106,7 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse( optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap); } catch (PostRequestCallbackException e) { failedCallback = true; - log.info("request marked as failed due to callback exception", e); + log.debug("request marked as failed due to callback exception", e); } // TODO Add response processor here and orchestrate it with statusCodeChecker. diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index 35d18a55..380ffea2 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -95,7 +95,7 @@ private Optional processHttpResponse( response, request, "endpoint", Collections.emptyMap() ); } catch (PostRequestCallbackException e) { - log.warn("Error during post request callback.", e); + log.debug("Error during post request callback.", e); return Optional.empty(); } From 084fd52c0dfdbb7dc09c37789dd4ca6274ba4d59 Mon Sep 17 00:00:00 2001 From: amstee Date: Tue, 24 Sep 2024 08:44:25 +0100 Subject: [PATCH 4/8] renamed exception PostRequestCallbackException to FailedRequestException --- README.md | 10 ++++++++-- .../connectors/http/FailedRequestException.java | 17 +++++++++++++++++ .../http/HttpPostRequestCallback.java | 2 +- .../http/PostRequestCallbackException.java | 11 ----------- .../sink/httpclient/JavaNetSinkHttpClient.java | 6 +++--- .../table/lookup/JavaNetHttpPollingClient.java | 6 +++--- .../JavaNetSinkHttpClientConnectionTest.java | 6 +++--- 7 files changed, 35 insertions(+), 23 deletions(-) create mode 100644 src/main/java/com/getindata/connectors/http/FailedRequestException.java delete mode 100644 src/main/java/com/getindata/connectors/http/PostRequestCallbackException.java diff --git a/README.md b/README.md index f287e880..14707095 100644 --- a/README.md +++ b/README.md @@ -377,8 +377,14 @@ is provided. - Callback Errors: - It is also possible to declare if a request should be considered failed from the [HttpPostRequestCallback](src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java) by throwing a -[PostRequestCallbackException](src/main/java/com/getindata/connectors/http/PostRequestCallbackException.java). + Throw a [FailedRequestException](src/main/java/com/getindata/connectors/http/FailedRequestException.java) to indicate a + failed request. + + This allows control over the connector's behavior when an HTTP response does not meet your expectations + whether based on the response body or headers. + + Currently, there are no side effects, as the connector does not + support retries yet. However, once retry functionality is implemented, this will allow users to specify if requests should be retried. ## HTTP status code handler diff --git a/src/main/java/com/getindata/connectors/http/FailedRequestException.java b/src/main/java/com/getindata/connectors/http/FailedRequestException.java new file mode 100644 index 00000000..d48caa3d --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/FailedRequestException.java @@ -0,0 +1,17 @@ +package com.getindata.connectors.http; + +/** + * Exception thrown from a {@link HttpPostRequestCallback} when a request should be considered as failed. + * + *

This exception is caught by the {@link com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient} + * and {@link com.getindata.connectors.http.internal.table.lookup.JavaNetHttpPollingClient} + */ +public class FailedRequestException extends Exception { + public FailedRequestException(String message) { + super(message); + } + + public FailedRequestException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java b/src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java index 64231ab0..2e4b0467 100644 --- a/src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java +++ b/src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java @@ -25,5 +25,5 @@ void call( RequestT requestEntry, String endpointUrl, Map headerMap - ) throws PostRequestCallbackException; + ) throws FailedRequestException; } diff --git a/src/main/java/com/getindata/connectors/http/PostRequestCallbackException.java b/src/main/java/com/getindata/connectors/http/PostRequestCallbackException.java deleted file mode 100644 index 939cd860..00000000 --- a/src/main/java/com/getindata/connectors/http/PostRequestCallbackException.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.getindata.connectors.http; - -public class PostRequestCallbackException extends Exception { - public PostRequestCallbackException(String message) { - super(message); - } - - public PostRequestCallbackException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java index c7a18e46..017923e7 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java @@ -13,7 +13,7 @@ import org.apache.flink.annotation.VisibleForTesting; import com.getindata.connectors.http.HttpPostRequestCallback; -import com.getindata.connectors.http.PostRequestCallbackException; +import com.getindata.connectors.http.FailedRequestException; import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.SinkHttpClient; import com.getindata.connectors.http.internal.SinkHttpClientResponse; @@ -104,9 +104,9 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse( try { httpPostRequestCallback.call( optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap); - } catch (PostRequestCallbackException e) { + } catch (FailedRequestException e) { failedCallback = true; - log.debug("request marked as failed due to callback exception", e); + log.debug("FailedRequestException thrown by httpPostRequestCallback", e); } // TODO Add response processor here and orchestrate it with statusCodeChecker. diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index 380ffea2..8ee91b68 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -14,7 +14,7 @@ import org.apache.flink.util.StringUtils; import com.getindata.connectors.http.HttpPostRequestCallback; -import com.getindata.connectors.http.PostRequestCallbackException; +import com.getindata.connectors.http.FailedRequestException; import com.getindata.connectors.http.internal.PollingClient; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker; @@ -94,8 +94,8 @@ private Optional processHttpResponse( this.httpPostRequestCallback.call( response, request, "endpoint", Collections.emptyMap() ); - } catch (PostRequestCallbackException e) { - log.debug("Error during post request callback.", e); + } catch (FailedRequestException e) { + log.debug("FailedRequestException thrown by httpPostRequestCallback", e); return Optional.empty(); } diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java index 2eaf0832..fe4e4b79 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java @@ -22,7 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import com.getindata.connectors.http.HttpPostRequestCallback; -import com.getindata.connectors.http.PostRequestCallbackException; +import com.getindata.connectors.http.FailedRequestException; import com.getindata.connectors.http.internal.HttpsConnectionTestBase; import com.getindata.connectors.http.internal.SinkHttpClientResponse; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; @@ -406,8 +406,8 @@ public void call( HttpRequest requestEntry, String endpointUrl, Map headerMap - ) throws PostRequestCallbackException { - throw new PostRequestCallbackException("Test exception"); + ) throws FailedRequestException { + throw new FailedRequestException("Test exception"); } } } From 00fdaf674ce40b88804ab9ff70e1ca6c167bc610 Mon Sep 17 00:00:00 2001 From: amstee Date: Tue, 24 Sep 2024 08:48:00 +0100 Subject: [PATCH 5/8] removed unecessary try-catch --- .../JavaNetSinkHttpClientConnectionTest.java | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java index fe4e4b79..a6f1d738 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java @@ -5,6 +5,7 @@ import java.util.Collections; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ExecutionException; import com.github.tomakehurst.wiremock.WireMockServer; import org.junit.jupiter.api.AfterEach; @@ -67,30 +68,26 @@ public void testHttpConnection() { } @Test - public void testHttpPostRequestCallbackWithException() { + public void testHttpPostRequestCallbackWithFailedRequestException() throws ExecutionException, InterruptedException { wireMockServer = new WireMockServer(SERVER_PORT); wireMockServer.start(); mockEndPoint(wireMockServer); - try { - JavaNetSinkHttpClient client = - new JavaNetSinkHttpClient( - properties, - new TestPostRequestCallbackWithException(), - headerPreprocessor, - perRequestSubmitterFactory); - HttpSinkRequestEntry requestEntry = new HttpSinkRequestEntry("GET", new byte[0]); - SinkHttpClientResponse response = - client.putRequests( - Collections.singletonList(requestEntry), - "https://localhost:" + HTTPS_SERVER_PORT + ENDPOINT - ).get(); + JavaNetSinkHttpClient client = + new JavaNetSinkHttpClient( + properties, + new TestPostRequestCallbackWithException(), + headerPreprocessor, + perRequestSubmitterFactory); + HttpSinkRequestEntry requestEntry = new HttpSinkRequestEntry("GET", new byte[0]); + SinkHttpClientResponse response = + client.putRequests( + Collections.singletonList(requestEntry), + "https://localhost:" + HTTPS_SERVER_PORT + ENDPOINT + ).get(); - assertThat(response.getSuccessfulRequests()).isEmpty(); - assertThat(response.getFailedRequests()).isNotEmpty(); - } catch (Exception e) { - throw new RuntimeException(e); - } + assertThat(response.getSuccessfulRequests()).isEmpty(); + assertThat(response.getFailedRequests()).isNotEmpty(); } @Test From c32d55b49326c8a6080af6b1837baacf20b26cb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20Barneron?= Date: Tue, 24 Sep 2024 08:51:53 +0100 Subject: [PATCH 6/8] Improved readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 14707095..163354bf 100644 --- a/README.md +++ b/README.md @@ -383,7 +383,7 @@ is provided. This allows control over the connector's behavior when an HTTP response does not meet your expectations whether based on the response body or headers. - Currently, there are no side effects, as the connector does not + Currently, the only side effect is to incremenet the [numRecordsSendErrors counter](https://github.com/getindata/flink-http-connector?tab=readme-ov-file#http-sink-2), as the connector does not support retries yet. However, once retry functionality is implemented, this will allow users to specify if requests should be retried. From cb1ed41fe385c9c7dd3e7820496e664ced0e26a6 Mon Sep 17 00:00:00 2001 From: amstee Date: Fri, 11 Oct 2024 16:25:23 +0100 Subject: [PATCH 7/8] chore: merge main --- CHANGELOG.md | 1 + README.md | 29 ++- pom.xml | 2 + .../connectors/http/HttpSinkBuilder.java | 2 +- .../OIDCAuthHeaderValuePreprocessor.java | 50 ++++ .../internal/auth/OidcAccessTokenManager.java | 150 +++++++++++ .../config/HttpConnectorConfigConstants.java | 8 + .../lookup/HttpLookupConnectorOptions.java | 27 +- .../table/lookup/HttpLookupTableSource.java | 17 +- .../lookup/HttpLookupTableSourceFactory.java | 19 +- .../table/lookup/RequestFactoryBase.java | 28 ++- .../internal/table/sink/HttpDynamicSink.java | 2 +- .../http/internal/utils/HttpHeaderUtils.java | 76 +++++- .../internal/HttpsConnectionTestBase.java | 2 +- .../auth/OidcAccessTokenManagerTest.java | 234 ++++++++++++++++++ .../httpclient/JavaNetSinkHttpClientTest.java | 2 +- .../HttpLookupTableSourceFactoryTest.java | 31 ++- ...avaNetHttpPollingClientConnectionTest.java | 4 +- ...tHttpPollingClientHttpsConnectionTest.java | 42 ++-- .../lookup/JavaNetHttpPollingClientTest.java | 4 +- 20 files changed, 674 insertions(+), 56 deletions(-) create mode 100644 src/main/java/com/getindata/connectors/http/internal/OIDCAuthHeaderValuePreprocessor.java create mode 100644 src/main/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManager.java create mode 100644 src/test/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManagerTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e2c7c0f..d270d060 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Added - Added support for callbacks to mark requests as failed. +- Added support for OIDC Bearer tokens. ## [0.15.0] - 2024-07-30 diff --git a/README.md b/README.md index 163354bf..ee28bbe8 100644 --- a/README.md +++ b/README.md @@ -337,6 +337,9 @@ CREATE TABLE http ( ) ``` +Note that when using OIDC, it adds an `Authentication` header with the bearer token; this will override +an existing `Authorization` header specified in configuration. + #### Custom request/response callback - Http Sink processes responses that it gets from the HTTP endpoint along their respective requests. One can customize the @@ -422,13 +425,30 @@ In this special case, you can configure connector to trust all certificates with To enable this option use `gid.connector.http.security.cert.server.allowSelfSigned` property setting its value to `true`. ## Basic Authentication -The connector supports Basic Authentication mechanism using HTTP `Authorization` header. +The connector supports Basic Authentication using a HTTP `Authorization` header. The header value can be set via properties, similarly as for other headers. The connector converts the passed value to Base64 and uses it for the request. If the used value starts with the prefix `Basic `, or `gid.connector.http.source.lookup.use-raw-authorization-header` is set to `'true'`, it will be used as header value as is, without any extra modification. +## OIDC Bearer Authentication +The connector supports Bearer Authentication using a HTTP `Authorization` header. The [OAuth 2.0 rcf](https://datatracker.ietf.org/doc/html/rfc6749) mentions [Obtaining Authorization](https://datatracker.ietf.org/doc/html/rfc6749#section-4) +and an authorization grant. OIDC makes use of this [authorisation grant](https://datatracker.ietf.org/doc/html/rfc6749#section-1.3) in a [Token Request](https://openid.net/specs/openid-connect-core-1_0.html#TokenRequest) by including a [OAuth grant type](https://oauth.net/2/grant-types/) and associated properties, the response is the [token response](https://openid.net/specs/openid-connect-core-1_0.html#TokenResponse). + +If you want to use this authorization then you should supply the `Token Request` body in `application/x-www-form-urlencoded` encoding +in configuration property `gid.connector.http.security.oidc.token.request`. See [grant extension](https://datatracker.ietf.org/doc/html/rfc6749#section-4.5) for +an example of a customised grant type token request. The supplied `token request` will be issued to the +[token end point](https://datatracker.ietf.org/doc/html/rfc6749#section-3.2), whose url should be supplied in configuration property +`gid.connector.http.security.oidc.token.endpoint.url`. The returned `access token` is then cached and used for subsequent requests; if the token has expired then + a new one is requested. There is a property `gid.connector.http.security.oidc.token.expiry.reduction`, that defaults to 1 second; new tokens will +be requested if the current time is later than the cached token expiry time minus `gid.connector.http.security.oidc.token.expiry.reduction`. + +### Restrictions at this time +* No authentication is applied to the token request. +* The processing does not use the refresh token if it present. + ## Table API Connector Options ### HTTP TableLookup Source + | Option | Required | Description/Value | |---------------------------------------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | connector | required | The Value should be set to _rest-lookup_ | @@ -448,19 +468,22 @@ is set to `'true'`, it will be used as header value as is, without any extra mod | gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. | | gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. | | gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. | +| gid.connector.http.security.oidc.token.request | optional | OIDC `Token Request` body in `application/x-www-form-urlencoded` encoding | +| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued | +| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. | | gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. | | gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. | | gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. | | gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. | | gid.connector.http.source.lookup.request-callback | optional | Specify which `HttpLookupPostRequestCallback` implementation to use. By default, it is set to `slf4j-lookup-logger` corresponding to `Slf4jHttpLookupPostRequestCallback`. | - ### HTTP Sink + | Option | Required | Description/Value | |---------------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | connector | required | Specify what connector to use. For HTTP Sink it should be set to _'http-sink'_. | -| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. | | format | required | Specify what format to use. | +| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. | | insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. | | sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. | | sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. | diff --git a/pom.xml b/pom.xml index c160e556..eed3f5a4 100644 --- a/pom.xml +++ b/pom.xml @@ -297,6 +297,8 @@ under the License. maven-surefire-plugin 3.0.0-M5 + + --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED diff --git a/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java b/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java index 399eb35b..208bcf01 100644 --- a/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java +++ b/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java @@ -67,7 +67,7 @@ public class HttpSinkBuilder extends DEFAULT_POST_REQUEST_CALLBACK = new Slf4jHttpPostRequestCallback(); private static final HeaderPreprocessor DEFAULT_HEADER_PREPROCESSOR = - HttpHeaderUtils.createDefaultHeaderPreprocessor(); + HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(); private final Properties properties = new Properties(); diff --git a/src/main/java/com/getindata/connectors/http/internal/OIDCAuthHeaderValuePreprocessor.java b/src/main/java/com/getindata/connectors/http/internal/OIDCAuthHeaderValuePreprocessor.java new file mode 100644 index 00000000..945f3ba1 --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/OIDCAuthHeaderValuePreprocessor.java @@ -0,0 +1,50 @@ +package com.getindata.connectors.http.internal; + +import java.net.http.HttpClient; +import java.time.Duration; +import java.util.Optional; + +import lombok.extern.slf4j.Slf4j; + +import com.getindata.connectors.http.internal.auth.OidcAccessTokenManager; + +/** + * Header processor for HTTP OIDC Authentication mechanism. + */ +@Slf4j +public class OIDCAuthHeaderValuePreprocessor implements HeaderValuePreprocessor { + + + private final String oidcAuthURL; + private final String oidcTokenRequest; + private Duration oidcExpiryReduction = Duration.ofSeconds(1); + /** + * Add the access token to the request using OidcAuth authenticate method that + * gives us a valid access token. + * @param oidcAuthURL OIDC token endpoint + * @param oidcTokenRequest OIDC Token Request + * @param oidcExpiryReduction OIDC token expiry reduction + */ + + public OIDCAuthHeaderValuePreprocessor(String oidcAuthURL, + String oidcTokenRequest, + Optional oidcExpiryReduction) { + this.oidcAuthURL = oidcAuthURL; + this.oidcTokenRequest = oidcTokenRequest; + if (oidcExpiryReduction.isPresent()) { + this.oidcExpiryReduction = oidcExpiryReduction.get(); + } + } + + @Override + public String preprocessHeaderValue(String rawValue) { + OidcAccessTokenManager auth = new OidcAccessTokenManager( + HttpClient.newBuilder().build(), + oidcTokenRequest, + oidcAuthURL, + oidcExpiryReduction + ); + // apply the OIDC authentication by adding the dynamically calculated header value. + return "BEARER " + auth.authenticate(); + } +} diff --git a/src/main/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManager.java b/src/main/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManager.java new file mode 100644 index 00000000..9334870c --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManager.java @@ -0,0 +1,150 @@ + +/* + * Copyright 2020 Red Hat + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.getindata.connectors.http.internal.auth; + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.time.Instant; + +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +/** + * This class is inspired by + * https://github.com/Apicurio/apicurio-common-rest-client/blob/ + * 944ac9eb527c291a6083bd10ee012388e1684d20/rest-client-common/src/main/java/io/ + * apicurio/rest/client/auth/OidcAuth.java. + * + * The OIDC access token manager encapsulates the caching of an OIDC access token, + * which can be short lived, for example an hour. The authenticate method will return an + * un-expired access token, either from the cache or by requesting a new access token. + */ +@Slf4j +public class OidcAccessTokenManager { + + private static final Duration DEFAULT_TOKEN_EXPIRATION_REDUCTION = Duration.ofSeconds(1); + private final HttpClient httpClient; + private final String tokenRequest; + + private final String url; + private final Duration tokenExpirationReduction; + + private String cachedAccessToken; + private Instant cachedAccessTokenExp; + + /** + * Construct an Oidc access token manager with the default token expiration reduction + * @param httpClient httpClient to use to call the token endpoint. + * @param tokenRequest token request + * @param url token endpoint url + */ + public OidcAccessTokenManager(HttpClient httpClient, String tokenRequest, String url) { + this(httpClient, tokenRequest, url, DEFAULT_TOKEN_EXPIRATION_REDUCTION); + } + /** + * Construct an Oidc access token manager with the supplied token expiration reduction + * @param httpClient httpClient to use to call the token endpoint. + * @param tokenRequest token request this need to be form urlencoded + * @param url token endpoint url + * @param tokenExpirationReduction token expiry reduction, request a new token if the + * current time is later than the cached access token + * expiry time reduced by this value. This means that + * we will not use the cached token if it is about + * to expire. + */ + public OidcAccessTokenManager(HttpClient httpClient, String tokenRequest, String url, + Duration tokenExpirationReduction) { + this.tokenRequest = tokenRequest; + this.httpClient = httpClient; + this.url = url; + if (null == tokenExpirationReduction) { + this.tokenExpirationReduction = DEFAULT_TOKEN_EXPIRATION_REDUCTION; + } else { + this.tokenExpirationReduction = tokenExpirationReduction; + } + } + + /** + * Request an access token from the token endpoint + */ + private void requestAccessToken() { + try { + HttpRequest httpRequest = + HttpRequest.newBuilder() + .uri(URI.create(url)) + .header("Content-Type", "application/x-www-form-urlencoded") + .method("POST", HttpRequest.BodyPublishers.ofString(tokenRequest)) + .build(); + + HttpResponse response = httpClient.send(httpRequest, + HttpResponse.BodyHandlers.ofByteArray()); + //create ObjectMapper instance + final ObjectMapper objectMapper = new ObjectMapper(); + if (200 == response.statusCode()) { + byte[] bytes = response.body(); + JsonNode rootNode = objectMapper.readTree(bytes); + JsonNode tokenNode = rootNode.path("access_token"); + JsonNode expiresInNode = rootNode.path("expires_in"); + this.cachedAccessToken = tokenNode.textValue(); + /* + expiresIn is in seconds + */ + Duration expiresIn = Duration.ofSeconds(expiresInNode.asInt()); + if (expiresIn.compareTo(this.tokenExpirationReduction) > 0) { + //expiresIn is greater than tokenExpirationReduction + expiresIn = expiresIn.minus(this.tokenExpirationReduction); + } + this.cachedAccessTokenExp = Instant.now().plus(expiresIn); + } else { + throw new IllegalStateException("Attempted to get an access token but got http" + + " status code " + response.statusCode()); + } + } catch (JsonProcessingException e) { + throw new IllegalStateException("Error found while trying to request a new token"); + } catch (IOException e) { + throw new IllegalStateException("IO Exception occurred", e); + } catch (InterruptedException e) { + throw new IllegalStateException("Interrupted Exception occurred", e); + } + } + + /** + * Get a valid unexpired access token. + * @return access token. + */ + public String authenticate() { + if (isAccessTokenRequired()) { + requestAccessToken(); + } + return cachedAccessToken; + } + + private boolean isAccessTokenRequired() { + return null == cachedAccessToken || isTokenExpired(); + } + + private boolean isTokenExpired() { + return Instant.now().isAfter(this.cachedAccessTokenExp); + } +} diff --git a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java index 5415a159..b501b29b 100644 --- a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java +++ b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java @@ -27,6 +27,14 @@ public final class HttpConnectorConfigConstants { public static final String LOOKUP_SOURCE_HEADER_PREFIX = GID_CONNECTOR_HTTP + "source.lookup.header."; + public static final String OIDC_AUTH_TOKEN_REQUEST = GID_CONNECTOR_HTTP + + "security.oidc.token.request"; + + public static final String OIDC_AUTH_TOKEN_ENDPOINT_URL = GID_CONNECTOR_HTTP + + "security.oidc.token.endpoint.url"; + + public static final String OIDC_AUTH_TOKEN_EXPIRY_REDUCTION = GID_CONNECTOR_HTTP + + "security.oidc.token.expiry.reduction"; /** * Whether to use the raw value of the Authorization header. If set, it prevents * the special treatment of the header for Basic Authentication, thus preserving the passed diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java index 9a7b4320..9947d52d 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java @@ -1,11 +1,11 @@ package com.getindata.connectors.http.internal.table.lookup; +import java.time.Duration; + import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_USE_RAW; -import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER; -import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.*; public class HttpLookupConnectorOptions { @@ -53,4 +53,25 @@ public class HttpLookupConnectorOptions { ConfigOptions.key(SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER) .stringType() .defaultValue(Slf4jHttpLookupPostRequestCallbackFactory.IDENTIFIER); + + public static final ConfigOption SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL = + ConfigOptions.key(OIDC_AUTH_TOKEN_ENDPOINT_URL) + .stringType() + .noDefaultValue() + .withDescription("OIDC Token endpoint url."); + + public static final ConfigOption SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST = + ConfigOptions.key(OIDC_AUTH_TOKEN_REQUEST) + .stringType() + .noDefaultValue() + .withDescription("OIDC token request."); + + public static final ConfigOption SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION = + ConfigOptions.key(OIDC_AUTH_TOKEN_EXPIRY_REDUCTION) + .durationType() + .defaultValue(Duration.ofSeconds(1)) + .withDescription("OIDC authorization access token expiry" + + " reduction as a Duration." + + " A new access token is obtained if the token" + + " is older than it's expiry time minus this value."); } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java index c077f21c..129837c3 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java @@ -37,7 +37,7 @@ import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericGetQueryCreatorFactory; import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericJsonQueryCreatorFactory; import com.getindata.connectors.http.internal.utils.HttpHeaderUtils; -import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_QUERY_CREATOR_IDENTIFIER; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.*; import static com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSourceFactory.row; @Slf4j @@ -107,8 +107,10 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex } protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow, - DeserializationSchema responseSchemaDecoder, - PollingClientFactory pollingClientFactory) { + DeserializationSchema + responseSchemaDecoder, + PollingClientFactory + pollingClientFactory) { HttpTableLookupFunction dataLookupFunction = new HttpTableLookupFunction( @@ -167,11 +169,8 @@ private PollingClientFactory createPollingClientFactory( LookupQueryCreator lookupQueryCreator, HttpLookupConfig lookupConfig) { - boolean useRawAuthHeader = - lookupConfig.getReadableConfig().get(HttpLookupConnectorOptions.USE_RAW_AUTH_HEADER); - - HeaderPreprocessor headerPreprocessor = - HttpHeaderUtils.createDefaultHeaderPreprocessor(useRawAuthHeader); + HeaderPreprocessor headerPreprocessor = HttpHeaderUtils.createHeaderPreprocessor( + lookupConfig.getReadableConfig()); String lookupMethod = lookupConfig.getLookupMethod(); HttpRequestFactory requestFactory = (lookupMethod.equalsIgnoreCase("GET")) ? @@ -185,7 +184,7 @@ private PollingClientFactory createPollingClientFactory( headerPreprocessor, lookupConfig ); - + log.info("requestFactory is " + requestFactory); return new JavaNetHttpPollingClientFactory(requestFactory); } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java index c9f8f8c2..6c2edf20 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java @@ -59,6 +59,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext) HttpConnectorConfigConstants.GID_CONNECTOR_HTTP, LOOKUP_REQUEST_FORMAT.key() ); + validateHttpLookupSourceOptions(readable); DecodingFormat> decodingFormat = helper.discoverDecodingFormat( @@ -81,6 +82,17 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext) getLookupCache(readable) ); } + protected void validateHttpLookupSourceOptions(ReadableConfig tableOptions) + throws IllegalArgumentException { + // ensure that there is an OIDC token request if we have an OIDC token endpoint + tableOptions.getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL).ifPresent(url -> { + if (tableOptions.getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST).isEmpty()) { + throw new IllegalArgumentException("Config option " + + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST.key() + " is required, if " + + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL.key() + " is configured."); + } + }); + } @Override public String factoryIdentifier() { @@ -94,7 +106,6 @@ public Set> requiredOptions() { @Override public Set> optionalOptions() { - return Set.of( URL_ARGS, ASYNC_POLLING, @@ -105,7 +116,11 @@ public Set> optionalOptions() { LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE, LookupOptions.PARTIAL_CACHE_MAX_ROWS, LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY, - LookupOptions.MAX_RETRIES); + LookupOptions.MAX_RETRIES, + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION, + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST, + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL + ); } private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig readableConfig) { diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java index f6c19f62..44689063 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java @@ -4,7 +4,11 @@ import java.net.http.HttpRequest.Builder; import java.util.Arrays; import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.data.RowData; import org.apache.flink.util.FlinkRuntimeException; @@ -14,10 +18,12 @@ import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; import com.getindata.connectors.http.internal.utils.HttpHeaderUtils; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL; /** * Base class for {@link HttpRequest} factories. */ +@Slf4j public abstract class RequestFactoryBase implements HttpRequestFactory { public static final String DEFAULT_REQUEST_TIMEOUT_SECONDS = "30"; @@ -35,6 +41,7 @@ public abstract class RequestFactoryBase implements HttpRequestFactory { * HTTP headers that should be used for {@link HttpRequest} created by factory. */ private final String[] headersAndValues; + private final HttpLookupConfig options; public RequestFactoryBase( LookupQueryCreator lookupQueryCreator, @@ -43,6 +50,21 @@ public RequestFactoryBase( this.baseUrl = options.getUrl(); this.lookupQueryCreator = lookupQueryCreator; + this.options = options; + + Properties properties = options.getProperties(); + /* + * For OIDC, the preprocessor will fully specify the Authentication header value, + * as a bearer token. But the preprocessors only amend existing headers, so in this case + * if there is no existing authorization header then we add a dummy one to the properties, + * so the preprocessor will be driven and will provide the value. + */ + Optional oidcAuthURL = options.getReadableConfig() + .getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL); + if (oidcAuthURL.isPresent()) { + properties.put(HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX + + HttpHeaderUtils.AUTHORIZATION, "Dummy"); + } var headerMap = HttpHeaderUtils .prepareHeaderMap( @@ -52,6 +74,11 @@ public RequestFactoryBase( ); this.headersAndValues = HttpHeaderUtils.toHeaderAndValueArray(headerMap); + + log.debug("RequestFactoryBase headersAndValues: " + + Arrays.stream(headersAndValues) + .map(Object::toString) + .collect(Collectors.joining(","))); this.httpRequestTimeOutSeconds = Integer.parseInt( options.getProperties().getProperty( HttpConnectorConfigConstants.LOOKUP_HTTP_TIMEOUT_SECONDS, @@ -67,7 +94,6 @@ public HttpLookupSourceRequestEntry buildLookupRequest(RowData lookupRow) { getLogger().debug("Created Http lookup query: " + lookupQueryInfo); Builder requestBuilder = setUpRequestMethod(lookupQueryInfo); - if (headersAndValues.length != 0) { requestBuilder.headers(headersAndValues); } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java index ec7c53bd..f634e1d6 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java @@ -129,7 +129,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) .setHttpPostRequestCallback(httpPostRequestCallback) // In future header preprocessor could be set via custom factory - .setHttpHeaderPreprocessor(HttpHeaderUtils.createDefaultHeaderPreprocessor()) + .setHttpHeaderPreprocessor(HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor()) .setElementConverter( new SerializationSchemaElementConverter(insertMethod, serializationSchema)) .setProperties(properties); diff --git a/src/main/java/com/getindata/connectors/http/internal/utils/HttpHeaderUtils.java b/src/main/java/com/getindata/connectors/http/internal/utils/HttpHeaderUtils.java index 2e737a6c..c97e1b7b 100644 --- a/src/main/java/com/getindata/connectors/http/internal/utils/HttpHeaderUtils.java +++ b/src/main/java/com/getindata/connectors/http/internal/utils/HttpHeaderUtils.java @@ -1,22 +1,30 @@ package com.getindata.connectors.http.internal.utils; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.time.Duration; +import java.util.*; import java.util.Map.Entry; -import java.util.Properties; import java.util.stream.Stream; import lombok.AccessLevel; import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.ReadableConfig; import com.getindata.connectors.http.internal.BasicAuthHeaderValuePreprocessor; import com.getindata.connectors.http.internal.ComposeHeaderPreprocessor; import com.getindata.connectors.http.internal.HeaderPreprocessor; +import com.getindata.connectors.http.internal.OIDCAuthHeaderValuePreprocessor; +import com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.*; + + @NoArgsConstructor(access = AccessLevel.NONE) +@Slf4j public final class HttpHeaderUtils { + public static final String AUTHORIZATION = "Authorization"; + public static Map prepareHeaderMap( String headerKeyPrefix, Properties properties, @@ -32,11 +40,16 @@ public static Map prepareHeaderMap( for (Entry headerAndValue : propertyHeaderMap.entrySet()) { String propertyName = headerAndValue.getKey(); String headerValue = headerAndValue.getValue(); - + log.info("prepareHeaderMap propertyName=" + propertyName + + ",headerValue" + headerValue); String headerName = ConfigUtils.extractPropertyLastElement(propertyName); + String preProcessedHeader = + headerPreprocessor.preprocessValueForHeader(headerName, headerValue); + log.info("prepareHeaderMap preProcessedHeader=" + + preProcessedHeader); headerMap.put( headerName, - headerPreprocessor.preprocessValueForHeader(headerName, headerValue) + preProcessedHeader ); } return headerMap; @@ -67,14 +80,57 @@ public static String[] toHeaderAndValueArray(Map headerMap) { .toArray(String[]::new); } - public static HeaderPreprocessor createDefaultHeaderPreprocessor() { - return createDefaultHeaderPreprocessor(false); + public static HeaderPreprocessor createBasicAuthorizationHeaderPreprocessor() { + return createBasicAuthorizationHeaderPreprocessor(false); } - public static HeaderPreprocessor createDefaultHeaderPreprocessor(boolean useRawAuthHeader) { + public static HeaderPreprocessor createBasicAuthorizationHeaderPreprocessor( + boolean useRawAuthHeader) { return new ComposeHeaderPreprocessor( Collections.singletonMap( - "Authorization", new BasicAuthHeaderValuePreprocessor(useRawAuthHeader)) + AUTHORIZATION, new BasicAuthHeaderValuePreprocessor(useRawAuthHeader)) + ); + } + + public static HeaderPreprocessor createOIDCAuthorizationHeaderPreprocessor( + String oidcAuthURL, + String oidcTokenRequest, + Optional oidcExpiryReduction + ) { + return new ComposeHeaderPreprocessor( + Collections.singletonMap( + AUTHORIZATION, new OIDCAuthHeaderValuePreprocessor(oidcAuthURL, + oidcTokenRequest, oidcExpiryReduction)) ); } + + public static HeaderPreprocessor createHeaderPreprocessor(ReadableConfig readableConfig) { + HeaderPreprocessor headerPreprocessor; + Optional oidcAuthURL = readableConfig + .getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL); + + if(oidcAuthURL.isPresent()) { + Optional oidcTokenRequest = readableConfig + .getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST); + + Optional oidcExpiryReduction = readableConfig + .getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION); + headerPreprocessor = HttpHeaderUtils.createOIDCAuthorizationHeaderPreprocessor( + oidcAuthURL.get(), oidcTokenRequest.get(), oidcExpiryReduction); + log.info("created HeaderPreprocessor " + headerPreprocessor + + " for OIDC oidcAuthURL=" + oidcAuthURL + + ", oidcTokenRequest=" + oidcTokenRequest + + ", oidcExpiryReduction=" + oidcExpiryReduction); + } else { + boolean useRawAuthHeader = + readableConfig.get(HttpLookupConnectorOptions.USE_RAW_AUTH_HEADER); + + headerPreprocessor = + HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor( + useRawAuthHeader); + log.info("created HeaderPreprocessor for basic useRawAuthHeader=" + useRawAuthHeader); + } + log.info("returning HeaderPreprocessor " + headerPreprocessor); + return headerPreprocessor; + } } diff --git a/src/test/java/com/getindata/connectors/http/internal/HttpsConnectionTestBase.java b/src/test/java/com/getindata/connectors/http/internal/HttpsConnectionTestBase.java index 6855a86c..28cba6be 100644 --- a/src/test/java/com/getindata/connectors/http/internal/HttpsConnectionTestBase.java +++ b/src/test/java/com/getindata/connectors/http/internal/HttpsConnectionTestBase.java @@ -36,7 +36,7 @@ public abstract class HttpsConnectionTestBase { public void setUp() { this.properties = new Properties(); - this.headerPreprocessor = HttpHeaderUtils.createDefaultHeaderPreprocessor(); + this.headerPreprocessor = HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(); } public void tearDown() { diff --git a/src/test/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManagerTest.java b/src/test/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManagerTest.java new file mode 100644 index 00000000..d02f57da --- /dev/null +++ b/src/test/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManagerTest.java @@ -0,0 +1,234 @@ +package com.getindata.connectors.http.internal.auth; + +import java.net.*; +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.SSLSession; + +import net.minidev.json.JSONObject; +import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class OidcAccessTokenManagerTest { + + private static final String BASE_URL = "http://localhost/aaa"; + + @Test + public void testAuthenticate() throws InterruptedException { + + MockHttpClient authHttpClient = new MockHttpClient(); + + authHttpClient.setIsExpired(1); + authHttpClient.setAccessToken("Access1"); + String url = BASE_URL; + OidcAccessTokenManager oidcAuth = new OidcAccessTokenManager(authHttpClient, "abc", url); + + // apply the authorization to the httpRequest + String token1 = oidcAuth.authenticate(); + assertThat(token1).isNotNull(); + String token2 = oidcAuth.authenticate(); + assertThat(token2).isNotNull(); + // check the token is cached + assertThat(token1).isEqualTo(token2); + Thread.sleep(2000); + // check the token is different after first token has expired + String token3 = oidcAuth.authenticate(); + assertThat(token3).isNotNull(); + assertThat(token3).isNotEqualTo(token2); + } + + @Test + public void testAuthenticateWithBadStatusCode() throws InterruptedException { + + MockHttpClient authHttpClient = new MockHttpClient(); + + authHttpClient.setIsExpired(1); + authHttpClient.setAccessToken("Access1"); + authHttpClient.setStatus(500); + String url = BASE_URL; + OidcAccessTokenManager oidcAuth = new OidcAccessTokenManager(authHttpClient, "abc", url); + + try { + oidcAuth.authenticate(); + assertTrue(false, "Bad status code should result in an exception."); + } catch (IllegalStateException e) { + // expected + } + } + + @Test + public void testAuthenticateWithExpiryReduction() throws InterruptedException { + + MockHttpClient authHttpClient = new MockHttpClient(); + + authHttpClient.setIsExpired(1); + authHttpClient.setAccessToken("Access1"); + String url = "http://localhost"; + OidcAccessTokenManager oidcAuth = new OidcAccessTokenManager(authHttpClient, + "abc", url, Duration.ofSeconds(5)); + + // apply the authorization to the httpRequest + String token1 = oidcAuth.authenticate(); + assertThat(token1).isNotNull(); + String token2 = oidcAuth.authenticate(); + assertThat(token2).isNotNull(); + } + + class MockHttpClient extends HttpClient { + private int isExpired; + private String accessToken; + private int count = 0; + private int status = 200; + + @Override + public Optional cookieHandler() { + return Optional.empty(); + } + + @Override + public Optional connectTimeout() { + return Optional.empty(); + } + + @Override + public Redirect followRedirects() { + return null; + } + + @Override + public Optional proxy() { + return Optional.empty(); + } + + @Override + public SSLContext sslContext() { + return null; + } + + @Override + public SSLParameters sslParameters() { + return null; + } + + @Override + public Optional authenticator() { + return Optional.empty(); + } + + @Override + public Version version() { + return null; + } + + @Override + public Optional executor() { + return Optional.empty(); + } + + @Override + public HttpResponse send(HttpRequest request, + HttpResponse.BodyHandler responseBodyHandler) { + + JSONObject json = new JSONObject(); + + json.put("expires_in", 2); + json.put("access_token", "dummy_token_" + this.count++); + byte[] bytes = json.toJSONString().getBytes(); + + MockHttpResponse mockHttpResponse = new MockHttpResponse(); + mockHttpResponse.setStatusCode(status); + mockHttpResponse.setBody(bytes); + + return (HttpResponse) mockHttpResponse; + } + + @Override + public CompletableFuture> sendAsync( + HttpRequest request, + HttpResponse.BodyHandler responseBodyHandler) { + return null; + } + + @Override + public CompletableFuture> sendAsync( + HttpRequest request, + HttpResponse.BodyHandler responseBodyHandler, + HttpResponse.PushPromiseHandler pushPromiseHandler) { + return null; + } + + public void setIsExpired(int isExpired) { + this.isExpired = isExpired; + } + + public void setAccessToken(String accesstoken) { + this.accessToken = accesstoken; + } + + public void setStatus(int status) { + this.status = status; + } + + class MockHttpResponse implements HttpResponse { + int statusCode = 0; + byte[] body = new byte[0]; + + @Override + public int statusCode() { + return statusCode; + } + + @Override + public HttpRequest request() { + return null; + } + + @Override + public Optional> previousResponse() { + return Optional.empty(); + } + + @Override + public HttpHeaders headers() { + return null; + } + + @Override + public byte[] body() { + return body; + } + + @Override + public Optional sslSession() { + return Optional.empty(); + } + + @Override + public URI uri() { + return null; + } + + @Override + public Version version() { + return null; + } + + public void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } + + public void setBody(byte[] body) { + this.body = body; + } + } + } +} diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientTest.java index b7f57733..c12b2699 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientTest.java @@ -50,7 +50,7 @@ public static void afterAll() { @BeforeEach public void setUp() { postRequestCallback = new Slf4jHttpPostRequestCallback(); - headerPreprocessor = HttpHeaderUtils.createDefaultHeaderPreprocessor(); + headerPreprocessor = HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(); httpClientStaticMock.when(HttpClient::newBuilder).thenReturn(httpClientBuilder); when(httpClientBuilder.followRedirects(any())).thenReturn(httpClientBuilder); when(httpClientBuilder.sslContext(any())).thenReturn(httpClientBuilder); diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactoryTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactoryTest.java index 31236460..82811004 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactoryTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactoryTest.java @@ -1,12 +1,9 @@ package com.getindata.connectors.http.internal.table.lookup; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; @@ -16,6 +13,7 @@ import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.junit.jupiter.api.Assertions.assertFalse; public class HttpLookupTableSourceFactoryTest { @@ -37,6 +35,29 @@ public class HttpLookupTableSourceFactoryTest { UniqueConstraint.primaryKey("id", List.of("id")) ); + @Test + void validateHttpLookupSourceOptions() { + + HttpLookupTableSourceFactory httpLookupTableSourceFactory + = new HttpLookupTableSourceFactory(); + TableConfig tableConfig = new TableConfig(); + httpLookupTableSourceFactory.validateHttpLookupSourceOptions(tableConfig); + tableConfig.set(HttpLookupConnectorOptions + .SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL.key(), "aaa"); + + try { + httpLookupTableSourceFactory.validateHttpLookupSourceOptions(tableConfig); + assertFalse(true, "Expected an error."); + } catch (IllegalArgumentException e) { + // expected + } + // should now work. + tableConfig.set(HttpLookupConnectorOptions + .SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST.key(), "bbb"); + + httpLookupTableSourceFactory.validateHttpLookupSourceOptions(tableConfig); + } + @Test void shouldCreateForMandatoryFields() { Map options = getMandatoryOptions(); diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java index df7b069b..27f62df0 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java @@ -325,7 +325,7 @@ private GetRequestFactory setUpGetRequestFactory(Properties properties) { return new GetRequestFactory( new GenericGetQueryCreator(lookupRow), - HttpHeaderUtils.createDefaultHeaderPreprocessor(useRawAuthHeader), + HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(useRawAuthHeader), HttpLookupConfig.builder() .url(getBaseUrl()) .properties(properties) @@ -348,7 +348,7 @@ private BodyBasedRequestFactory setUpBodyRequestFactory( return new BodyBasedRequestFactory( methodName, new GenericJsonQueryCreator(jsonSerializer), - HttpHeaderUtils.createDefaultHeaderPreprocessor(useRawAuthHeader), + HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(useRawAuthHeader), HttpLookupConfig.builder() .url(getBaseUrl()) .properties(properties) diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java index 683f9261..801c0142 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java @@ -1,7 +1,9 @@ package com.getindata.connectors.http.internal.table.lookup; import java.io.File; +import java.time.Duration; import java.util.List; +import java.util.Optional; import java.util.Properties; import com.github.tomakehurst.wiremock.WireMockServer; @@ -32,6 +34,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; +import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.HttpsConnectionTestBase; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericGetQueryCreator; @@ -96,11 +99,9 @@ public void testHttpsConnectionWithSelfSignedCert() { wireMockServer.start(); setupServerStub(); - setUpPollingClientFactory(wireMockServer.baseUrl()); - properties.setProperty(HttpConnectorConfigConstants.ALLOW_SELF_SIGNED, "true"); - testPollingClientConnection(); + setupAndTestConnection(); } @ParameterizedTest @@ -120,14 +121,11 @@ public void testHttpsConnectionWithAddedCerts(String certName) { wireMockServer.start(); setupServerStub(); - setUpPollingClientFactory(wireMockServer.baseUrl()); - properties.setProperty( HttpConnectorConfigConstants.SERVER_TRUSTED_CERT, trustedCert.getAbsolutePath() ); - - testPollingClientConnection(); + setupAndTestConnection(); } @ParameterizedTest @@ -154,8 +152,6 @@ public void testMTlsConnection(String clientPrivateKeyName) { wireMockServer.start(); setupServerStub(); - setUpPollingClientFactory(wireMockServer.baseUrl()); - properties.setProperty( HttpConnectorConfigConstants.SERVER_TRUSTED_CERT, serverTrustedCert.getAbsolutePath() @@ -168,8 +164,7 @@ public void testMTlsConnection(String clientPrivateKeyName) { HttpConnectorConfigConstants.CLIENT_PRIVATE_KEY, clientPrivateKey.getAbsolutePath() ); - - testPollingClientConnection(); + setupAndTestConnection(); } @Test @@ -198,8 +193,6 @@ public void testMTlsConnectionUsingKeyStore() { wireMockServer.start(); setupServerStub(); - setUpPollingClientFactory(wireMockServer.baseUrl()); - properties.setProperty( HttpConnectorConfigConstants.KEY_STORE_PASSWORD, password @@ -212,7 +205,26 @@ public void testMTlsConnectionUsingKeyStore() { HttpConnectorConfigConstants.SERVER_TRUSTED_CERT, serverTrustedCert.getAbsolutePath() ); + setupAndTestConnection(); + } + + private void setupAndTestConnection() { + // test with basic auth + setupAndTestConnectionWithAuth( + HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor()); + // test with OIDC auth + setupAndTestConnectionWithAuth( + HttpHeaderUtils.createOIDCAuthorizationHeaderPreprocessor( + "http://abc", + "aaa", + Optional.of(Duration.ofSeconds(5)) + ) + ); + } + private void setupAndTestConnectionWithAuth(HeaderPreprocessor headerPreprocessor) { + setUpPollingClientFactory(wireMockServer.baseUrl(), + headerPreprocessor); testPollingClientConnection(); } @@ -299,7 +311,7 @@ private void setupServerStub() { .withBody(readTestFile(SAMPLES_FOLDER + "HttpResult.json")))); } - private void setUpPollingClientFactory(String baseUrl) { + private void setUpPollingClientFactory(String baseUrl, HeaderPreprocessor headerPreprocessor) { LookupRow lookupRow = new LookupRow() .addLookupEntry( @@ -317,7 +329,7 @@ private void setUpPollingClientFactory(String baseUrl) { GetRequestFactory requestFactory = new GetRequestFactory( new GenericGetQueryCreator(lookupRow), - HttpHeaderUtils.createDefaultHeaderPreprocessor(), + headerPreprocessor, HttpLookupConfig.builder() .url(baseUrl + ENDPOINT) .build() diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java index 6d6feac1..f5c9f4bf 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java @@ -53,7 +53,7 @@ public class JavaNetHttpPollingClientTest { @BeforeEach public void setUp() { - this.headerPreprocessor = HttpHeaderUtils.createDefaultHeaderPreprocessor(); + this.headerPreprocessor = HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(); this.options = HttpLookupConfig.builder().url(BASE_URL).build(); } @@ -155,7 +155,7 @@ public void shouldBuildBodyBasedClientUri() { BodyBasedRequestFactory requestFactory = new BodyBasedRequestFactory( "POST", new GenericJsonQueryCreator(jsonSerializer), - HttpHeaderUtils.createDefaultHeaderPreprocessor(), + HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(), HttpLookupConfig.builder() .url(BASE_URL) .build() From c6f7b49acb24ea3c6a8ca71b0dc303badd1b34ef Mon Sep 17 00:00:00 2001 From: amstee Date: Fri, 11 Oct 2024 16:33:25 +0100 Subject: [PATCH 8/8] chore: fix checkstyle --- .../getindata/connectors/http/FailedRequestException.java | 6 ++++-- .../internal/sink/httpclient/JavaNetSinkHttpClient.java | 2 +- .../internal/table/lookup/JavaNetHttpPollingClient.java | 2 +- .../httpclient/JavaNetSinkHttpClientConnectionTest.java | 5 +++-- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/getindata/connectors/http/FailedRequestException.java b/src/main/java/com/getindata/connectors/http/FailedRequestException.java index d48caa3d..55a5bd21 100644 --- a/src/main/java/com/getindata/connectors/http/FailedRequestException.java +++ b/src/main/java/com/getindata/connectors/http/FailedRequestException.java @@ -1,9 +1,11 @@ package com.getindata.connectors.http; /** - * Exception thrown from a {@link HttpPostRequestCallback} when a request should be considered as failed. + * Exception thrown from a {@link HttpPostRequestCallback} + * when a request should be considered as failed. * - *

This exception is caught by the {@link com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient} + *

This exception is caught by the + * {@link com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient} * and {@link com.getindata.connectors.http.internal.table.lookup.JavaNetHttpPollingClient} */ public class FailedRequestException extends Exception { diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java index 017923e7..cd287140 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java @@ -12,8 +12,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.annotation.VisibleForTesting; -import com.getindata.connectors.http.HttpPostRequestCallback; import com.getindata.connectors.http.FailedRequestException; +import com.getindata.connectors.http.HttpPostRequestCallback; import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.SinkHttpClient; import com.getindata.connectors.http.internal.SinkHttpClientResponse; diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index 8ee91b68..743ce571 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -13,8 +13,8 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.util.StringUtils; -import com.getindata.connectors.http.HttpPostRequestCallback; import com.getindata.connectors.http.FailedRequestException; +import com.getindata.connectors.http.HttpPostRequestCallback; import com.getindata.connectors.http.internal.PollingClient; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker; diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java index a6f1d738..68886369 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java @@ -22,8 +22,8 @@ import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertThrows; -import com.getindata.connectors.http.HttpPostRequestCallback; import com.getindata.connectors.http.FailedRequestException; +import com.getindata.connectors.http.HttpPostRequestCallback; import com.getindata.connectors.http.internal.HttpsConnectionTestBase; import com.getindata.connectors.http.internal.SinkHttpClientResponse; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; @@ -68,7 +68,8 @@ public void testHttpConnection() { } @Test - public void testHttpPostRequestCallbackWithFailedRequestException() throws ExecutionException, InterruptedException { + public void testHttpPostRequestCallbackWithFailedRequestException() + throws ExecutionException, InterruptedException { wireMockServer = new WireMockServer(SERVER_PORT); wireMockServer.start(); mockEndPoint(wireMockServer);