[SNOW-851840] Enable tombstone record ingestion in Snowpipe Streaming #688
Conversation
```diff
@@ -146,173 +148,6 @@ public void testIngestion() throws Exception {
     // assert !conn.pipeExist(pipe);
   }
 
-  @Test
-  public void testTombstoneRecords_DEFAULT_behavior_ingestion_SFJsonConverter() throws Exception {
```
moved to TombstoneRecordIngestionIT.java
LGTM; my only comment is about a potentially risky change that I'm not sure is necessary.
```diff
@@ -297,7 +316,7 @@ public SnowflakeTableRow(SnowflakeRecordContent content, JsonNode metadata) {
   }
 
   void putKey(SinkRecord record, ObjectNode meta) {
-    if (record.key() == null) {
+    if (record.key() == null || record.keySchema() == null) {
```
Why can't we put the key when the keySchema is null but the key is not? This change might be risky and could introduce a behavior change for both Snowpipe and Snowpipe Streaming.
The current code will throw an NPE on the next if statement if keySchema is null. handleNativeRecord() prevents null key schemas from being passed into putKey; however, we should throw a better error if we ever do get a null keySchema, so I added a null check.
This shouldn't be risky because we are only changing the error thrown from an NPE to a Snowflake error. What do you think?
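For readers following the thread, here is a minimal sketch of the guard under discussion. The class name, the method body, and the `IllegalStateException` stand-in are illustrative assumptions; the actual connector raises a `SnowflakeKafkaConnectorException` through its own error catalog.

```java
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.connect.sink.SinkRecord;

// Illustrative sketch only, not the connector's exact code.
class RecordServiceSketch {

  void putKey(SinkRecord record, ObjectNode meta) {
    // A record with no key contributes nothing to the metadata node.
    if (record.key() == null) {
      return;
    }

    // Without this guard, the schema-based handling below would dereference a
    // null keySchema and surface as a bare NullPointerException. Throwing a
    // descriptive error instead gives operators an actionable message.
    if (record.keySchema() == null) {
      throw new IllegalStateException(
          "[SF_KAFKA_CONNECTOR] record has a non-null key but a null key schema");
    }

    // ... existing schema-based key serialization follows here ...
  }
}
```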
```diff
@@ -749,19 +579,23 @@ public void testNativeNullIngestion() throws Exception {
     service.closeAll();
   }
 
-  @Test(expected = SnowflakeKafkaConnectorException.class)
```
We need to allow null value ingestion because community converters create null values instead of empty SnowflakeRecordContents. Currently, Snowpipe fails to ingest null-value records produced by community converters.
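As a concrete illustration of the case described above: community converters such as `org.apache.kafka.connect.json.JsonConverter` translate an empty message body into a null value with a null value schema, so the sink receives a record like the following (topic, key, and offset here are hypothetical):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

public class TombstoneExample {
  public static void main(String[] args) {
    // A tombstone as delivered by a community converter: both the value
    // and the value schema are null. Topic, key, and offset are made up.
    SinkRecord tombstone = new SinkRecord(
        "example-topic",      // topic
        0,                    // partition
        Schema.STRING_SCHEMA, // key schema
        "user-123",           // key
        null,                 // value schema: null for a tombstone
        null,                 // value: null for a tombstone
        42L);                 // Kafka offset

    // The connector must ingest this record rather than fail on the
    // null value, which is the behavior this PR enables.
    System.out.println("tombstone value is null: " + (tombstone.value() == null));
  }
}
```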
```diff
@@ -301,6 +322,12 @@ void putKey(SinkRecord record, ObjectNode meta) {
     return;
   }
 
+  if (record.keySchema() == null) {
```
why do we need this?
I am concerned about this change.
Discussed in this comment: we are ensuring we throw a Snowflake error instead of an NPE. We should never get into this situation, but I want to add the check just in case.
Key schemas are often null. I don't think we should do this; it seems like a behavior change.
Key schemas are often null, but we update them in handleNativeRecord() earlier in the call stack. If the key schema is still null in this method, the next if statement throws an NPE.
The only behavior change here is that the exception thrown goes from an NPE to a Snowflake error.
I see, thanks. That sounds like a harmless change, but it's recommended not to mix refactoring/fixes into the same PR as a feature. This change is small enough to go through; it just raises extra questions from a reviewer's point of view.
I see. It was caught while testing that method, since I wanted to cover all possible null scenarios.
lgtm
Which release should we expect this feature in?
@aidan-melen this should go out in release 2.0.2
Allows tombstone records (records with a NULL value and NULL value schema) to be ingested.
Motivation: