[SNOW-851840] Enable tombstone record ingestion in Snowpipe Streaming #688

Merged · 44 commits · Aug 25, 2023
Commits
All by sfc-gh-rcheng:

f3916c5  initial impl (Aug 17, 2023)
eec908e  autoformatting (Aug 17, 2023)
62c1f24  works, making it (Aug 18, 2023)
b39a691  autoformatting (Aug 18, 2023)
290aad2  passes except for avro (Aug 18, 2023)
240d968  it passes (Aug 18, 2023)
a2c53e6  trying ut (Aug 18, 2023)
cd09713  autoformatting (Aug 18, 2023)
928b943  build passes locally (Aug 18, 2023)
00bee04  personal nits (Aug 21, 2023)
1dde035  autoformatting (Aug 21, 2023)
62e4757  eyeball cc (Aug 21, 2023)
94ac2ca  uncomment test cases (Aug 21, 2023)
e73a063  autoformatting (Aug 21, 2023)
553999b  Merge branch 'master' into rcheng-tombstone (Aug 21, 2023)
754824e  autoformatting (Aug 22, 2023)
6c20656  fix sinktest (Aug 22, 2023)
b85f733  add null key failure test (Aug 22, 2023)
4ab7ffa  autoformatting (Aug 22, 2023)
cafdd42  merge master (Aug 22, 2023)
37dd5a7  autoformatting (Aug 22, 2023)
65aab68  fix dlq test (Aug 22, 2023)
cfd6f54  add exception case (Aug 22, 2023)
32d7a60  autoformatting (Aug 22, 2023)
014b8eb  nonnull keys (Aug 22, 2023)
434ebc2  autoformatting (Aug 22, 2023)
6583657  fix it again (Aug 22, 2023)
abc32d3  autoformatting (Aug 22, 2023)
4ba153e  simplify (Aug 23, 2023)
8673464  autoformatting (Aug 23, 2023)
ffa955b  tests (Aug 23, 2023)
a6f76d7  autoformatting (Aug 23, 2023)
07e0db4  add schema enable back (Aug 23, 2023)
e644e90  add all null tests (Aug 23, 2023)
27b6641  autoformatting (Aug 23, 2023)
631f75a  add null key test to streaming (Aug 23, 2023)
c815b48  autoformatting (Aug 23, 2023)
eef2642  update test to call set not config (Aug 24, 2023)
06c69ba  group it test (Aug 24, 2023)
abeaf39  nits (Aug 24, 2023)
3fc817f  autoformatting (Aug 24, 2023)
3fd589b  remove null check (Aug 24, 2023)
21cc65a  autoformatting (Aug 24, 2023)
f516110  null check schema (Aug 25, 2023)
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java

@@ -17,6 +17,7 @@

 package com.snowflake.kafka.connector.internal;

+import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
 import com.snowflake.kafka.connector.Utils;
 import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;

@@ -283,7 +284,10 @@ public enum SnowflakeErrors {
   ERROR_5016(
       "5016",
       "Invalid SinkRecord received",
-      "SinkRecord.value and SinkRecord.valueSchema cannot be null"),
+      "SinkRecord.value and SinkRecord.valueSchema cannot be null unless tombstone record ingestion"
+          + " is enabled (see "
+          + SnowflakeSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG
+          + " for more information."),
   ERROR_5017(
       "5017", "Invalid api call to cached put", "Cached put only support AWS, Azure and GCS."),
   ERROR_5018("5018", "Failed to execute cached put", "Error in cached put command"),
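For context, the new ERROR_5016 text points users at BEHAVIOR_ON_NULL_VALUES_CONFIG. A minimal sketch of the corresponding connector setting, assuming the constant resolves to the property key "behavior.on.null.values" and that IGNORE is the alternative to the DEFAULT value visible in this diff:

import java.util.HashMap;
import java.util.Map;

public class TombstoneBehaviorSketch {
  public static void main(String[] args) {
    Map<String, String> connectorConfig = new HashMap<>();
    // DEFAULT: a tombstone (null-value record) is ingested as a row with
    // empty content, so the delete event is still visible in Snowflake.
    connectorConfig.put("behavior.on.null.values", "DEFAULT"); // assumed key
    // IGNORE (assumed alternative value): tombstones are skipped; per this
    // diff, a null value reaching processRecord under a non-DEFAULT setting
    // raises ERROR_5016.
    // connectorConfig.put("behavior.on.null.values", "IGNORE");
    System.out.println(connectorConfig);
  }
}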
src/main/java/com/snowflake/kafka/connector/records/RecordService.java

@@ -71,6 +71,8 @@ public class RecordService {
   static final String HEADERS = "headers";

   private boolean enableSchematization = false;
+  private SnowflakeSinkConnectorConfig.BehaviorOnNullValues behaviorOnNullValues =
+      SnowflakeSinkConnectorConfig.BehaviorOnNullValues.DEFAULT;

   // For each task, we require a separate instance of SimpleDataFormat, since they are not
   // inherently thread safe
@@ -89,7 +91,6 @@ public class RecordService {

   // This class is designed to work with empty metadata config map
   private SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig();
-
   /** Send Telemetry Data to Snowflake */
   private final SnowflakeTelemetryService telemetryService;
@@ -108,13 +109,12 @@ public RecordService(SnowflakeTelemetryService telemetryService) {

   /** Record service with null telemetry Service, only use it for testing. */
   @VisibleForTesting
   public RecordService() {
-    this.telemetryService = null;
+    this(null);
   }

   public void setMetadataConfig(SnowflakeMetadataConfig metadataConfigIn) {
     metadataConfig = metadataConfigIn;
   }
-
   /**
    * extract enableSchematization from the connector config and set the value for the recordService
    *
@@ -145,26 +145,45 @@ public void setEnableSchematization(final boolean enableSchematization) {
     this.enableSchematization = enableSchematization;
   }

+  /**
+   * Directly set the behaviorOnNullValues through param
+   *
+   * <p>This method is only for testing
+   *
+   * @param behaviorOnNullValues how to handle null values
+   */
+  @VisibleForTesting
+  public void setBehaviorOnNullValues(
+      final SnowflakeSinkConnectorConfig.BehaviorOnNullValues behaviorOnNullValues) {
+    this.behaviorOnNullValues = behaviorOnNullValues;
+  }
+
   /**
    * process given SinkRecord, only support snowflake converters
    *
    * @param record SinkRecord
    * @return a Row wrapper which contains both actual content(payload) and metadata
    */
   private SnowflakeTableRow processRecord(SinkRecord record) {
+    SnowflakeRecordContent valueContent;
+
     if (record.value() == null || record.valueSchema() == null) {
-      throw SnowflakeErrors.ERROR_5016.getException();
-    }
-    if (!record.valueSchema().name().equals(SnowflakeJsonSchema.NAME)) {
-      throw SnowflakeErrors.ERROR_0009.getException();
-    }
-    if (!(record.value() instanceof SnowflakeRecordContent)) {
-      throw SnowflakeErrors.ERROR_0010.getException(
-          "Input record should be SnowflakeRecordContent object");
+      if (this.behaviorOnNullValues == SnowflakeSinkConnectorConfig.BehaviorOnNullValues.DEFAULT) {
+        valueContent = new SnowflakeRecordContent();
+      } else {
+        throw SnowflakeErrors.ERROR_5016.getException();
+      }
+    } else {
+      if (!record.valueSchema().name().equals(SnowflakeJsonSchema.NAME)) {
+        throw SnowflakeErrors.ERROR_0009.getException();
+      }
+      if (!(record.value() instanceof SnowflakeRecordContent)) {
+        throw SnowflakeErrors.ERROR_0010.getException(
+            "Input record should be SnowflakeRecordContent object");
+      }
+      valueContent = (SnowflakeRecordContent) record.value();
     }

-    SnowflakeRecordContent valueContent = (SnowflakeRecordContent) record.value();
-
     ObjectNode meta = MAPPER.createObjectNode();
     if (metadataConfig.topicFlag) {
       meta.put(TOPIC, record.topic());
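To make the new branch concrete, a minimal, hypothetical sketch of a tombstone flowing through it. The SinkRecord constructor is the standard Kafka Connect one; the RecordService import path and the test-only setter usage are assumptions based on this diff:

import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.records.RecordService; // package assumed
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

public class TombstoneProcessingSketch {
  public static void main(String[] args) {
    // A tombstone is a record whose value and value schema are both null;
    // Kafka emits these to delete a key under log compaction.
    SinkRecord tombstone =
        new SinkRecord(
            "test-topic",
            0, // partition
            Schema.STRING_SCHEMA,
            "user-123", // key being deleted
            null, // value schema: null for a tombstone
            null, // value: null for a tombstone
            42L); // offset

    RecordService service = new RecordService();
    // With DEFAULT, the first branch above wraps the null value in an empty
    // SnowflakeRecordContent, so the ingested row carries metadata with empty
    // content; with any other setting, processRecord throws ERROR_5016.
    service.setBehaviorOnNullValues(
        SnowflakeSinkConnectorConfig.BehaviorOnNullValues.DEFAULT);
  }
}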
@@ -297,7 +316,7 @@ public SnowflakeTableRow(SnowflakeRecordContent content, JsonNode metadata) {
   }

   void putKey(SinkRecord record, ObjectNode meta) {
-    if (record.key() == null) {
+    if (record.key() == null || record.keySchema() == null) {
       return;
     }

Review thread on this change:

Contributor: Why can't we put the key when the keySchema is null but the key is not? This change might be risky and introduce a behavior change for both Snowpipe and Snowpipe Streaming.

sfc-gh-rcheng (Author), Aug 25, 2023: The current code will NPE on the next if statement if keySchema is null. handleNativeRecord() prevents null keySchemas from being passed into putKey; however, we should throw a better error if we ever get a null keySchema, so I added a null check. This shouldn't be risky because we are changing the error thrown from an NPE to a Snowflake error. What do you think?

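To spell out the NPE the author mentions, a hypothetical sketch of the failure mode; the actual statement that follows the guard in putKey() is not shown in this diff, and the metadata field name below is illustrative:

import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.connect.sink.SinkRecord;

class PutKeyNpeSketch {
  void putKey(SinkRecord record, ObjectNode meta) {
    if (record.key() == null) { // old guard: checked only the key
      return;
    }
    // If keySchema() is null here, any dereference of it throws a bare
    // NullPointerException instead of a descriptive Snowflake error:
    String schemaName = record.keySchema().name(); // NPE when keySchema == null
    meta.put("key_schema", schemaName); // illustrative use of the schema
  }
}

With the widened guard, a null keySchema now returns early, matching the early return for a null key.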
166 changes: 103 additions & 63 deletions src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java
@@ -16,11 +16,59 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.kafka.connect.storage.Converter;
 import org.junit.Assert;
 import org.junit.Test;

 public class ConnectorConfigTest {

+  public enum CommunityConverterSubset {
+    STRING_CONVERTER(
+        "org.apache.kafka.connect.storage.StringConverter",
+        new org.apache.kafka.connect.storage.StringConverter()),
+    JSON_CONVERTER(
+        "org.apache.kafka.connect.json.JsonConverter",
+        new org.apache.kafka.connect.json.JsonConverter()),
+    AVRO_CONVERTER(
+        "io.confluent.connect.avro.AvroConverter", new io.confluent.connect.avro.AvroConverter());
+
+    private final String name;
+    public final Converter converter;
+
+    CommunityConverterSubset(String name, Converter converter) {
+      this.name = name;
+      this.converter = converter;
+    }
+
+    public String toString() {
+      return this.name;
+    }
+  };

+  public enum CustomSfConverter {
+    JSON_CONVERTER(
+        "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
+        new com.snowflake.kafka.connector.records.SnowflakeJsonConverter()),
+    AVRO_CONVERTER_WITHOUT_SCHEMA_REGISTRY(
+        "com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry",
+        new com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry()),
+    AVRO_CONVERTER(
+        "com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
+        new com.snowflake.kafka.connector.records.SnowflakeAvroConverter());
+
+    private final String name;
+    public final Converter converter;
+
+    CustomSfConverter(String name, Converter converter) {
+      this.name = name;
+      this.converter = converter;
+    }
+
+    public String toString() {
+      return this.name;
+    }
+  }

@Test
public void testConfig() {
Map<String, String> config = getConfig();
@@ -656,81 +704,73 @@ public void testValidKeyAndValueConvertersForStreamingSnowpipe() {
         SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
         IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
-    config.put(
-        SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
-        "org.apache.kafka.connect.storage.StringConverter");
-    Utils.validateConfig(config);
-
-    config.put(
-        SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
-        "org.apache.kafka.connect.json.JsonConverter");
-    Utils.validateConfig(config);
-
-    config.put(
-        SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
-        "io.confluent.connect.avro.AvroConverter");
-    Utils.validateConfig(config);
+    Arrays.stream(CommunityConverterSubset.values())
+        .forEach(
+            converter -> {
+              config.put(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD, converter.name);
+              Utils.validateConfig(config);
+            });

-    config.put(
-        SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
-        "org.apache.kafka.connect.storage.StringConverter");
-    Utils.validateConfig(config);
-
-    config.put(
-        SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
-        "org.apache.kafka.connect.json.JsonConverter");
-    Utils.validateConfig(config);
-
-    config.put(
-        SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
-        "io.confluent.connect.avro.AvroConverter");
-    Utils.validateConfig(config);
+    Arrays.stream(CommunityConverterSubset.values())
+        .forEach(
+            converter -> {
+              config.put(SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD, converter.name);
+              Utils.validateConfig(config);
+            });
   }

@Test
public void testInvalidKeyConvertersForStreamingSnowpipe() {
-    try {
-      Map<String, String> config = getConfig();
-      config.put(
-          SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
-          IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
-      config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
-      config.put(
-          SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
-          "com.snowflake.kafka.connector.records.SnowflakeJsonConverter");
+    Map<String, String> config = getConfig();
+    config.put(
+        SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
+        IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
+    config.put(Utils.SF_ROLE, "ACCOUNTADMIN");

-      config.put(
-          SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
-          "org.apache.kafka.connect.storage.StringConverter");
-      Utils.validateConfig(config);
-    } catch (SnowflakeKafkaConnectorException exception) {
-      assert exception
-          .getMessage()
-          .contains(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD);
-    }
+    Arrays.stream(CustomSfConverter.values())
+        .forEach(
+            converter -> {
+              try {
+                config.put(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD, converter.name);
+                config.put(
+                    SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
+                    "org.apache.kafka.connect.storage.StringConverter");
+
+                Utils.validateConfig(config);
+              } catch (SnowflakeKafkaConnectorException exception) {
+                assert exception
+                    .getMessage()
+                    .contains(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD);
+              }
+            });
   }

@Test
public void testInvalidValueConvertersForStreamingSnowpipe() {
-    try {
-      Map<String, String> config = getConfig();
-      config.put(
-          SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
-          IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
-      config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
-      config.put(
-          SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
-          "com.snowflake.kafka.connector.records.SnowflakeJsonConverter");
+    Map<String, String> config = getConfig();
+    config.put(
+        SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
+        IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
+    config.put(Utils.SF_ROLE, "ACCOUNTADMIN");

-      config.put(
-          SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
-          "org.apache.kafka.connect.storage.StringConverter");
-      Utils.validateConfig(config);
-    } catch (SnowflakeKafkaConnectorException exception) {
-      assert exception
-          .getMessage()
-          .contains(SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD);
-    }
+    Arrays.stream(CustomSfConverter.values())
+        .forEach(
+            converter -> {
+              try {
+                config.put(
+                    SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
+                    "org.apache.kafka.connect.storage.StringConverter");
+                config.put(
+                    SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD, converter.name);
+
+                Utils.validateConfig(config);
+              } catch (SnowflakeKafkaConnectorException exception) {
+                assert exception
+                    .getMessage()
+                    .contains(SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD);
+              }
+            });
   }

@Test