Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public class SnowflakeSinkConnectorConfig {
public static final String SNOWFLAKE_ROLE = Utils.SF_ROLE;
public static final String ENABLE_SCHEMATIZATION_CONFIG = "snowflake.enable.schematization";
public static final String ENABLE_SCHEMATIZATION_CONFIG_DEFAULT = "true";
public static final String ENABLE_CHANGE_TRACKING_CONFIG = "snowflake.enable.change.tracking";
public static final boolean ENABLE_CHANGE_TRACKING_DEFAULT = false;

// Proxy Info
public static final String JVM_PROXY_HOST = "jvm.proxy.host";
Expand Down Expand Up @@ -161,6 +163,7 @@ public class SnowflakeSinkConnectorConfig {
public static final String ENABLE_MDC_LOGGING_DOC =
"Enable MDC context to prepend log messages. Note that this is only available after Apache"
+ " Kafka 2.3";

/**
* Used to serialize the incoming records to kafka connector. Note: Converter code is invoked
* before actually sending records to Kafka connector.
Expand All @@ -179,6 +182,7 @@ public class SnowflakeSinkConnectorConfig {
"com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
"com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry",
"com.snowflake.kafka.connector.records.SnowflakeAvroConverter");

/**
* Boolean Validator of passed booleans in configurations (TRUE or FALSE). This validator is case
* insensitive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,17 @@ public static ConfigDef getConfig() {
HIGH,
"When set to true the connector will ingest data into the Iceberg table. Check the"
+ " official Snowflake documentation for the prerequisites.")
.define(
ENABLE_CHANGE_TRACKING_CONFIG,
ConfigDef.Type.BOOLEAN,
ENABLE_CHANGE_TRACKING_DEFAULT,
ConfigDef.Importance.LOW,
"Enable CHANGE_TRACKING on automatically created tables. When set to true, the"
+ " connector will enable change tracking on tables it creates automatically.",
CONNECTOR_CONFIG_DOC,
9,
ConfigDef.Width.NONE,
ENABLE_CHANGE_TRACKING_CONFIG)
.define(
CACHE_TABLE_EXISTS,
BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ public void appendMetaColIfNotExist(String tableName) {
delegate.appendMetaColIfNotExist(tableName);
}

@Override
public void enableChangeTrackingOnTable(String tableName) {
delegate.enableChangeTrackingOnTable(tableName);
}

private void logStatsIfNeeded() {
final long now = System.currentTimeMillis();
final long lastLogged = lastStatsLogTimestamp.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,12 @@ public interface SnowflakeConnectionService {
* @param parameters query parameters
*/
void executeQueryWithParameters(String query, String... parameters);

/**
* Enable CHANGE_TRACKING on a table. This is typically called after creating a table
* automatically to enable change data capture capabilities.
*
* @param tableName table name
*/
void enableChangeTrackingOnTable(String tableName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig.SNOWPIPE_STREAMING;

import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import java.util.Map;
Expand All @@ -19,6 +20,9 @@ public static class SnowflakeConnectionServiceBuilder {
private String connectorName;
private String taskID = "-1";

// Enable CHANGE_TRACKING on the table after creation
private boolean enableChangeTracking = false;

/** Underlying implementation - Check Enum {@link IngestionMethodConfig} */
private IngestionMethodConfig ingestionMethodConfig;

Expand All @@ -41,7 +45,12 @@ public SnowflakeConnectionServiceBuilder setProperties(Map<String, String> conf)
}
this.url = new SnowflakeURL(conf.get(Utils.SF_URL));
this.connectorName = conf.get(Utils.NAME);
this.config = conf; // Store the config for caching configuration
this.config = conf;
this.enableChangeTracking =
Boolean.parseBoolean(
conf.getOrDefault(
SnowflakeSinkConnectorConfig.ENABLE_CHANGE_TRACKING_CONFIG,
Boolean.toString(SnowflakeSinkConnectorConfig.ENABLE_CHANGE_TRACKING_DEFAULT)));

Properties proxyProperties = InternalUtils.generateProxyParametersIfRequired(conf);
Properties connectionProperties =
Expand All @@ -59,7 +68,8 @@ public SnowflakeConnectionService build() {
InternalUtils.assertNotEmpty("connectorName", connectorName);

SnowflakeConnectionService baseService =
new StandardSnowflakeConnectionService(jdbcProperties, url, connectorName, taskID);
new StandardSnowflakeConnectionService(
jdbcProperties, url, connectorName, taskID, enableChangeTracking);

CachingConfig cachingConfig = CachingConfig.fromConfig(config);
return new CachingSnowflakeConnectionService(baseService, cachingConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,18 @@ public class StandardSnowflakeConnectionService implements SnowflakeConnectionSe
private final SnowflakeTelemetryService telemetry;
private final String connectorName;
private final String taskID;
private final boolean enableChangeTracking;

StandardSnowflakeConnectionService(
JdbcProperties jdbcProperties, SnowflakeURL url, String connectorName, String taskID) {
JdbcProperties jdbcProperties,
SnowflakeURL url,
String connectorName,
String taskID,
boolean enableChangeTracking) {
this.connectorName = connectorName;
this.taskID = taskID;
this.enableChangeTracking = enableChangeTracking;

Properties proxyProperties = jdbcProperties.getProxyProperties();
Properties combinedProperties = jdbcProperties.getProperties();
try {
Expand All @@ -62,6 +69,24 @@ public class StandardSnowflakeConnectionService implements SnowflakeConnectionSe

@Override
public void createTable(final String tableName, final boolean overwrite) {
createTable(tableName, overwrite, this.enableChangeTracking);
}

@Override
public void createTable(final String tableName) {
createTable(tableName, false, this.enableChangeTracking);
}

/**
* Create a table with two variant columns: RECORD_METADATA and RECORD_CONTENT
*
* @param tableName a string represents table name
* @param overwrite if true, execute "create or replace table" query; otherwise, run "create table
* if not exists"
* @param enableChangeTracking if true, enable CHANGE_TRACKING on the table after creation
*/
public void createTable(
final String tableName, final boolean overwrite, final boolean enableChangeTracking) {
checkConnection();
InternalUtils.assertNotEmpty("tableName", tableName);
String query;
Expand All @@ -80,15 +105,25 @@ public void createTable(final String tableName, final boolean overwrite) {
}

LOGGER.info("create table {}", tableName);
}

@Override
public void createTable(final String tableName) {
createTable(tableName, false);
if (enableChangeTracking) {
enableChangeTrackingOnTable(tableName);
}
}

@Override
public void createTableWithOnlyMetadataColumn(final String tableName) {
createTableWithOnlyMetadataColumn(tableName, this.enableChangeTracking);
}

/**
* Create a table with only RECORD_METADATA column for snowpipe streaming
*
* @param tableName table name
* @param enableChangeTracking if true, enable CHANGE_TRACKING on the table after creation
*/
public void createTableWithOnlyMetadataColumn(
final String tableName, final boolean enableChangeTracking) {
checkConnection();
InternalUtils.assertNotEmpty("tableName", tableName);
String createTableQuery =
Expand Down Expand Up @@ -118,6 +153,10 @@ public void createTableWithOnlyMetadataColumn(final String tableName) {
}

LOGGER.info("Created table {} with only RECORD_METADATA column", tableName);

if (enableChangeTracking) {
enableChangeTrackingOnTable(tableName);
}
}

@Override
Expand Down Expand Up @@ -652,6 +691,24 @@ public void executeQueryWithParameters(String query, String... parameters) {
}
}

@Override
public void enableChangeTrackingOnTable(String tableName) {
checkConnection();
InternalUtils.assertNotEmpty("tableName", tableName);
String enableChangeTrackingQuery = "alter table identifier(?) set CHANGE_TRACKING = true";
try {
PreparedStatement stmt = conn.prepareStatement(enableChangeTrackingQuery);
stmt.setString(1, tableName);
stmt.execute();
stmt.close();
LOGGER.info("Enabled CHANGE_TRACKING on table: {}", tableName);
} catch (SQLException e) {
// Log warning but don't fail - similar to schema evolution behavior
LOGGER.warn(
"Enable CHANGE_TRACKING failed on table: {}, message: {}", tableName, e.getMessage());
}
}

@VisibleForTesting
protected ChannelMigrateOffsetTokenResponseDTO getChannelMigrateOffsetTokenResponseDTO(
String migrateOffsetTokenResultFromSysFunc) throws JsonProcessingException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,12 @@ public Optional<MetricRegistry> getMetricRegistry(String partitionChannelKey) {

// ------ Streaming Ingest Related Functions ------ //
private void createTableIfNotExists(final String tableName) {
boolean enableChangeTracking =
Boolean.parseBoolean(
connectorConfig.getOrDefault(
SnowflakeSinkConnectorConfig.ENABLE_CHANGE_TRACKING_CONFIG,
String.valueOf(SnowflakeSinkConnectorConfig.ENABLE_CHANGE_TRACKING_DEFAULT)));

if (this.conn.tableExist(tableName)) {
if (!isSchematizationEnabled(connectorConfig)) {
if (this.conn.isTableCompatible(tableName)) {
Expand All @@ -469,7 +475,7 @@ private void createTableIfNotExists(final String tableName) {
// the schema
this.conn.createTableWithOnlyMetadataColumn(tableName);
} else {
this.conn.createTable(tableName);
this.conn.createTable(tableName, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,28 @@ void testConnectionFunction() {
service.close();
assert service.isClosed();
}

@Test
void testEnableChangeTracking() throws SQLException {
// create table without change tracking
conn.createTable(tableName);
assert conn.tableExist(tableName);
// verify change tracking is off by default
assert !isChangeTrackingEnabled(tableName);

// enable change tracking
conn.enableChangeTrackingOnTable(tableName);
// verify change tracking is on
assert isChangeTrackingEnabled(tableName);

TestUtils.dropTable(tableName);
}

private boolean isChangeTrackingEnabled(String tableName) throws SQLException {
ResultSet rs = TestUtils.executeQuery("SHOW TABLES LIKE '" + tableName + "'");
if (rs.next()) {
return "ON".equalsIgnoreCase(rs.getString("change_tracking"));
}
return false;
}
}
Loading