Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-189106 Kafka Connector to Support External OAuth #671

Merged
merged 15 commits into from
Aug 18, 2023
Binary file modified .github/scripts/profile.json.gpg
Binary file not shown.
Binary file modified .github/scripts/profile_azure.json.gpg
Binary file not shown.
Binary file modified .github/scripts/profile_gcs.json.gpg
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,13 @@ public Config validate(Map<String, String> connectorConfigs) {
Utils.updateConfigErrorMessage(result, invalidKey, invalidProxyParams.get(invalidKey));
}

// If private key or private key passphrase is provided through file, skip validation
if (connectorConfigs.getOrDefault(Utils.SF_PRIVATE_KEY, "").contains("${file:")
|| connectorConfigs.getOrDefault(Utils.PRIVATE_KEY_PASSPHRASE, "").contains("${file:"))
// If using snowflake_jwt and authentication, and private key or private key passphrase is
// provided through file, skip validation
if (connectorConfigs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we're adding a new option, could you ask @sfc-gh-xhuang or @sfc-gh-lema to update our online doc?

.getOrDefault(Utils.SF_AUTHENTICATOR, Utils.SNOWFLAKE_JWT)
.equals(Utils.SNOWFLAKE_JWT)
&& (connectorConfigs.getOrDefault(Utils.SF_PRIVATE_KEY, "").contains("${file:")
|| connectorConfigs.getOrDefault(Utils.PRIVATE_KEY_PASSPHRASE, "").contains("${file:")))
return result;

