src/main/java/com/snowflake/kafka/connector/Constants.java (4 changes: 0 additions & 4 deletions)

@@ -29,10 +29,6 @@ public static final class KafkaConnectorConfigParams {
   public static final boolean SNOWFLAKE_STREAMING_METADATA_CONNECTOR_PUSH_TIME_DEFAULT = true;
   public static final String SNOWFLAKE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP =
       "snowflake.streaming.client.provider.override.map";
-  // Iceberg
-  public static final String SNOWFLAKE_STREAMING_ICEBERG_ENABLED =
-      "snowflake.streaming.iceberg.enabled";
-  public static final boolean SNOWFLAKE_STREAMING_ICEBERG_ENABLED_DEFAULT = false;
   public static final String SNOWFLAKE_STREAMING_MAX_CLIENT_LAG =
       "snowflake.streaming.max.client.lag";
src/main/java/com/snowflake/kafka/connector/Utils.java (10 changes: 0 additions & 10 deletions)

@@ -16,8 +16,6 @@
  */
 package com.snowflake.kafka.connector;
 
-import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_ICEBERG_ENABLED;
-
 import com.google.common.collect.ImmutableMap;
 import com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams;
 import com.snowflake.kafka.connector.internal.KCLogger;

@@ -315,14 +313,6 @@ static boolean isValidSnowflakeTableName(String tableName) {
     return tableName.matches("^([_a-zA-Z]{1}[_$a-zA-Z0-9]+\\.){0,2}[_a-zA-Z]{1}[_$a-zA-Z0-9]+$");
   }
 
-  /**
-   * @param config config with applied default values
-   * @return true when Iceberg mode is enabled.
-   */
-  public static boolean isIcebergEnabled(Map<String, String> config) {
-    return Boolean.parseBoolean(config.get(SNOWFLAKE_STREAMING_ICEBERG_ENABLED));
-  }
-
   /**
    * @param config config with applied default values
    * @return role specified in the config
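
The isValidSnowflakeTableName pattern retained above accepts a table name with up to two dot-separated qualifiers (database and schema), each segment beginning with a letter or underscore. A minimal standalone sketch of its behavior, with illustrative sample names:

import java.util.List;

public class TableNamePatternDemo {
  // Same regex as Utils.isValidSnowflakeTableName: an optional database and
  // schema qualifier, then the table name itself.
  static boolean isValidSnowflakeTableName(String tableName) {
    return tableName.matches("^([_a-zA-Z]{1}[_$a-zA-Z0-9]+\\.){0,2}[_a-zA-Z]{1}[_$a-zA-Z0-9]+$");
  }

  public static void main(String[] args) {
    // Prints true, true, false: the last name fails because it starts with a digit.
    for (String name : List.of("db1.schema1.kafka_events", "kafka_events", "1_bad_name")) {
      System.out.println(name + " -> " + isValidSnowflakeTableName(name));
    }
  }
}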
@@ -318,13 +318,6 @@ public static ConfigDef getConfig() {
             LOW,
             "If set to true the Connector will fail its tasks when authorization error from"
                 + " Snowflake occurred")
-        .define(
-            KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_ICEBERG_ENABLED,
-            BOOLEAN,
-            KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_ICEBERG_ENABLED_DEFAULT,
-            HIGH,
-            "When set to true the connector will ingest data into the Iceberg table. Check the"
-                + " official Snowflake documentation for the prerequisites.")
         .define(
             KafkaConnectorConfigParams.CACHE_TABLE_EXISTS,
             BOOLEAN,
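
For readers unfamiliar with the Kafka Connect ConfigDef chain above: each .define(...) registers one parameter with its type, default value, importance, and documentation string. A minimal sketch using a hypothetical key name:

import org.apache.kafka.common.config.ConfigDef;

public class ConfigDefDemo {
  public static ConfigDef getConfig() {
    return new ConfigDef()
        .define(
            "example.feature.enabled", // hypothetical parameter name
            ConfigDef.Type.BOOLEAN, // value is parsed and validated as a boolean
            false, // default applied when the user omits the key
            ConfigDef.Importance.HIGH, // surfaced prominently in generated docs
            "Toggles a hypothetical feature."); // doc string shown to users
  }
}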
@@ -153,16 +153,6 @@ public void createTableWithOnlyMetadataColumn(String tableName) {
     tableExistsCache.invalidate(tableName);
   }
 
-  @Override
-  public void addMetadataColumnForIcebergIfNotExists(String tableName) {
-    delegate.addMetadataColumnForIcebergIfNotExists(tableName);
-  }
-
-  @Override
-  public void initializeMetadataColumnTypeForIceberg(String tableName) {
-    delegate.initializeMetadataColumnTypeForIceberg(tableName);
-  }
-
   @Override
   public boolean isTableCompatible(String tableName) {
     return delegate.isTableCompatible(tableName);
@@ -53,8 +53,6 @@ static void assertNotEmpty(String name, Object value) {
     switch (name.toLowerCase()) {
       case "tablename":
         throw SnowflakeErrors.ERROR_0005.getException();
-      case "stagename":
-        throw SnowflakeErrors.ERROR_0004.getException();
       case "pipename":
         throw SnowflakeErrors.ERROR_0006.getException();
       case "conf":
@@ -86,21 +86,6 @@ public interface SnowflakeConnectionService {
    */
   void createTableWithOnlyMetadataColumn(String tableName);
 
-  /**
-   * Alter the RECORD_METADATA column to be of the required structured OBJECT type for iceberg
-   * tables.
-   *
-   * @param tableName iceberg table name
-   */
-  void initializeMetadataColumnTypeForIceberg(String tableName);
-
-  /**
-   * Add the RECORD_METADATA column to the iceberg table if it does not exist.
-   *
-   * @param tableName iceberg table name
-   */
-  void addMetadataColumnForIcebergIfNotExists(String tableName);
-
   /**
    * Calls describe table statement and returns all columns and corresponding types.
    *
@@ -34,30 +34,13 @@ public enum SnowflakeErrors {
       "0003",
       "Missing required parameter",
       "one or multiple required parameters haven't been provided"),
-  ERROR_0004("0004", "Empty Stage name", "Input Stage name is empty string or null"),
   ERROR_0005("0005", "Empty Table name", "Input Table name is empty string or null"),
   ERROR_0006("0006", "Empty Pipe name", "Input Pipe name is empty String or null"),
   ERROR_0007(
       "0007",
       "Invalid Snowflake URL",
       "Snowflake URL format: 'https://<account_name>.<region_name>"
           + ".snowflakecomputing.com:443', 'https://' and ':443' are optional."),
-  ERROR_0008(
-      "0008",
-      "Invalid staging file name",
-      "File name format: <app_name>/<table_name>/<partition_number"
-          + ">/<start_offset>_<end_offset>_<timestamp>.json.gz"),
-  ERROR_0009(
-      "0009",
-      "Invalid value converter",
-      "Only support Snowflake Converters (e.g. SnowflakeJsonConverter, "
-          + "SnowflakeAvroConverter)"),
-  ERROR_0010("0010", "Invalid input record", "Input record value can't be parsed"),
-  ERROR_0011("0011", "Failed to load schema from Schema Registry", "Schema ID doesn't exist"),
-  ERROR_0012(
-      "0012",
-      "Failed to connect schema registry service",
-      "Schema registry service is not available"),
   ERROR_0013(
       "0013",
       "Missed private key in connector config",

@@ -105,10 +88,6 @@
       "0023",
       "Invalid proxy username or password",
       "Both username and password need to be provided if one of them is provided"),
-  ERROR_0024(
-      "0024",
-      "Reader schema invalid",
-      "A reader schema is provided but can not be parsed as an Avro schema"),
   ERROR_0030(
       "0030",
       String.format(

@@ -121,18 +100,6 @@
       "0031",
       "Failed to combine JDBC properties",
       "One of snowflake.jdbc.map property overrides other jdbc property"),
-  ERROR_0032(
-      "0032",
-      "Iceberg table does not exist or is in invalid format",
-      "Check Snowflake Kafka Connector docs for details"),
-  ERROR_0033(
-      "0033",
-      "Invalid OAuth URL",
-      "OAuth URL format: '[http://|https://]<oauth_server>[:<port>][/<path>]'. Protocol defaults"
-          + " to 'https://'. Port defaults to 443 for https and 80 for http. Path may contain"
-          + " alphanumeric characters, dots, hyphens, and forward slashes (e.g.,"
-          + " 'login.example.com/oauth2/v2.0/token')."),
-  // Snowflake connection issues 1---
   ERROR_1001(
       "1001",
       "Failed to connect to Snowflake Server",

@@ -141,8 +108,6 @@
       "1003",
       "Snowflake connection is closed",
       "Either the current connection is closed or hasn't connected to snowflake" + " server"),
-  ERROR_1004(
-      "1004", "Fetching OAuth token failed", "Failed to get OAuth token from authorization server"),
   ERROR_1005(
       "1005",
       "Task failed due to authorization error",

@@ -158,35 +123,6 @@
       "2007",
       "Failed to create table",
       "Failed to create table on Snowflake, please check that you have permission to do so."),
-  ERROR_2010("2010", "Max retry exceeded", "API retry exceeded the max retry limit"),
-  ERROR_2012(
-      "2012",
-      "Failed to append RECORD_METADATA column",
-      "Failed to append RECORD_METADATA column due to an existing RECORD_METADATA column with"
-          + " non-VARIANT type."),
-  ERROR_2013(
-      "2013",
-      "Failed to append RECORD_METADATA column",
-      "Failed to append RECORD_METADATA column, please check that you have permission to do so."),
-  ERROR_2014(
-      "2014", "Table does not exist", "Table does not exist. It might have been deleted externally."),
-  ERROR_2015(
-      "2015", "Failed to append columns", "Failed to append columns during schema evolution"),
-  ERROR_2017(
-      "2017",
-      "Failed to check schema evolution permission",
-      "Failed to check schema evolution permission"),
-
-  ERROR_2018(
-      "2018",
-      "Failed to alter RECORD_METADATA column type for iceberg",
-      "Failed to alter RECORD_METADATA column type to required format for iceberg."),
-  ERROR_2019(
-      "2019",
-      "Failed to add RECORD_METADATA column for iceberg",
-      "Failed to add RECORD_METADATA column with required format for iceberg."),
-
-  ERROR_5003("5003", "Incompatible table", "Table doesn't have a compatible schema"),
   ERROR_5007(
       "5007",
       "SnowflakeStreamingSinkConnector timeout",

@@ -197,9 +133,6 @@
       "5010",
       "Connection is null or closed",
       "Connection is closed or null when starting sink service"),
-  ERROR_5011(
-      "5011", "Data is not broken", "Tried to access broken data but the record is not broken"),
-  ERROR_5012("5012", "Data is broken", "Failed to access record data because it is broken"),
   ERROR_5013(
       "5013",
       "Failed to initialize SinkTask",

@@ -211,14 +144,6 @@
   ERROR_5015(
       "5015", "Invalid SinkRecord received", "Error parsing SinkRecord value or SinkRecord header"),
   ERROR_5020("5020", "Failed to register MBean in MbeanServer", "Object Name is invalid"),
-  ERROR_5022("5022", "Invalid column name", "Failed to find column in the schema"),
-
-  ERROR_5025(
-      "5025", "Unexpected data type", "Unexpected data type encountered during schema evolution."),
-  ERROR_5026(
-      "5026",
-      "Invalid SinkRecord received",
-      "Cannot infer type from null or empty object/list during schema evolution."),
   ERROR_5027(
       "5027",
       "Data verification failed",
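
The enum above pairs a stable error code with a short title and a longer description, and getException() wraps them into a throwable. A simplified sketch of that pattern (the constructor and message layout are illustrative, not copied from the connector):

public enum ExampleErrors {
  ERROR_0005("0005", "Empty Table name", "Input Table name is empty string or null");

  private final String code;
  private final String title;
  private final String detail;

  ExampleErrors(String code, String title, String detail) {
    this.code = code;
    this.title = title;
    this.detail = detail;
  }

  // Callers throw a uniform exception carrying the code, as in
  // SnowflakeErrors.ERROR_0005.getException() in the hunks above.
  public RuntimeException getException() {
    return new RuntimeException(String.format("Error %s: %s. %s", code, title, detail));
  }
}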
@@ -1,7 +1,6 @@
 package com.snowflake.kafka.connector.internal;
 
 import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;
-import static com.snowflake.kafka.connector.streaming.iceberg.IcebergDDLTypes.ICEBERG_METADATA_OBJECT_SCHEMA;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;

@@ -101,50 +100,6 @@ public void createTableWithOnlyMetadataColumn(final String tableName) {
     LOGGER.info("Created table {} with only RECORD_METADATA column", tableName);
   }
 
-  @Override
-  public void addMetadataColumnForIcebergIfNotExists(String tableName) {
-    checkConnection();
-    InternalUtils.assertNotEmpty("tableName", tableName);
-    String query =
-        "ALTER ICEBERG TABLE identifier(?) ADD COLUMN IF NOT EXISTS RECORD_METADATA "
-            + ICEBERG_METADATA_OBJECT_SCHEMA;
-    try {
-      PreparedStatement stmt = conn.prepareStatement(query);
-      stmt.setString(1, tableName);
-      stmt.execute();
-      stmt.close();
-    } catch (SQLException e) {
-      LOGGER.error(
-          "Couldn't alter table {} to add the RECORD_METADATA column in the Iceberg format",
-          tableName);
-      throw SnowflakeErrors.ERROR_2019.getException(e);
-    }
-    LOGGER.info(
-        "Altered table {} to add the RECORD_METADATA column in the Iceberg format", tableName);
-  }
-
-  @Override
-  public void initializeMetadataColumnTypeForIceberg(String tableName) {
-    checkConnection();
-    InternalUtils.assertNotEmpty("tableName", tableName);
-    String query =
-        "ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE "
-            + ICEBERG_METADATA_OBJECT_SCHEMA;
-    try {
-      PreparedStatement stmt = conn.prepareStatement(query);
-      stmt.setString(1, tableName);
-      stmt.execute();
-      stmt.close();
-    } catch (SQLException e) {
-      LOGGER.error(
-          "Couldn't alter the RECORD_METADATA column type of table {} to the Iceberg format",
-          tableName);
-      throw SnowflakeErrors.ERROR_2018.getException(e);
-    }
-    LOGGER.info(
-        "Altered the RECORD_METADATA column type of table {} to the Iceberg format", tableName);
-  }
-
   @Override
   public boolean tableExist(final String tableName) {
     return describeTable(tableName).isPresent();
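
The removed methods above bind the table name through Snowflake's identifier(?) function rather than concatenating it into the DDL string, which keeps the statement parameterized. A sketch of the same pattern using try-with-resources (the DDL here is illustrative, not the connector's):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class IdentifierBindDemo {
  // identifier(?) lets the table name arrive as a bind variable; the
  // try-with-resources block also closes the statement if execute() throws,
  // which a manual stmt.close() after execute() would not.
  static void addMetadataColumn(Connection conn, String tableName) throws SQLException {
    String query = "ALTER TABLE identifier(?) ADD COLUMN IF NOT EXISTS RECORD_METADATA VARIANT";
    try (PreparedStatement stmt = conn.prepareStatement(query)) {
      stmt.setString(1, tableName);
      stmt.execute();
    }
  }
}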
@@ -3,10 +3,8 @@
 import static com.snowflake.kafka.connector.ConnectorConfigTools.BOOLEAN_VALIDATOR;
 import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.ERRORS_LOG_ENABLE_CONFIG;
 import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.ERRORS_TOLERANCE_CONFIG;
-import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_ICEBERG_ENABLED;
 import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_MAX_CLIENT_LAG;
 import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.VALUE_CONVERTER;
-import static com.snowflake.kafka.connector.Utils.isIcebergEnabled;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;

@@ -27,13 +25,6 @@ public class DefaultStreamingConfigValidator implements StreamingConfigValidator
   public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {
     Map<String, String> invalidParams = new HashMap<>();
 
-    // Validate Iceberg config
-    if (isIcebergEnabled(inputConfig)) {
-      invalidParams.put(
-          SNOWFLAKE_STREAMING_ICEBERG_ENABLED,
-          "Ingestion to Iceberg table is currently unsupported.");
-    }
-
     validateRole(inputConfig)
         .ifPresent(errorEntry -> invalidParams.put(errorEntry.getKey(), errorEntry.getValue()));
 
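
The removed block followed the validator's general pattern: each failed check adds an entry to invalidParams keyed by the offending parameter name, and an empty map means the configuration passed. A minimal sketch of that guard style with an illustrative key:

import java.util.HashMap;
import java.util.Map;

public class ValidatorDemo {
  static Map<String, String> validate(Map<String, String> inputConfig) {
    Map<String, String> invalidParams = new HashMap<>();
    // Reject a recognized-but-unsupported flag, mirroring the removed Iceberg guard.
    if (Boolean.parseBoolean(inputConfig.get("example.unsupported.flag"))) {
      invalidParams.put("example.unsupported.flag", "This feature is currently unsupported.");
    }
    return invalidParams; // empty map == valid configuration
  }
}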
@@ -253,14 +253,6 @@ private void addKafkaConnectBuiltInParameters(
 
   private void addConnectorSpecificParameters(
       final Map<String, String> userProvidedConfig, final ObjectNode dataObjectNode) {
-    // Iceberg configuration
-    dataObjectNode.put(
-        KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_ICEBERG_ENABLED,
-        userProvidedConfig.getOrDefault(
-            KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_ICEBERG_ENABLED,
-            String.valueOf(
-                KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_ICEBERG_ENABLED_DEFAULT)));
-
     // Streaming configuration
     addConfigIfPresent(
         userProvidedConfig,
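
The deleted block above recorded a connector parameter into a Jackson ObjectNode for reporting, falling back to the compiled-in default when the user did not set the key. A self-contained sketch of that getOrDefault pattern (key and default are illustrative):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Map;

public class ReportParamDemo {
  public static void main(String[] args) {
    ObjectNode dataObjectNode = new ObjectMapper().createObjectNode();
    Map<String, String> userProvidedConfig = Map.of(); // user left the key unset
    // Absent key: the default lands in the report instead of null.
    dataObjectNode.put(
        "example.parameter",
        userProvidedConfig.getOrDefault("example.parameter", String.valueOf(false)));
    System.out.println(dataObjectNode); // {"example.parameter":"false"}
  }
}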
@@ -8,10 +8,7 @@ public class IcebergDDLTypes {
           + "topic STRING,"
           + "partition INTEGER,"
           + "key STRING,"
-          + "schema_id INTEGER,"
-          + "key_schema_id INTEGER,"
-          + "CreateTime BIGINT,"
-          + "LogAppendTime BIGINT,"
+          + "SnowflakeConnectorPushTime BIGINT,"
           + "headers MAP(VARCHAR, VARCHAR)"
           + ")";
Expand Down

This file was deleted (its name and contents are not shown).

@@ -71,11 +71,6 @@ public SnowflakeSinkConnectorConfigBuilder withPrivateKey(String privateKey) {
     return this;
   }
 
-  public SnowflakeSinkConnectorConfigBuilder withIcebergEnabled() {
-    config.put(KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_ICEBERG_ENABLED, "true");
-    return this;
-  }
-
   public SnowflakeSinkConnectorConfigBuilder withRole(String role) {
     config.put(SNOWFLAKE_ROLE_NAME, role);
     return this;
@@ -52,10 +52,6 @@ public void testAssertNotEmpty() {
         SnowflakeErrors.ERROR_0005, () -> InternalUtils.assertNotEmpty("TABLENAME", null));
     assert TestUtils.assertError(
         SnowflakeErrors.ERROR_0005, () -> InternalUtils.assertNotEmpty("tableName", ""));
-    assert TestUtils.assertError(
-        SnowflakeErrors.ERROR_0004, () -> InternalUtils.assertNotEmpty("stagename", null));
-    assert TestUtils.assertError(
-        SnowflakeErrors.ERROR_0004, () -> InternalUtils.assertNotEmpty("stageName", ""));
     assert TestUtils.assertError(
         SnowflakeErrors.ERROR_0006, () -> InternalUtils.assertNotEmpty("pipeName", null));
     assert TestUtils.assertError(
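
TestUtils.assertError above is the connector's own test helper; conceptually it runs the action and checks that the thrown exception matches the expected SnowflakeErrors entry. An illustrative stand-in (names here are not the connector's API):

public class AssertErrorDemo {
  // Returns true only when the action throws and the message carries the
  // expected error code.
  static boolean assertError(String expectedCode, Runnable action) {
    try {
      action.run();
      return false; // no exception thrown, so the assertion fails
    } catch (RuntimeException e) {
      return e.getMessage() != null && e.getMessage().contains(expectedCode);
    }
  }
}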