Skip to content

Commit 65d7991

Browse files
sfc-gh-alhuangEduardHantig
authored andcommitted
SNOW-189106 Kafka Connector to Support External OAuth (snowflakedb#671)
1 parent 4e7d928 commit 65d7991

23 files changed

+883
-59
lines changed

.github/scripts/profile.json.gpg

103 Bytes
Binary file not shown.
109 Bytes
Binary file not shown.
107 Bytes
Binary file not shown.

src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -209,9 +209,13 @@ public Config validate(Map<String, String> connectorConfigs) {
209209
Utils.updateConfigErrorMessage(result, invalidKey, invalidProxyParams.get(invalidKey));
210210
}
211211

212-
// If private key or private key passphrase is provided through file, skip validation
213-
if (connectorConfigs.getOrDefault(Utils.SF_PRIVATE_KEY, "").contains("${file:")
214-
|| connectorConfigs.getOrDefault(Utils.PRIVATE_KEY_PASSPHRASE, "").contains("${file:"))
212+
// If using snowflake_jwt and authentication, and private key or private key passphrase is
213+
// provided through file, skip validation
214+
if (connectorConfigs
215+
.getOrDefault(Utils.SF_AUTHENTICATOR, Utils.SNOWFLAKE_JWT)
216+
.equals(Utils.SNOWFLAKE_JWT)
217+
&& (connectorConfigs.getOrDefault(Utils.SF_PRIVATE_KEY, "").contains("${file:")
218+
|| connectorConfigs.getOrDefault(Utils.PRIVATE_KEY_PASSPHRASE, "").contains("${file:")))
215219
return result;
216220

217221
// We don't validate name, since it is not included in the return value
@@ -244,6 +248,28 @@ public Config validate(Map<String, String> connectorConfigs) {
244248
case "0013":
245249
Utils.updateConfigErrorMessage(result, Utils.SF_PRIVATE_KEY, " must be non-empty");
246250
break;
251+
case "0026":
252+
Utils.updateConfigErrorMessage(
253+
result,
254+
Utils.SF_OAUTH_CLIENT_ID,
255+
" must be non-empty when using oauth authenticator");
256+
break;
257+
case "0027":
258+
Utils.updateConfigErrorMessage(
259+
result,
260+
Utils.SF_OAUTH_CLIENT_SECRET,
261+
" must be non-empty when using oauth authenticator");
262+
break;
263+
case "0028":
264+
Utils.updateConfigErrorMessage(
265+
result,
266+
Utils.SF_OAUTH_REFRESH_TOKEN,
267+
" must be non-empty when using oauth authenticator");
268+
break;
269+
case "0029":
270+
Utils.updateConfigErrorMessage(
271+
result, Utils.SF_AUTHENTICATOR, " is not a valid authenticator");
272+
break;
247273
case "0002":
248274
Utils.updateConfigErrorMessage(
249275
result, Utils.SF_PRIVATE_KEY, " must be a valid PEM RSA private key");

src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ public class SnowflakeSinkConnectorConfig {
7171
static final String SNOWFLAKE_DATABASE = Utils.SF_DATABASE;
7272
static final String SNOWFLAKE_SCHEMA = Utils.SF_SCHEMA;
7373
static final String SNOWFLAKE_PRIVATE_KEY_PASSPHRASE = Utils.PRIVATE_KEY_PASSPHRASE;
74+
static final String AUTHENTICATOR_TYPE = Utils.SF_AUTHENTICATOR;
75+
static final String OAUTH_CLIENT_ID = Utils.SF_OAUTH_CLIENT_ID;
76+
static final String OAUTH_CLIENT_SECRET = Utils.SF_OAUTH_CLIENT_SECRET;
77+
static final String OAUTH_REFRESH_TOKEN = Utils.SF_OAUTH_REFRESH_TOKEN;
7478

7579
// For Snowpipe Streaming client
7680
public static final String SNOWFLAKE_ROLE = Utils.SF_ROLE;
@@ -308,6 +312,46 @@ static ConfigDef newConfigDef() {
308312
6,
309313
ConfigDef.Width.NONE,
310314
SNOWFLAKE_ROLE)
315+
.define(
316+
AUTHENTICATOR_TYPE,
317+
Type.STRING, // TODO: SNOW-889748 change to enum and add validator
318+
Utils.SNOWFLAKE_JWT,
319+
Importance.LOW,
320+
"Authenticator for JDBC and streaming ingest sdk",
321+
SNOWFLAKE_LOGIN_INFO,
322+
7,
323+
ConfigDef.Width.NONE,
324+
AUTHENTICATOR_TYPE)
325+
.define(
326+
OAUTH_CLIENT_ID,
327+
Type.STRING,
328+
"",
329+
Importance.HIGH,
330+
"Client id of target OAuth integration",
331+
SNOWFLAKE_LOGIN_INFO,
332+
8,
333+
ConfigDef.Width.NONE,
334+
OAUTH_CLIENT_ID)
335+
.define(
336+
OAUTH_CLIENT_SECRET,
337+
Type.STRING,
338+
"",
339+
Importance.HIGH,
340+
"Client secret of target OAuth integration",
341+
SNOWFLAKE_LOGIN_INFO,
342+
9,
343+
ConfigDef.Width.NONE,
344+
OAUTH_CLIENT_SECRET)
345+
.define(
346+
OAUTH_REFRESH_TOKEN,
347+
Type.STRING,
348+
"",
349+
Importance.HIGH,
350+
"Refresh token for OAuth",
351+
SNOWFLAKE_LOGIN_INFO,
352+
10,
353+
ConfigDef.Width.NONE,
354+
OAUTH_REFRESH_TOKEN)
311355
// proxy
312356
.define(
313357
JVM_PROXY_HOST,

src/main/java/com/snowflake/kafka/connector/Utils.java

Lines changed: 205 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,46 @@
2323

2424
import com.google.common.collect.ImmutableMap;
2525
import com.snowflake.kafka.connector.internal.BufferThreshold;
26+
import com.snowflake.kafka.connector.internal.InternalUtils;
2627
import com.snowflake.kafka.connector.internal.KCLogger;
28+
import com.snowflake.kafka.connector.internal.OAuthConstants;
2729
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
30+
import com.snowflake.kafka.connector.internal.SnowflakeInternalOperations;
31+
import com.snowflake.kafka.connector.internal.SnowflakeURL;
2832
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
2933
import com.snowflake.kafka.connector.internal.streaming.StreamingUtils;
3034
import java.io.BufferedReader;
3135
import java.io.File;
3236
import java.io.InputStream;
3337
import java.io.InputStreamReader;
38+
import java.io.UnsupportedEncodingException;
3439
import java.net.Authenticator;
3540
import java.net.PasswordAuthentication;
41+
import java.net.URI;
42+
import java.net.URISyntaxException;
3643
import java.net.URL;
3744
import java.net.URLConnection;
45+
import java.net.URLEncoder;
3846
import java.util.Arrays;
47+
import java.util.Base64;
3948
import java.util.HashMap;
4049
import java.util.Map;
4150
import java.util.Objects;
4251
import java.util.Random;
4352
import java.util.regex.Matcher;
4453
import java.util.regex.Pattern;
54+
import java.util.stream.Collectors;
55+
import net.snowflake.client.jdbc.internal.apache.http.HttpHeaders;
56+
import net.snowflake.client.jdbc.internal.apache.http.client.methods.CloseableHttpResponse;
57+
import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpPost;
58+
import net.snowflake.client.jdbc.internal.apache.http.client.utils.URIBuilder;
59+
import net.snowflake.client.jdbc.internal.apache.http.entity.ContentType;
60+
import net.snowflake.client.jdbc.internal.apache.http.entity.StringEntity;
61+
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
62+
import net.snowflake.client.jdbc.internal.apache.http.impl.client.HttpClientBuilder;
63+
import net.snowflake.client.jdbc.internal.apache.http.util.EntityUtils;
64+
import net.snowflake.client.jdbc.internal.google.gson.JsonObject;
65+
import net.snowflake.client.jdbc.internal.google.gson.JsonParser;
4566
import org.apache.kafka.common.config.Config;
4667
import org.apache.kafka.common.config.ConfigException;
4768
import org.apache.kafka.common.config.ConfigValue;
@@ -62,6 +83,15 @@ public class Utils {
6283
public static final String SF_SSL = "sfssl"; // for test only
6384
public static final String SF_WAREHOUSE = "sfwarehouse"; // for test only
6485
public static final String PRIVATE_KEY_PASSPHRASE = "snowflake.private.key" + ".passphrase";
86+
public static final String SF_AUTHENTICATOR =
87+
"snowflake.authenticator"; // TODO: SNOW-889748 change to enum
88+
public static final String SF_OAUTH_CLIENT_ID = "snowflake.oauth.client.id";
89+
public static final String SF_OAUTH_CLIENT_SECRET = "snowflake.oauth.client.secret";
90+
public static final String SF_OAUTH_REFRESH_TOKEN = "snowflake.oauth.refresh.token";
91+
92+
// authenticator type
93+
public static final String SNOWFLAKE_JWT = "snowflake_jwt";
94+
public static final String OAUTH = "oauth";
6595

6696
/**
6797
* This value should be present if ingestion method is {@link
@@ -440,11 +470,54 @@ && parseTopicToTableMap(config.get(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MA
440470
Utils.formatString("{} cannot be empty.", SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA));
441471
}
442472

443-
if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY)) {
444-
invalidConfigParams.put(
445-
SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY,
446-
Utils.formatString(
447-
"{} cannot be empty.", SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY));
473+
switch (config
474+
.getOrDefault(SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE, Utils.SNOWFLAKE_JWT)
475+
.toLowerCase()) {
476+
// TODO: SNOW-889748 change to enum
477+
case Utils.SNOWFLAKE_JWT:
478+
if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY)) {
479+
invalidConfigParams.put(
480+
SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY,
481+
Utils.formatString(
482+
"{} cannot be empty when using {} authenticator.",
483+
SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY,
484+
Utils.SNOWFLAKE_JWT));
485+
}
486+
break;
487+
case Utils.OAUTH:
488+
if (!config.containsKey(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID)) {
489+
invalidConfigParams.put(
490+
SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID,
491+
Utils.formatString(
492+
"{} cannot be empty when using {} authenticator.",
493+
SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID,
494+
Utils.OAUTH));
495+
}
496+
if (!config.containsKey(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET)) {
497+
invalidConfigParams.put(
498+
SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET,
499+
Utils.formatString(
500+
"{} cannot be empty when using {} authenticator.",
501+
SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET,
502+
Utils.OAUTH));
503+
}
504+
if (!config.containsKey(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN)) {
505+
invalidConfigParams.put(
506+
SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN,
507+
Utils.formatString(
508+
"{} cannot be empty when using {} authenticator.",
509+
SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN,
510+
Utils.OAUTH));
511+
}
512+
break;
513+
default:
514+
invalidConfigParams.put(
515+
SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE,
516+
Utils.formatString(
517+
"{} should be one of {} or {}.",
518+
SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE,
519+
Utils.SNOWFLAKE_JWT,
520+
Utils.OAUTH));
448521
}
449522

450523
if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_USER)) {
@@ -719,6 +792,133 @@ public static String formatString(String format, Object... vars) {
719792
return format;
720793
}
721794

795+
/**
796+
* Get OAuth access token given refresh token
797+
*
798+
* @param url OAuth server url
799+
* @param clientId OAuth clientId
800+
* @param clientSecret OAuth clientSecret
801+
* @param refreshToken OAuth refresh token
802+
* @return OAuth access token
803+
*/
804+
public static String getSnowflakeOAuthAccessToken(
805+
SnowflakeURL url, String clientId, String clientSecret, String refreshToken) {
806+
return getSnowflakeOAuthToken(
807+
url,
808+
clientId,
809+
clientSecret,
810+
refreshToken,
811+
OAuthConstants.REFRESH_TOKEN,
812+
OAuthConstants.REFRESH_TOKEN,
813+
OAuthConstants.ACCESS_TOKEN);
814+
}
815+
816+
/**
817+
* Get OAuth token given integration info <a
818+
* href="https://docs.snowflake.com/en/user-guide/oauth-snowflake-overview">Snowflake OAuth
819+
* Overview</a>
820+
*
821+
* @param url OAuth server url
822+
* @param clientId OAuth clientId
823+
* @param clientSecret OAuth clientSecret
824+
* @param credential OAuth credential, either az code or refresh token
825+
* @param grantType OAuth grant type, either authorization_code or refresh_token
826+
* @param credentialType OAuth credential key, either code or refresh_token
827+
* @param tokenType type of OAuth token to get, either access_token or refresh_token
828+
* @return OAuth token
829+
*/
830+
// TODO: SNOW-895296 Integrate OAuth utils with streaming ingest SDK
831+
public static String getSnowflakeOAuthToken(
832+
SnowflakeURL url,
833+
String clientId,
834+
String clientSecret,
835+
String credential,
836+
String grantType,
837+
String credentialType,
838+
String tokenType) {
839+
Map<String, String> headers = new HashMap<>();
840+
headers.put(HttpHeaders.CONTENT_TYPE, OAuthConstants.OAUTH_CONTENT_TYPE_HEADER);
841+
headers.put(
842+
HttpHeaders.AUTHORIZATION,
843+
OAuthConstants.BASIC_AUTH_HEADER_PREFIX
844+
+ Base64.getEncoder().encodeToString((clientId + ":" + clientSecret).getBytes()));
845+
846+
Map<String, String> payload = new HashMap<>();
847+
payload.put(OAuthConstants.GRANT_TYPE_PARAM, grantType);
848+
payload.put(credentialType, credential);
849+
payload.put(OAuthConstants.REDIRECT_URI, OAuthConstants.DEFAULT_REDIRECT_URI);
850+
851+
// Encode and convert payload into string entity
852+
String payloadString =
853+
payload.entrySet().stream()
854+
.map(
855+
e -> {
856+
try {
857+
return e.getKey() + "=" + URLEncoder.encode(e.getValue(), "UTF-8");
858+
} catch (UnsupportedEncodingException ex) {
859+
throw SnowflakeErrors.ERROR_1004.getException(ex);
860+
}
861+
})
862+
.collect(Collectors.joining("&"));
863+
final StringEntity entity =
864+
new StringEntity(payloadString, ContentType.APPLICATION_FORM_URLENCODED);
865+
866+
HttpPost post =
867+
buildOAuthHttpPostRequest(url, OAuthConstants.TOKEN_REQUEST_ENDPOINT, headers, entity);
868+
869+
// Request access token
870+
CloseableHttpClient client = HttpClientBuilder.create().build();
871+
try {
872+
return InternalUtils.backoffAndRetry(
873+
null,
874+
SnowflakeInternalOperations.FETCH_OAUTH_TOKEN,
875+
() -> {
876+
try (CloseableHttpResponse httpResponse = client.execute(post)) {
877+
String respBodyString = EntityUtils.toString(httpResponse.getEntity());
878+
JsonObject respBody = JsonParser.parseString(respBodyString).getAsJsonObject();
879+
// Trim surrounding quotation marks
880+
return respBody.get(tokenType).toString().replaceAll("^\"|\"$", "");
881+
} catch (Exception e) {
882+
throw SnowflakeErrors.ERROR_1004.getException(
883+
"Failed to get Oauth access token after retries");
884+
}
885+
})
886+
.toString();
887+
} catch (Exception e) {
888+
throw SnowflakeErrors.ERROR_1004.getException(e);
889+
}
890+
}
891+
892+
/**
893+
* Build OAuth http post request base on headers and payload
894+
*
895+
* @param url target url
896+
* @param headers headers key value pairs
897+
* @param entity payload entity
898+
* @return HttpPost request for OAuth
899+
*/
900+
public static HttpPost buildOAuthHttpPostRequest(
901+
SnowflakeURL url, String path, Map<String, String> headers, StringEntity entity) {
902+
// Build post request
903+
URI uri;
904+
try {
905+
uri =
906+
new URIBuilder().setHost(url.toString()).setScheme(url.getScheme()).setPath(path).build();
907+
} catch (URISyntaxException e) {
908+
throw SnowflakeErrors.ERROR_1004.getException(e);
909+
}
910+
911+
// Add headers
912+
HttpPost post = new HttpPost(uri);
913+
for (Map.Entry<String, String> e : headers.entrySet()) {
914+
post.addHeader(e.getKey(), e.getValue());
915+
}
916+
917+
post.setEntity(entity);
918+
919+
return post;
920+
}
921+
722922
/**
723923
* Get the message and cause of a missing exception, handling the null or empty cases of each
724924
*

0 commit comments

Comments
 (0)