-
Notifications
You must be signed in to change notification settings - Fork 98
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNOW-189106 Kafka Connector to Support External OAuth #671
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some thoughts and requested some test changes. Don't have full context on this change, so lets wait for a review from toby/jay
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java
Outdated
Show resolved
Hide resolved
src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskForStreamingIT.java
Outdated
Show resolved
Hide resolved
src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
Outdated
Show resolved
Hide resolved
Codecov Report
@@ Coverage Diff @@
## master #671 +/- ##
==========================================
+ Coverage 87.88% 87.96% +0.07%
==========================================
Files 50 50
Lines 4144 4261 +117
Branches 449 468 +19
==========================================
+ Hits 3642 3748 +106
- Misses 332 342 +10
- Partials 170 171 +1
... and 3 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
pom.xml
Outdated
@@ -156,6 +156,7 @@ | |||
<version>2.22.0</version> | |||
<configuration> | |||
<skipTests>${skipUnitTests}</skipTests> | |||
<trimStackTrace>false</trimStackTrace> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be deleted afterwards.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
before you push this pr or in a separate PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll deleted this before pushing this PR.
try { | ||
return e.getKey() + "=" + URLEncoder.encode(e.getValue(), "UTF-8"); | ||
} catch (UnsupportedEncodingException ex) { | ||
throw SnowflakeErrors.ERROR_1004.getException(ex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not covered by code coverage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could only happens if the encoder name is not UTF-8
. Ref
return respBody.get(tokenType).toString().replaceAll("^\"|\"$", ""); | ||
} | ||
} | ||
} catch (Exception ignored) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this being ignored?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it could be ignored and retries until the maximum retries count is reach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets add a logger.warn here and print out message from exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sfc-gh-japatel @sfc-gh-rcheng can you be the primary reviewers for this? Thanks!
public static final String SF_AUTHENTICATOR = "snowflake.authenticator"; | ||
public static final String SF_OAUTH_CLIENT_ID = "snowflake.oauth.client.id"; | ||
public static final String SF_OAUTH_CLIENT_SECRET = "snowflake.oauth.client.secret"; | ||
public static final String SF_OAUTH_REFRESH_TOKEN = "snowflake.refresh.token"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thinking out loud: do we need oauth here as well to be consistent? since it only applies to oauth
snowflake.oauth.refresh.token
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Forgot to add oauth here. Added oauth prefix since cliend_id
might seems misleading, what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense to me
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
Show resolved
Hide resolved
AUTHENTICATOR, | ||
Type.STRING, | ||
"", | ||
Importance.LOW, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason this is low and rest are high importance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought this is an optional parameter when we use jwt. While client_id
, client_secret
and refresh_token
is required when using oauth. Do you think we should make all of the low?
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
Outdated
Show resolved
Hide resolved
|
||
// Exponential backoff retries | ||
try { | ||
Thread.sleep((1L << retries) * 1000L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where is the retry logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have a retry logic that can be reused in KC?
src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/OAuthConstants.java
Outdated
Show resolved
Hide resolved
@@ -7,6 +7,7 @@ | |||
import static com.snowflake.kafka.connector.internal.TestUtils.getConfig; | |||
import static org.junit.Assert.assertEquals; | |||
|
|||
import com.snowflake.kafka.connector.internal.OAuthConstants; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall I am seeing great value addition for all types of tests.
I dont see End to End tests though, do you plan to add it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline. Will added in another PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm!
Suggested some minor changes but none of them are blocker. Looking forward to end to end tests. Thanks for adding UT, IT tests but running it real kafka connect env might have its own issues which might not unravel here.
Good stuff @sfc-gh-alhuang
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments, PTAL! My main concern is about code duplication, please see if anything could be reused, thanks!
|| connectorConfigs.getOrDefault(Utils.PRIVATE_KEY_PASSPHRASE, "").contains("${file:")) | ||
// If using snowflake_jwt and authentication, and private key or private key passphrase is | ||
// provided through file, skip validation | ||
if (connectorConfigs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we're adding a new option, could you ask @sfc-gh-xhuang or @sfc-gh-lema to update our online doc?
src/main/java/com/snowflake/kafka/connector/internal/OAuthConstants.java
Show resolved
Hide resolved
|
||
// Exponential backoff retries | ||
try { | ||
Thread.sleep((1L << retries) * 1000L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have a retry logic that can be reused in KC?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, ship it!
src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskForStreamingIT.java
Show resolved
Hide resolved
The linked doc in the pr isn't accessible freely :) |
Things to do before merging this PR
According to @sfc-gh-japatel, this PR should be merge after the OAuth is support in SDK.
Update the Ingest SDK version, and modify the test should make this work. All the tests required modified are commented with TODOs.
After this PR is checked in, update local profile.json to pass the test.
Change
Support OAuth authenticator of streaming ingest with KC. For more details, please refer to the doc. Jira link: SNOW-189106
Test
OAuth authenticator for streaming ingest sdk is not released yet, tests with streaming ingest client are currently disabled until SNOW-352846 is released. Three profile.json.gpg files are updated for OAuth testing.