// We don't validate name, since it is not included in the return value
Expand Down Expand Up @@ -244,6 +248,28 @@ public Config validate(Map<String, String> connectorConfigs) {
case "0013":
Utils.updateConfigErrorMessage(result, Utils.SF_PRIVATE_KEY, " must be non-empty");
break;
case "0026":
Utils.updateConfigErrorMessage(
result,
Utils.SF_OAUTH_CLIENT_ID,
" must be non-empty when using oauth authenticator");
break;
case "0027":
Utils.updateConfigErrorMessage(
result,
Utils.SF_OAUTH_CLIENT_SECRET,
" must be non-empty when using oauth authenticator");
break;
case "0028":
Utils.updateConfigErrorMessage(
result,
Utils.SF_OAUTH_REFRESH_TOKEN,
" must be non-empty when using oauth authenticator");
break;
case "0029":
Utils.updateConfigErrorMessage(
result, Utils.SF_AUTHENTICATOR, " is not a valid authenticator");
break;
case "0002":
Utils.updateConfigErrorMessage(
result, Utils.SF_PRIVATE_KEY, " must be a valid PEM RSA private key");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public class SnowflakeSinkConnectorConfig {
static final String SNOWFLAKE_DATABASE = Utils.SF_DATABASE;
static final String SNOWFLAKE_SCHEMA = Utils.SF_SCHEMA;
static final String SNOWFLAKE_PRIVATE_KEY_PASSPHRASE = Utils.PRIVATE_KEY_PASSPHRASE;
static final String AUTHENTICATOR_TYPE = Utils.SF_AUTHENTICATOR;
static final String OAUTH_CLIENT_ID = Utils.SF_OAUTH_CLIENT_ID;
static final String OAUTH_CLIENT_SECRET = Utils.SF_OAUTH_CLIENT_SECRET;
static final String OAUTH_REFRESH_TOKEN = Utils.SF_OAUTH_REFRESH_TOKEN;
sfc-gh-alhuang marked this conversation as resolved.
Show resolved Hide resolved

// For Snowpipe Streaming client
public static final String SNOWFLAKE_ROLE = Utils.SF_ROLE;
Expand Down Expand Up @@ -301,6 +305,46 @@ static ConfigDef newConfigDef() {
6,
ConfigDef.Width.NONE,
SNOWFLAKE_ROLE)
.define(
sfc-gh-alhuang marked this conversation as resolved.
Show resolved Hide resolved
AUTHENTICATOR_TYPE,
Type.STRING, // TODO: SNOW-889748 change to enum and add validator
Utils.SNOWFLAKE_JWT,
Importance.LOW,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason this is low and rest are high importance?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this is an optional parameter when we use jwt. While client_id , client_secret and refresh_token is required when using oauth. Do you think we should make all of the low?

"Authenticator for JDBC and streaming ingest sdk",
SNOWFLAKE_LOGIN_INFO,
7,
ConfigDef.Width.NONE,
AUTHENTICATOR_TYPE)
.define(
OAUTH_CLIENT_ID,
Type.STRING,
"",
Importance.HIGH,
"Client id of target OAuth integration",
SNOWFLAKE_LOGIN_INFO,
8,
ConfigDef.Width.NONE,
OAUTH_CLIENT_ID)
.define(
OAUTH_CLIENT_SECRET,
Type.STRING,
"",
Importance.HIGH,
"Client secret of target OAuth integration",
SNOWFLAKE_LOGIN_INFO,
9,
ConfigDef.Width.NONE,
OAUTH_CLIENT_SECRET)
.define(
OAUTH_REFRESH_TOKEN,
Type.STRING,
"",
Importance.HIGH,
"Refresh token for OAuth",
SNOWFLAKE_LOGIN_INFO,
10,
ConfigDef.Width.NONE,
OAUTH_REFRESH_TOKEN)
// proxy
.define(
JVM_PROXY_HOST,
Expand Down
210 changes: 205 additions & 5 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,46 @@

import com.google.common.collect.ImmutableMap;
import com.snowflake.kafka.connector.internal.BufferThreshold;
import com.snowflake.kafka.connector.internal.InternalUtils;
import com.snowflake.kafka.connector.internal.KCLogger;
import com.snowflake.kafka.connector.internal.OAuthConstants;
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import com.snowflake.kafka.connector.internal.SnowflakeInternalOperations;
import com.snowflake.kafka.connector.internal.SnowflakeURL;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.internal.streaming.StreamingUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.Authenticator;
import java.net.PasswordAuthentication;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLEncoder;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import net.snowflake.client.jdbc.internal.apache.http.HttpHeaders;
import net.snowflake.client.jdbc.internal.apache.http.client.methods.CloseableHttpResponse;
import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpPost;
import net.snowflake.client.jdbc.internal.apache.http.client.utils.URIBuilder;
import net.snowflake.client.jdbc.internal.apache.http.entity.ContentType;
import net.snowflake.client.jdbc.internal.apache.http.entity.StringEntity;
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
import net.snowflake.client.jdbc.internal.apache.http.impl.client.HttpClientBuilder;
import net.snowflake.client.jdbc.internal.apache.http.util.EntityUtils;
import net.snowflake.client.jdbc.internal.google.gson.JsonObject;
import net.snowflake.client.jdbc.internal.google.gson.JsonParser;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
Expand All @@ -62,6 +83,15 @@ public class Utils {
public static final String SF_SSL = "sfssl"; // for test only
public static final String SF_WAREHOUSE = "sfwarehouse"; // for test only
public static final String PRIVATE_KEY_PASSPHRASE = "snowflake.private.key" + ".passphrase";
public static final String SF_AUTHENTICATOR =
"snowflake.authenticator"; // TODO: SNOW-889748 change to enum
public static final String SF_OAUTH_CLIENT_ID = "snowflake.oauth.client.id";
public static final String SF_OAUTH_CLIENT_SECRET = "snowflake.oauth.client.secret";
public static final String SF_OAUTH_REFRESH_TOKEN = "snowflake.oauth.refresh.token";

// authenticator type
public static final String SNOWFLAKE_JWT = "snowflake_jwt";
public static final String OAUTH = "oauth";

/**
* This value should be present if ingestion method is {@link
Expand Down Expand Up @@ -440,11 +470,54 @@ && parseTopicToTableMap(config.get(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MA
Utils.formatString("{} cannot be empty.", SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA));
}

if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY)) {
invalidConfigParams.put(
SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY,
Utils.formatString(
"{} cannot be empty.", SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY));
switch (config
.getOrDefault(SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE, Utils.SNOWFLAKE_JWT)
.toLowerCase()) {
// TODO: SNOW-889748 change to enum
case Utils.SNOWFLAKE_JWT:
sfc-gh-alhuang marked this conversation as resolved.
Show resolved Hide resolved
if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY)) {
invalidConfigParams.put(
SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY,
Utils.formatString(
"{} cannot be empty when using {} authenticator.",
SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY,
Utils.SNOWFLAKE_JWT));
}
break;
case Utils.OAUTH:
if (!config.containsKey(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID)) {
invalidConfigParams.put(
SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID,
Utils.formatString(
"{} cannot be empty when using {} authenticator.",
SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID,
Utils.OAUTH));
}
if (!config.containsKey(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET)) {
invalidConfigParams.put(
SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET,
Utils.formatString(
"{} cannot be empty when using {} authenticator.",
SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET,
Utils.OAUTH));
}
if (!config.containsKey(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN)) {
invalidConfigParams.put(
SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN,
Utils.formatString(
"{} cannot be empty when using {} authenticator.",
SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN,
Utils.OAUTH));
}
break;
default:
invalidConfigParams.put(
SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE,
Utils.formatString(
"{} should be one of {} or {}.",
SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE,
Utils.SNOWFLAKE_JWT,
Utils.OAUTH));
}

if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_USER)) {
Expand Down Expand Up @@ -704,6 +777,133 @@ public static String formatString(String format, Object... vars) {
return format;
}

/**
* Get OAuth access token given refresh token
*
* @param url OAuth server url
* @param clientId OAuth clientId
* @param clientSecret OAuth clientSecret
* @param refreshToken OAuth refresh token
* @return OAuth access token
*/
public static String getSnowflakeOAuthAccessToken(
SnowflakeURL url, String clientId, String clientSecret, String refreshToken) {
return getSnowflakeOAuthToken(
url,
clientId,
clientSecret,
refreshToken,
OAuthConstants.REFRESH_TOKEN,
OAuthConstants.REFRESH_TOKEN,
OAuthConstants.ACCESS_TOKEN);
}

/**
* Get OAuth token given integration info <a
* href="https://docs.snowflake.com/en/user-guide/oauth-snowflake-overview">Snowflake OAuth
* Overview</a>
*
* @param url OAuth server url
* @param clientId OAuth clientId
* @param clientSecret OAuth clientSecret
* @param credential OAuth credential, either az code or refresh token
* @param grantType OAuth grant type, either authorization_code or refresh_token
* @param credentialType OAuth credential key, either code or refresh_token
* @param tokenType type of OAuth token to get, either access_token or refresh_token
* @return OAuth token
*/
// TODO: SNOW-895296 Integrate OAuth utils with streaming ingest SDK
public static String getSnowflakeOAuthToken(
sfc-gh-japatel marked this conversation as resolved.
Show resolved Hide resolved
sfc-gh-alhuang marked this conversation as resolved.
Show resolved Hide resolved
SnowflakeURL url,
String clientId,
String clientSecret,
String credential,
String grantType,
String credentialType,
String tokenType) {
Map<String, String> headers = new HashMap<>();
headers.put(HttpHeaders.CONTENT_TYPE, OAuthConstants.OAUTH_CONTENT_TYPE_HEADER);
headers.put(
HttpHeaders.AUTHORIZATION,
OAuthConstants.BASIC_AUTH_HEADER_PREFIX
+ Base64.getEncoder().encodeToString((clientId + ":" + clientSecret).getBytes()));

Map<String, String> payload = new HashMap<>();
payload.put(OAuthConstants.GRANT_TYPE_PARAM, grantType);
payload.put(credentialType, credential);
payload.put(OAuthConstants.REDIRECT_URI, OAuthConstants.DEFAULT_REDIRECT_URI);

// Encode and convert payload into string entity
String payloadString =
payload.entrySet().stream()
.map(
e -> {
try {
return e.getKey() + "=" + URLEncoder.encode(e.getValue(), "UTF-8");
} catch (UnsupportedEncodingException ex) {
throw SnowflakeErrors.ERROR_1004.getException(ex);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not covered by code coverage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could only happens if the encoder name is not UTF-8. Ref

}
})
.collect(Collectors.joining("&"));
final StringEntity entity =
new StringEntity(payloadString, ContentType.APPLICATION_FORM_URLENCODED);

HttpPost post =
buildOAuthHttpPostRequest(url, OAuthConstants.TOKEN_REQUEST_ENDPOINT, headers, entity);

// Request access token
CloseableHttpClient client = HttpClientBuilder.create().build();
try {
return InternalUtils.backoffAndRetry(
null,
SnowflakeInternalOperations.FETCH_OAUTH_TOKEN,
() -> {
try (CloseableHttpResponse httpResponse = client.execute(post)) {
String respBodyString = EntityUtils.toString(httpResponse.getEntity());
JsonObject respBody = JsonParser.parseString(respBodyString).getAsJsonObject();
// Trim surrounding quotation marks
return respBody.get(tokenType).toString().replaceAll("^\"|\"$", "");
} catch (Exception e) {
throw SnowflakeErrors.ERROR_1004.getException(
"Failed to get Oauth access token after retries");
}
})
.toString();
} catch (Exception e) {
throw SnowflakeErrors.ERROR_1004.getException(e);
}
}

/**
* Build OAuth http post request base on headers and payload
*
* @param url target url
* @param headers headers key value pairs
* @param entity payload entity
* @return HttpPost request for OAuth
*/
public static HttpPost buildOAuthHttpPostRequest(
SnowflakeURL url, String path, Map<String, String> headers, StringEntity entity) {
// Build post request
URI uri;
try {
uri =
new URIBuilder().setHost(url.toString()).setScheme(url.getScheme()).setPath(path).build();
} catch (URISyntaxException e) {
throw SnowflakeErrors.ERROR_1004.getException(e);
sfc-gh-rcheng marked this conversation as resolved.
Show resolved Hide resolved
}

// Add headers
HttpPost post = new HttpPost(uri);
for (Map.Entry<String, String> e : headers.entrySet()) {
post.addHeader(e.getKey(), e.getValue());
}

post.setEntity(entity);

return post;
}

/**
* Get the message and cause of a missing exception, handling the null or empty cases of each
*
Expand Down
Loading
Loading