Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class SnowflakeSinkConnectorConfig {
public static final String SNOWFLAKE_ROLE = Utils.SF_ROLE;
public static final String ENABLE_SCHEMATIZATION_CONFIG = "snowflake.enable.schematization";
public static final String ENABLE_SCHEMATIZATION_DEFAULT = "false";
public static final String ENABLE_CHANGE_TRACKING_CONFIG = "snowflake.enable.change.tracking";
public static final boolean ENABLE_CHANGE_TRACKING_DEFAULT = false;

// Proxy Info
public static final String JVM_PROXY_HOST = "jvm.proxy.host";
Expand Down Expand Up @@ -162,6 +164,7 @@ public class SnowflakeSinkConnectorConfig {
public static final String ENABLE_MDC_LOGGING_DOC =
"Enable MDC context to prepend log messages. Note that this is only available after Apache"
+ " Kafka 2.3";

/**
* Used to serialize the incoming records to kafka connector. Note: Converter code is invoked
* before actually sending records to Kafka connector.
Expand All @@ -180,6 +183,7 @@ public class SnowflakeSinkConnectorConfig {
"com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
"com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry",
"com.snowflake.kafka.connector.records.SnowflakeAvroConverter");

/**
* Boolean Validator of passed booleans in configurations (TRUE or FALSE). This validator is case
* insensitive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,17 @@ public static ConfigDef getConfig() {
ICEBERG_ENABLED_DEFAULT_VALUE,
ConfigDef.Importance.HIGH,
"When set to true the connector will ingest data into the Iceberg table. Check the"
+ " official Snowflake documentation for the prerequisites.");
+ " official Snowflake documentation for the prerequisites.")
.define(
ENABLE_CHANGE_TRACKING_CONFIG,
ConfigDef.Type.BOOLEAN,
ENABLE_CHANGE_TRACKING_DEFAULT,
ConfigDef.Importance.LOW,
"Enable CHANGE_TRACKING on automatically created tables. When set to true, the"
+ " connector will enable change tracking on tables it creates automatically.",
CONNECTOR_CONFIG_DOC,
9,
ConfigDef.Width.NONE,
ENABLE_CHANGE_TRACKING_CONFIG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,4 +189,12 @@ public interface SnowflakeConnectionService {
* @param parameters query parameters
*/
void executeQueryWithParameters(String query, String... parameters);

/**
 * Enable CHANGE_TRACKING on an existing table. This is typically called immediately after the
 * connector auto-creates a table, so that downstream change-data-capture features can consume
 * changes from it.
 *
 * @param tableName name of the table to alter; must not be empty
 */
void enableChangeTrackingOnTable(String tableName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,24 @@ public class SnowflakeConnectionServiceV1 implements SnowflakeConnectionService

/**
 * Create a table with the two variant columns (RECORD_METADATA and RECORD_CONTENT), honoring
 * {@code overwrite}. Delegates to the three-argument overload with change tracking disabled.
 */
@Override
public void createTable(final String tableName, final boolean overwrite) {
createTable(tableName, overwrite, false);
}

/**
 * Create a table with "create table if not exists" semantics (no overwrite) and without
 * enabling change tracking. Delegates to the three-argument overload.
 */
@Override
public void createTable(final String tableName) {
createTable(tableName, false, false);
}

/**
* Create a table with two variant columns: RECORD_METADATA and RECORD_CONTENT
*
* @param tableName a string represents table name
* @param overwrite if true, execute "create or replace table" query; otherwise, run "create table
* if not exists"
* @param enableChangeTracking if true, enable CHANGE_TRACKING on the table after creation
*/
public void createTable(
final String tableName, final boolean overwrite, final boolean enableChangeTracking) {
checkConnection();
InternalUtils.assertNotEmpty("tableName", tableName);
String query;
Expand All @@ -104,15 +122,25 @@ public void createTable(final String tableName, final boolean overwrite) {
}

LOGGER.info("create table {}", tableName);
}

@Override
public void createTable(final String tableName) {
createTable(tableName, false);
if (enableChangeTracking) {
enableChangeTrackingOnTable(tableName);
}
}

/**
 * Create a table with only the RECORD_METADATA column (used for snowpipe streaming), without
 * enabling change tracking. Delegates to the two-argument overload.
 */
@Override
public void createTableWithOnlyMetadataColumn(final String tableName) {
createTableWithOnlyMetadataColumn(tableName, false);
}

/**
* Create a table with only RECORD_METADATA column for snowpipe streaming
*
* @param tableName table name
* @param enableChangeTracking if true, enable CHANGE_TRACKING on the table after creation
*/
public void createTableWithOnlyMetadataColumn(
final String tableName, final boolean enableChangeTracking) {
checkConnection();
InternalUtils.assertNotEmpty("tableName", tableName);
String createTableQuery =
Expand Down Expand Up @@ -142,6 +170,10 @@ public void createTableWithOnlyMetadataColumn(final String tableName) {
}

LOGGER.info("Created table {} with only RECORD_METADATA column", tableName);

if (enableChangeTracking) {
enableChangeTrackingOnTable(tableName);
}
}

@Override
Expand Down Expand Up @@ -729,6 +761,24 @@ public void executeQueryWithParameters(String query, String... parameters) {
}
}

@Override
public void enableChangeTrackingOnTable(String tableName) {
checkConnection();
InternalUtils.assertNotEmpty("tableName", tableName);
// identifier(?) binds the table name as a parameter, avoiding SQL string concatenation
String enableChangeTrackingQuery = "alter table identifier(?) set CHANGE_TRACKING = true";
// try-with-resources: the original only closed the statement on the success path, leaking
// it (and its server-side cursor) whenever setString/execute threw SQLException
try (PreparedStatement stmt = conn.prepareStatement(enableChangeTrackingQuery)) {
stmt.setString(1, tableName);
stmt.execute();
LOGGER.info("Enabled CHANGE_TRACKING on table: {}", tableName);
} catch (SQLException e) {
// Best-effort: log a warning but don't fail the connector - similar to schema
// evolution behavior
LOGGER.warn(
"Enable CHANGE_TRACKING failed on table: {}, message: {}", tableName, e.getMessage());
}
}

@VisibleForTesting
protected ChannelMigrateOffsetTokenResponseDTO getChannelMigrateOffsetTokenResponseDTO(
String migrateOffsetTokenResultFromSysFunc) throws JsonProcessingException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.snowflake.kafka.connector.dlq.KafkaRecordErrorReporter;
import com.snowflake.kafka.connector.internal.KCLogger;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1;
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import com.snowflake.kafka.connector.internal.SnowflakeSinkService;
import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter;
Expand Down Expand Up @@ -78,21 +79,25 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
// Behavior to be set at the start of connector start. (For tombstone records)
private final SnowflakeSinkConnectorConfig.BehaviorOnNullValues behaviorOnNullValues;
private final MetricsJmxReporter metricsJmxReporter;

/**
* Fetching this from {@link org.apache.kafka.connect.sink.SinkTaskContext}'s {@link
* org.apache.kafka.connect.sink.ErrantRecordReporter}
*/
private final KafkaRecordErrorReporter kafkaRecordErrorReporter;

/* SinkTaskContext has access to all methods/APIs available to talk to Kafka Connect runtime*/
private final SinkTaskContext sinkTaskContext;
// Config set in JSON
private final Map<String, String> connectorConfig;

/**
* Key is formulated in {@link #partitionChannelKey(String, int)} }
*
* <p>value is the Streaming Ingest Channel implementation (Wrapped around TopicPartitionChannel)
*/
private final Map<String, TopicPartitionChannel> partitionsToChannel;

// Cache for schema evolution
private final Map<String, Boolean> tableName2SchemaEvolutionPermission;
// Set that keeps track of the channels that have been seen per input batch
Expand Down Expand Up @@ -475,6 +480,12 @@ protected Optional<TopicPartitionChannel> getTopicPartitionChannelFromCacheKey(

// ------ Streaming Ingest Related Functions ------ //
private void createTableIfNotExists(final String tableName) {
boolean enableChangeTracking =
Boolean.parseBoolean(
connectorConfig.getOrDefault(
SnowflakeSinkConnectorConfig.ENABLE_CHANGE_TRACKING_CONFIG,
String.valueOf(SnowflakeSinkConnectorConfig.ENABLE_CHANGE_TRACKING_DEFAULT)));

if (this.conn.tableExist(tableName)) {
if (!isSchematizationEnabled(connectorConfig)) {
if (this.conn.isTableCompatible(tableName)) {
Expand All @@ -491,9 +502,11 @@ private void createTableIfNotExists(final String tableName) {
if (isSchematizationEnabled(connectorConfig)) {
// Always create the table with RECORD_METADATA only and rely on schema evolution to update
// the schema
this.conn.createTableWithOnlyMetadataColumn(tableName);
((SnowflakeConnectionServiceV1) this.conn)
.createTableWithOnlyMetadataColumn(tableName, enableChangeTracking);
} else {
this.conn.createTable(tableName);
((SnowflakeConnectionServiceV1) this.conn)
.createTable(tableName, false, enableChangeTracking);
}
}

Expand Down