Skip to content

Commit 12945d9

Browse files
authored
HTTP-145 SQL explain drives rest flow to OIDC endpoint (#146)
Signed-off-by: davidradl <[email protected]>
1 parent d5429f0 commit 12945d9

File tree

8 files changed

+258
-32
lines changed

8 files changed

+258
-32
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## [Unreleased]
44

5+
- OIDC token request to not flow during explain
6+
57
## [0.18.0] - 2025-01-15
68

79
- Ignore Eclipse files in .gitignore

src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java

+57-3
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@
22

33
import java.io.IOException;
44
import java.net.http.HttpClient;
5+
import java.net.http.HttpRequest;
56
import java.net.http.HttpResponse;
67
import java.net.http.HttpResponse.BodyHandlers;
78
import java.util.ArrayList;
89
import java.util.Collection;
910
import java.util.Collections;
11+
import java.util.HashMap;
1012
import java.util.List;
13+
import java.util.Map;
1114
import java.util.Optional;
1215

1316
import com.fasterxml.jackson.core.type.TypeReference;
@@ -17,16 +20,20 @@
1720
import lombok.extern.slf4j.Slf4j;
1821
import org.apache.flink.annotation.VisibleForTesting;
1922
import org.apache.flink.api.common.serialization.DeserializationSchema;
23+
import org.apache.flink.configuration.ReadableConfig;
2024
import org.apache.flink.table.data.RowData;
2125
import org.apache.flink.util.StringUtils;
2226

2327
import com.getindata.connectors.http.HttpPostRequestCallback;
28+
import com.getindata.connectors.http.internal.HeaderPreprocessor;
2429
import com.getindata.connectors.http.internal.PollingClient;
2530
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
2631
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker;
2732
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig;
2833
import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker;
34+
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
2935
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.RESULT_TYPE;
36+
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST;
3037

3138
/**
3239
* An implementation of {@link PollingClient} that uses Java 11's {@link HttpClient}.
@@ -96,12 +103,59 @@ private Collection<RowData> queryAndProcess(RowData lookupData) throws Exception
96103

97104
HttpLookupSourceRequestEntry request = requestFactory.buildLookupRequest(lookupData);
98105
HttpResponse<String> response = httpClient.send(
99-
request.getHttpRequest(),
100-
BodyHandlers.ofString()
101-
);
106+
updateHttpRequestIfRequired(request,
107+
HttpHeaderUtils.createOIDCHeaderPreprocessor(options.getReadableConfig())),
108+
BodyHandlers.ofString());
102109
return processHttpResponse(response, request);
103110
}
104111

112+
/**
113+
* If using OIDC, update the http request using the oidc header pre processor to supply the
114+
* authentication header, with a short lived bearer token.
115+
* @param request http reauest to amend
116+
* @param oidcHeaderPreProcessor OIDC header pre processor
117+
* @return http request, which for OIDC will have the bearer token as the authentication header
118+
*/
119+
protected HttpRequest updateHttpRequestIfRequired(HttpLookupSourceRequestEntry request,
120+
HeaderPreprocessor oidcHeaderPreProcessor) {
121+
// We need to check the config and if required amend the value of the
122+
// authentication header to the short lived bearer token
123+
HttpRequest httpRequest = request.getHttpRequest();
124+
ReadableConfig readableConfig = options.getReadableConfig();
125+
if (oidcHeaderPreProcessor != null) {
126+
HttpRequest.Builder builder = HttpRequest.newBuilder()
127+
.uri(httpRequest.uri());
128+
if (httpRequest.timeout().isPresent()) {
129+
builder.timeout(httpRequest.timeout().get());
130+
}
131+
if (httpRequest.method().endsWith("GET")) {
132+
builder.GET();
133+
} else {
134+
builder.method(httpRequest.method(), httpRequest.bodyPublisher().get());
135+
}
136+
Map<String, String> headerMap = new HashMap<>();
137+
if (httpRequest.headers() != null && !httpRequest.headers().map().isEmpty()) {
138+
for (Map.Entry<String, List<String>> header
139+
:httpRequest.headers().map().entrySet()) {
140+
List<String> values = header.getValue();
141+
if (values.size() == 1) {
142+
headerMap.put(header.getKey(), header.getValue().get(0));
143+
}
144+
// the existing design does not handle multiple values for headers
145+
}
146+
}
147+
Optional<String> oidcTokenRequest = readableConfig
148+
.getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST);
149+
String bearerToken = oidcHeaderPreProcessor.preprocessValueForHeader(
150+
HttpHeaderUtils.AUTHORIZATION, oidcTokenRequest.get());
151+
headerMap.put(HttpHeaderUtils.AUTHORIZATION, bearerToken);
152+
String[] headerAndValueArray = HttpHeaderUtils.toHeaderAndValueArray(headerMap);
153+
builder.headers(headerAndValueArray);
154+
httpRequest = builder.build();
155+
}
156+
return httpRequest;
157+
}
158+
105159
private Collection<RowData> processHttpResponse(
106160
HttpResponse<String> response,
107161
HttpLookupSourceRequestEntry request) throws IOException {

src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java

+4-18
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
import java.net.http.HttpRequest.Builder;
55
import java.util.Arrays;
66
import java.util.Map;
7-
import java.util.Optional;
8-
import java.util.Properties;
97
import java.util.stream.Collectors;
108

119
import lombok.extern.slf4j.Slf4j;
@@ -18,7 +16,6 @@
1816
import com.getindata.connectors.http.internal.HeaderPreprocessor;
1917
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
2018
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
21-
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL;
2219

2320
/**
2421
* Base class for {@link HttpRequest} factories.
@@ -51,21 +48,10 @@ public RequestFactoryBase(
5148
this.baseUrl = options.getUrl();
5249
this.lookupQueryCreator = lookupQueryCreator;
5350
this.options = options;
54-
55-
Properties properties = options.getProperties();
56-
/*
57-
* For OIDC, the preprocessor will fully specify the Authentication header value,
58-
* as a bearer token. But the preprocessors only amend existing headers, so in this case
59-
* if there is no existing authorization header then we add a dummy one to the properties,
60-
* so the preprocessor will be driven and will provide the value.
61-
*/
62-
Optional<String> oidcAuthURL = options.getReadableConfig()
63-
.getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL);
64-
if (oidcAuthURL.isPresent()) {
65-
properties.put(HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX
66-
+ HttpHeaderUtils.AUTHORIZATION, "Dummy");
67-
}
68-
51+
// note that the OIDC header preprocessor is not setup here, because it
52+
// issues a network call to the authentication server. This code is driven for
53+
// explain select. Explain should not issue network calls.
54+
// We setup the OIDC authentication header at lookup query time.
6955
var headerMap = HttpHeaderUtils
7056
.prepareHeaderMap(
7157
HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX,

src/main/java/com/getindata/connectors/http/internal/utils/HttpHeaderUtils.java

+11-10
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,17 @@ AUTHORIZATION, new OIDCAuthHeaderValuePreprocessor(oidcAuthURL,
105105
}
106106

107107
public static HeaderPreprocessor createHeaderPreprocessor(ReadableConfig readableConfig) {
108-
HeaderPreprocessor headerPreprocessor;
108+
boolean useRawAuthHeader =
109+
readableConfig.get(HttpLookupConnectorOptions.USE_RAW_AUTH_HEADER);
110+
HeaderPreprocessor headerPreprocessor =
111+
HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(
112+
useRawAuthHeader);
113+
log.info("created HeaderPreprocessor for basic useRawAuthHeader=" + useRawAuthHeader);
114+
log.info("returning HeaderPreprocessor " + headerPreprocessor);
115+
return headerPreprocessor;
116+
}
117+
public static HeaderPreprocessor createOIDCHeaderPreprocessor(ReadableConfig readableConfig) {
118+
HeaderPreprocessor headerPreprocessor = null;
109119
Optional<String> oidcAuthURL = readableConfig
110120
.getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL);
111121

@@ -121,16 +131,7 @@ public static HeaderPreprocessor createHeaderPreprocessor(ReadableConfig readabl
121131
+ " for OIDC oidcAuthURL=" + oidcAuthURL
122132
+ ", oidcTokenRequest=" + oidcTokenRequest
123133
+ ", oidcExpiryReduction=" + oidcExpiryReduction);
124-
} else {
125-
boolean useRawAuthHeader =
126-
readableConfig.get(HttpLookupConnectorOptions.USE_RAW_AUTH_HEADER);
127-
128-
headerPreprocessor =
129-
HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(
130-
useRawAuthHeader);
131-
log.info("created HeaderPreprocessor for basic useRawAuthHeader=" + useRawAuthHeader);
132134
}
133-
log.info("returning HeaderPreprocessor " + headerPreprocessor);
134135
return headerPreprocessor;
135136
}
136137
}

