[SNOW-851840] Enable tombstone record ingestion in Snowpipe Streaming
sfc-gh-rcheng authored Aug 25, 2023
1 parent ef0c907 commit e5c0e0c
Showing 7 changed files with 489 additions and 251 deletions.
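In effect, the change lets Snowpipe Streaming ingest Kafka tombstones (records whose value is null) instead of rejecting them with ERROR_5016, as long as behavior.on.null.values is left at DEFAULT. The sketch below is a hedged illustration of that contract, not code from this commit: it uses only the Kafka Connect API plus the RecordService test hooks visible in the diff, and the topic, key, and class names are invented for the example.

import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.records.RecordService;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

public class TombstoneIngestionSketch {
  public static void main(String[] args) {
    // A tombstone: value and value schema are both null.
    SinkRecord tombstone =
        new SinkRecord("events", 0, Schema.STRING_SCHEMA, "user-42", null, null, 100L);

    // Test-only constructor and the setter added in this commit.
    RecordService service = new RecordService();
    service.setBehaviorOnNullValues(SnowflakeSinkConnectorConfig.BehaviorOnNullValues.DEFAULT);

    // With DEFAULT, processRecord now wraps the tombstone in an empty
    // SnowflakeRecordContent and builds the record metadata as usual; with IGNORE,
    // null-value records are expected to be dropped before reaching this point,
    // so processRecord still throws SnowflakeErrors.ERROR_5016 (my reading of the diff).
  }
}

In connector-configuration terms this is the property behind SnowflakeSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG (documented as behavior.on.null.values), which the non-streaming Snowpipe path already honored; the commit extends the same DEFAULT/IGNORE semantics to Snowpipe Streaming.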
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java
@@ -17,6 +17,7 @@

package com.snowflake.kafka.connector.internal;

import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;

@@ -283,7 +284,10 @@ public enum SnowflakeErrors {
ERROR_5016(
"5016",
"Invalid SinkRecord received",
"SinkRecord.value and SinkRecord.valueSchema cannot be null"),
"SinkRecord.value and SinkRecord.valueSchema cannot be null unless tombstone record ingestion"
+ " is enabled (see "
+ SnowflakeSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG
+ " for more information."),
ERROR_5017(
"5017", "Invalid api call to cached put", "Cached put only support AWS, Azure and GCS."),
ERROR_5018("5018", "Failed to execute cached put", "Error in cached put command"),
src/main/java/com/snowflake/kafka/connector/records/RecordService.java
@@ -71,6 +71,8 @@ public class RecordService {
static final String HEADERS = "headers";

private boolean enableSchematization = false;
private SnowflakeSinkConnectorConfig.BehaviorOnNullValues behaviorOnNullValues =
SnowflakeSinkConnectorConfig.BehaviorOnNullValues.DEFAULT;

// For each task, we require a separate instance of SimpleDataFormat, since they are not
// inherently thread safe
@@ -108,7 +110,7 @@ public RecordService(SnowflakeTelemetryService telemetryService) {
/** Record service with null telemetry Service, only use it for testing. */
@VisibleForTesting
public RecordService() {
- this.telemetryService = null;
+ this(null);
}

public void setMetadataConfig(SnowflakeMetadataConfig metadataConfigIn) {
@@ -145,26 +147,45 @@ public void setEnableSchematization(final boolean enableSchematization) {
this.enableSchematization = enableSchematization;
}

/**
* Directly set the behaviorOnNullValues through param
*
* <p>This method is only for testing
*
* @param behaviorOnNullValues how to handle null values
*/
@VisibleForTesting
public void setBehaviorOnNullValues(
final SnowflakeSinkConnectorConfig.BehaviorOnNullValues behaviorOnNullValues) {
this.behaviorOnNullValues = behaviorOnNullValues;
}

/**
* process given SinkRecord, only support snowflake converters
*
* @param record SinkRecord
* @return a Row wrapper which contains both actual content(payload) and metadata
*/
private SnowflakeTableRow processRecord(SinkRecord record) {
+ SnowflakeRecordContent valueContent;
+
  if (record.value() == null || record.valueSchema() == null) {
-   throw SnowflakeErrors.ERROR_5016.getException();
- }
- if (!record.valueSchema().name().equals(SnowflakeJsonSchema.NAME)) {
-   throw SnowflakeErrors.ERROR_0009.getException();
- }
- if (!(record.value() instanceof SnowflakeRecordContent)) {
-   throw SnowflakeErrors.ERROR_0010.getException(
-       "Input record should be SnowflakeRecordContent object");
- }
+   if (this.behaviorOnNullValues == SnowflakeSinkConnectorConfig.BehaviorOnNullValues.DEFAULT) {
+     valueContent = new SnowflakeRecordContent();
+   } else {
+     throw SnowflakeErrors.ERROR_5016.getException();
+   }
+ } else {
+   if (!record.valueSchema().name().equals(SnowflakeJsonSchema.NAME)) {
+     throw SnowflakeErrors.ERROR_0009.getException();
+   }
+   if (!(record.value() instanceof SnowflakeRecordContent)) {
+     throw SnowflakeErrors.ERROR_0010.getException(
+         "Input record should be SnowflakeRecordContent object");
+   }
+   valueContent = (SnowflakeRecordContent) record.value();
+ }

- SnowflakeRecordContent valueContent = (SnowflakeRecordContent) record.value();

ObjectNode meta = MAPPER.createObjectNode();
if (metadataConfig.topicFlag) {
meta.put(TOPIC, record.topic());
@@ -301,6 +322,12 @@ void putKey(SinkRecord record, ObjectNode meta) {
return;
}

if (record.keySchema() == null) {
throw SnowflakeErrors.ERROR_0010.getException(
"Unsupported Key format, please implement either String Key Converter or Snowflake"
+ " Converters");
}

if (record.keySchema().toString().equals(Schema.STRING_SCHEMA.toString())) {
meta.put(KEY, record.key().toString());
} else if (SnowflakeJsonSchema.NAME.equals(record.keySchema().name())) {
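A side note on the putKey change just above: a tombstone can still carry a key, and a String- or Snowflake-schema key keeps flowing into the "key" field of the record metadata; what the new guard rejects is a key with no schema at all, which previously would presumably have failed later with a less helpful error. The snippet below only illustrates that distinction using the plain Kafka Connect API; the topic, key, and class name are invented for the example.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

public class KeySchemaGuardSketch {
  public static void main(String[] args) {
    // Still accepted: a tombstone whose key has a String schema; putKey copies
    // record.key().toString() into the metadata "key" field.
    SinkRecord keyedTombstone =
        new SinkRecord("events", 0, Schema.STRING_SCHEMA, "user-42", null, null, 7L);

    // Now rejected up front: a record whose keySchema() is null triggers
    // SnowflakeErrors.ERROR_0010 ("Unsupported Key format ...") instead of
    // failing further down (assumption based on the added check).
    SinkRecord schemalessKey =
        new SinkRecord("events", 0, null, "user-42", null, null, 8L);

    System.out.println(keyedTombstone.keySchema()); // non-null String schema
    System.out.println(schemalessKey.keySchema());  // null -> would hit the new guard
  }
}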
166 changes: 103 additions & 63 deletions src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java
@@ -16,11 +16,59 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.storage.Converter;
import org.junit.Assert;
import org.junit.Test;

public class ConnectorConfigTest {

public enum CommunityConverterSubset {
STRING_CONVERTER(
"org.apache.kafka.connect.storage.StringConverter",
new org.apache.kafka.connect.storage.StringConverter()),
JSON_CONVERTER(
"org.apache.kafka.connect.json.JsonConverter",
new org.apache.kafka.connect.json.JsonConverter()),
AVRO_CONVERTER(
"io.confluent.connect.avro.AvroConverter", new io.confluent.connect.avro.AvroConverter());

private final String name;
public final Converter converter;

CommunityConverterSubset(String name, Converter converter) {
this.name = name;
this.converter = converter;
}

public String toString() {
return this.name;
}
};

public enum CustomSfConverter {
JSON_CONVERTER(
"com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
new com.snowflake.kafka.connector.records.SnowflakeJsonConverter()),
AVRO_CONVERTER_WITHOUT_SCHEMA_REGISTRY(
"com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry",
new com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry()),
AVRO_CONVERTER(
"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
new com.snowflake.kafka.connector.records.SnowflakeAvroConverter());

private final String name;
public final Converter converter;

CustomSfConverter(String name, Converter converter) {
this.name = name;
this.converter = converter;
}

public String toString() {
return this.name;
}
}

@Test
public void testConfig() {
Map<String, String> config = getConfig();
@@ -656,81 +704,73 @@ public void testValidKeyAndValueConvertersForStreamingSnowpipe() {
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
- config.put(
-     SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
-     "org.apache.kafka.connect.storage.StringConverter");
- Utils.validateConfig(config);
-
- config.put(
-     SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
-     "org.apache.kafka.connect.json.JsonConverter");
- Utils.validateConfig(config);
-
- config.put(
-     SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
-     "io.confluent.connect.avro.AvroConverter");
- Utils.validateConfig(config);
+ Arrays.stream(CommunityConverterSubset.values())
+     .forEach(
+         converter -> {
+           config.put(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD, converter.name);
+           Utils.validateConfig(config);
+         });

- config.put(
-     SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
-     "org.apache.kafka.connect.storage.StringConverter");
- Utils.validateConfig(config);
-
- config.put(
-     SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
-     "org.apache.kafka.connect.json.JsonConverter");
- Utils.validateConfig(config);
-
- config.put(
-     SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
-     "io.confluent.connect.avro.AvroConverter");
- Utils.validateConfig(config);
+ Arrays.stream(CommunityConverterSubset.values())
+     .forEach(
+         converter -> {
+           config.put(SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD, converter.name);
+           Utils.validateConfig(config);
+         });
}

@Test
public void testInvalidKeyConvertersForStreamingSnowpipe() {
- try {
-   Map<String, String> config = getConfig();
-   config.put(
-       SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
-       IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
-   config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
-   config.put(
-       SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
-       "com.snowflake.kafka.connector.records.SnowflakeJsonConverter");
+ Map<String, String> config = getConfig();
+ config.put(
+     SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
+     IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
+ config.put(Utils.SF_ROLE, "ACCOUNTADMIN");

-   config.put(
-       SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
-       "org.apache.kafka.connect.storage.StringConverter");
-   Utils.validateConfig(config);
- } catch (SnowflakeKafkaConnectorException exception) {
-   assert exception
-       .getMessage()
-       .contains(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD);
- }
+ Arrays.stream(CustomSfConverter.values())
+     .forEach(
+         converter -> {
+           try {
+             config.put(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD, converter.name);
+             config.put(
+                 SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
+                 "org.apache.kafka.connect.storage.StringConverter");
+
+             Utils.validateConfig(config);
+           } catch (SnowflakeKafkaConnectorException exception) {
+             assert exception
+                 .getMessage()
+                 .contains(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD);
+           }
+         });
}

@Test
public void testInvalidValueConvertersForStreamingSnowpipe() {
- try {
-   Map<String, String> config = getConfig();
-   config.put(
-       SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
-       IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
-   config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
-   config.put(
-       SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
-       "com.snowflake.kafka.connector.records.SnowflakeJsonConverter");
+ Map<String, String> config = getConfig();
+ config.put(
+     SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
+     IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
+ config.put(Utils.SF_ROLE, "ACCOUNTADMIN");

-   config.put(
-       SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
-       "org.apache.kafka.connect.storage.StringConverter");
-   Utils.validateConfig(config);
- } catch (SnowflakeKafkaConnectorException exception) {
-   assert exception
-       .getMessage()
-       .contains(SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD);
- }
+ Arrays.stream(CustomSfConverter.values())
+     .forEach(
+         converter -> {
+           try {
+             config.put(
+                 SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
+                 "org.apache.kafka.connect.storage.StringConverter");
+             config.put(
+                 SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD, converter.name);
+
+             Utils.validateConfig(config);
+           } catch (SnowflakeKafkaConnectorException exception) {
+             assert exception
+                 .getMessage()
+                 .contains(SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD);
+           }
+         });
}

@Test