src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.mockito.junit.jupiter.MockitoExtension;
2323
import static org.assertj.core.api.Assertions.assertThat;
2424

25-
2625
import com.getindata.connectors.http.internal.HeaderPreprocessor;
2726
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
2827
import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericGetQueryCreator;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package com.getindata.connectors.http.internal.table.lookup;
2+
3+
import java.io.File;
4+
import java.net.URI;
5+
import java.net.http.HttpRequest;
6+
import java.time.Duration;
7+
8+
import com.github.tomakehurst.wiremock.WireMockServer;
9+
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
10+
import com.github.tomakehurst.wiremock.stubbing.StubMapping;
11+
import org.apache.flink.api.common.RuntimeExecutionMode;
12+
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
13+
import org.apache.flink.configuration.Configuration;
14+
import org.apache.flink.configuration.ExecutionOptions;
15+
import org.apache.flink.streaming.api.CheckpointingMode;
16+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
17+
import org.junit.jupiter.api.AfterEach;
18+
import org.junit.jupiter.api.BeforeEach;
19+
import org.junit.jupiter.api.Test;
20+
import static com.github.tomakehurst.wiremock.client.WireMock.*;
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
23+
import com.getindata.connectors.http.internal.HeaderPreprocessor;
24+
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
25+
import static com.getindata.connectors.http.TestHelper.readTestFile;
26+
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.*;
27+
28+
public class JavaNetHttpPollingClientWithWireTest {
29+
private static final String BASE_URL = "http://localhost.com";
30+
31+
private static final String SAMPLES_FOLDER = "/auth/";
32+
private static final int SERVER_PORT = 9090;
33+
34+
private static final int HTTPS_SERVER_PORT = 8443;
35+
36+
private static final String SERVER_KEYSTORE_PATH =
37+
"src/test/resources/security/certs/serverKeyStore.jks";
38+
39+
private static final String SERVER_TRUSTSTORE_PATH =
40+
"src/test/resources/security/certs/serverTrustStore.jks";
41+
42+
private static final String ENDPOINT = "/auth";
43+
private static final String BEARER_REQUEST = "Bearer Dummy";
44+
45+
private WireMockServer wireMockServer;
46+
@SuppressWarnings("unchecked")
47+
@BeforeEach
48+
public void setup() {
49+
50+
File keyStoreFile = new File(SERVER_KEYSTORE_PATH);
51+
File trustStoreFile = new File(SERVER_TRUSTSTORE_PATH);
52+
53+
wireMockServer = new WireMockServer(
54+
WireMockConfiguration.wireMockConfig()
55+
.port(SERVER_PORT)
56+
.httpsPort(HTTPS_SERVER_PORT)
57+
.keystorePath(keyStoreFile.getAbsolutePath())
58+
.keystorePassword("password")
59+
.keyManagerPassword("password")
60+
.needClientAuth(true)
61+
.trustStorePath(trustStoreFile.getAbsolutePath())
62+
.trustStorePassword("password")
63+
.extensions(JsonTransform.class)
64+
);
65+
wireMockServer.start();
66+
67+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
68+
env.setRestartStrategy(RestartStrategies.noRestart());
69+
Configuration config = new Configuration();
70+
config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
71+
env.configure(config, getClass().getClassLoader());
72+
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
73+
}
74+
75+
@AfterEach
76+
public void tearDown() {
77+
wireMockServer.stop();
78+
}
79+
80+
81+
@Test
82+
public void shouldUpdateHttpRequestIfRequiredGet() {
83+
HttpRequest httpRequest = HttpRequest.newBuilder()
84+
.GET()
85+
.uri(URI.create(BASE_URL))
86+
.timeout(Duration.ofSeconds(1))
87+
.setHeader("Origin","*")
88+
.setHeader("X-Content-Type-Options","nosniff")
89+
.setHeader("Content-Type","application/json")
90+
.build();
91+
shouldUpdateHttpRequestIfRequired(httpRequest);
92+
}
93+
@Test
94+
public void shouldUpdateHttpRequestIfRequiredPut() {
95+
HttpRequest httpRequest = HttpRequest.newBuilder()
96+
.PUT( HttpRequest.BodyPublishers.ofString("foo"))
97+
.uri(URI.create(BASE_URL))
98+
.timeout(Duration.ofSeconds(1))
99+
.setHeader("Origin","*")
100+
.setHeader("X-Content-Type-Options","nosniff")
101+
.setHeader("Content-Type","application/json")
102+
.build();
103+
shouldUpdateHttpRequestIfRequired(httpRequest);
104+
}
105+
private void shouldUpdateHttpRequestIfRequired(HttpRequest httpRequest) {
106+
setUpServerBodyStub();
107+
JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(null,
108+
null,
109+
HttpLookupConfig.builder().url(BASE_URL).build(),
110+
null);
111+
LookupQueryInfo lookupQueryInfo = null;
112+
HttpLookupSourceRequestEntry request =
113+
new HttpLookupSourceRequestEntry(httpRequest, lookupQueryInfo);
114+
115+
Configuration configuration = new Configuration();
116+
HeaderPreprocessor oidcHeaderPreProcessor =
117+
HttpHeaderUtils.createOIDCHeaderPreprocessor(configuration);
118+
HttpRequest newHttpRequest = client.updateHttpRequestIfRequired(request,
119+
oidcHeaderPreProcessor);
120+
assertThat(httpRequest).isEqualTo(newHttpRequest);
121+
configuration.setString(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL.key(),"http://localhost:9090/auth");
122+
configuration.setString(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST, BEARER_REQUEST);
123+
configuration.set(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION,
124+
Duration.ofSeconds(1L));
125+
client = new JavaNetHttpPollingClient(null,
126+
null,
127+
HttpLookupConfig.builder().url(BASE_URL).readableConfig(configuration).build(),
128+
null);
129+
oidcHeaderPreProcessor =
130+
HttpHeaderUtils.createOIDCHeaderPreprocessor(configuration);
131+
// change oidcHeaderPreProcessor to use the mock http client for the authentication flow
132+
newHttpRequest = client.updateHttpRequestIfRequired(request,
133+
oidcHeaderPreProcessor);
134+
assertThat(httpRequest).isNotEqualTo(newHttpRequest);
135+
assertThat(httpRequest.headers().map().keySet().size()).isEqualTo(3);
136+
assertThat(newHttpRequest.headers().map().keySet().size()).isEqualTo(4);
137+
assertThat(httpRequest.headers().map().get("Content-Type"))
138+
.isEqualTo(newHttpRequest.headers().map().get("Content-Type"));
139+
}
140+
141+
private StubMapping setUpServerBodyStub() {
142+
return wireMockServer.stubFor(
143+
post(urlEqualTo(ENDPOINT))
144+
.withHeader("Content-Type", equalTo("application/x-www-form-urlencoded"))
145+
.withRequestBody(equalTo(BEARER_REQUEST))
146+
.willReturn(
147+
aResponse()
148+
.withStatus(200)
149+
.withBody(readTestFile(SAMPLES_FOLDER + "AuthResult.json"))
150+
)
151+
);
152+
}
153+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.getindata.connectors.http.internal.utils;
2+
import java.time.Duration;
3+
4+
import org.apache.flink.configuration.Configuration;
5+
import org.junit.jupiter.api.Test;
6+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
7+
8+
import com.getindata.connectors.http.internal.HeaderPreprocessor;
9+
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.*;
10+
11+
12+
13+
public class HttpHeaderUtilsTest {
14+
@Test
15+
void shouldCreateOIDCHeaderPreprocessorTest() {
16+
Configuration configuration = new Configuration();
17+
HeaderPreprocessor headerPreprocessor
18+
= HttpHeaderUtils.createOIDCHeaderPreprocessor(configuration);
19+
assertThat(headerPreprocessor).isNull();
20+
configuration.setString(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL.key(), "http://aaa");
21+
configuration.setString(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST.key(), "ccc");
22+
configuration.set(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION, Duration.ofSeconds(1));
23+
headerPreprocessor
24+
= HttpHeaderUtils.createOIDCHeaderPreprocessor(configuration);
25+
assertThat(headerPreprocessor).isNotNull();
26+
}
27+
}
+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"access_token": "test",
3+
"expires_in": 1
4+
}

0 commit comments

Comments
 (0)