
Commit b595442

FLOW-7864 Adapt iceberg tests for SSV2 remove unused code
1 parent 0de8f33 commit b595442

22 files changed (+2, -544 lines)

src/main/java/com/snowflake/kafka/connector/Constants.java

Lines changed: 0 additions & 4 deletions
@@ -29,10 +29,6 @@ public static final class KafkaConnectorConfigParams {
     public static final boolean SNOWFLAKE_STREAMING_METADATA_CONNECTOR_PUSH_TIME_DEFAULT = true;
     public static final String SNOWFLAKE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP =
         "snowflake.streaming.client.provider.override.map";
-    // Iceberg
-    public static final String SNOWFLAKE_STREAMING_ICEBERG_ENABLED =
-        "snowflake.streaming.iceberg.enabled";
-    public static final boolean SNOWFLAKE_STREAMING_ICEBERG_ENABLED_DEFAULT = false;
     public static final String SNOWFLAKE_STREAMING_MAX_CLIENT_LAG =
         "snowflake.streaming.max.client.lag";

src/main/java/com/snowflake/kafka/connector/Utils.java

Lines changed: 0 additions & 10 deletions
@@ -16,8 +16,6 @@
  */
 package com.snowflake.kafka.connector;

-import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_ICEBERG_ENABLED;
-
 import com.google.common.collect.ImmutableMap;
 import com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams;
 import com.snowflake.kafka.connector.internal.KCLogger;
@@ -315,14 +313,6 @@ static boolean isValidSnowflakeTableName(String tableName) {
     return tableName.matches("^([_a-zA-Z]{1}[_$a-zA-Z0-9]+\\.){0,2}[_a-zA-Z]{1}[_$a-zA-Z0-9]+$");
   }

-  /**
-   * @param config config with applied default values
-   * @return true when Iceberg mode is enabled.
-   */
-  public static boolean isIcebergEnabled(Map<String, String> config) {
-    return Boolean.parseBoolean(config.get(SNOWFLAKE_STREAMING_ICEBERG_ENABLED));
-  }
-
   /**
    * @param config config with applied default values
    * @return role specified in the config
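Note on the removed helper: Boolean.parseBoolean returns false for a null argument, so isIcebergEnabled quietly treated a missing key as "disabled" even when no defaults had been applied. A minimal standalone sketch of that behavior (hypothetical class name, not connector code):

import java.util.HashMap;
import java.util.Map;

public class ParseBooleanBehavior {
  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    // Key absent: Map.get returns null, and Boolean.parseBoolean(null) is false,
    // so the removed helper defaulted to "Iceberg disabled".
    System.out.println(Boolean.parseBoolean(config.get("snowflake.streaming.iceberg.enabled"))); // false

    config.put("snowflake.streaming.iceberg.enabled", "TRUE"); // parsing is case-insensitive
    System.out.println(Boolean.parseBoolean(config.get("snowflake.streaming.iceberg.enabled"))); // true
  }
}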

src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java

Lines changed: 0 additions & 7 deletions
@@ -318,13 +318,6 @@ public static ConfigDef getConfig() {
             LOW,
             "If set to true the Connector will fail its tasks when authorization error from"
                 + " Snowflake occurred")
-        .define(
-            KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_ICEBERG_ENABLED,
-            BOOLEAN,
-            KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_ICEBERG_ENABLED_DEFAULT,
-            HIGH,
-            "When set to true the connector will ingest data into the Iceberg table. Check the"
-                + " official Snowflake documentation for the prerequisites.")
         .define(
             KafkaConnectorConfigParams.CACHE_TABLE_EXISTS,
             BOOLEAN,
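For readers unfamiliar with the pattern: each removed block is one call in Kafka Connect's fluent ConfigDef API, taking the parameter name, type, default value, importance, and documentation string. A minimal self-contained sketch of the same shape (hypothetical class name, not the connector's actual builder):

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class IcebergFlagConfigDefSketch {
  // Mirrors the removed .define(...) call: name, type, default, importance, doc.
  public static ConfigDef getConfig() {
    return new ConfigDef()
        .define(
            "snowflake.streaming.iceberg.enabled",
            Type.BOOLEAN,
            false, // was SNOWFLAKE_STREAMING_ICEBERG_ENABLED_DEFAULT
            Importance.HIGH,
            "When set to true the connector will ingest data into the Iceberg table.");
  }
}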

src/main/java/com/snowflake/kafka/connector/internal/CachingSnowflakeConnectionService.java

Lines changed: 0 additions & 10 deletions
@@ -153,16 +153,6 @@ public void createTableWithOnlyMetadataColumn(String tableName) {
     tableExistsCache.invalidate(tableName);
   }

-  @Override
-  public void addMetadataColumnForIcebergIfNotExists(String tableName) {
-    delegate.addMetadataColumnForIcebergIfNotExists(tableName);
-  }
-
-  @Override
-  public void initializeMetadataColumnTypeForIceberg(String tableName) {
-    delegate.initializeMetadataColumnTypeForIceberg(tableName);
-  }
-
   @Override
   public boolean isTableCompatible(String tableName) {
     return delegate.isTableCompatible(tableName);
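The two deleted methods were plain pass-throughs: this class is a caching decorator that fronts table checks with a cache (invalidated on DDL, as in createTableWithOnlyMetadataColumn above) and forwards everything else to the wrapped delegate. A minimal sketch of that decorator shape, with hypothetical names and a Guava cache assumed:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class CachingDecoratorSketch {
  interface ConnectionService {
    boolean isTableCompatible(String tableName);
  }

  private final ConnectionService delegate;
  private final Cache<String, Boolean> tableExistsCache =
      CacheBuilder.newBuilder().maximumSize(1_000).build();

  CachingDecoratorSketch(ConnectionService delegate) {
    this.delegate = delegate;
  }

  // Plain pass-through, exactly like the removed @Override methods.
  boolean isTableCompatible(String tableName) {
    return delegate.isTableCompatible(tableName);
  }

  // DDL must drop any stale cached answer for that table.
  void afterTableDdl(String tableName) {
    tableExistsCache.invalidate(tableName);
  }
}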

src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java

Lines changed: 0 additions & 2 deletions
@@ -53,8 +53,6 @@ static void assertNotEmpty(String name, Object value) {
     switch (name.toLowerCase()) {
       case "tablename":
         throw SnowflakeErrors.ERROR_0005.getException();
-      case "stagename":
-        throw SnowflakeErrors.ERROR_0004.getException();
       case "pipename":
         throw SnowflakeErrors.ERROR_0006.getException();
       case "conf":

src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java

Lines changed: 0 additions & 15 deletions
@@ -86,21 +86,6 @@ public interface SnowflakeConnectionService {
    */
   void createTableWithOnlyMetadataColumn(String tableName);

-  /**
-   * Alter the RECORD_METADATA column to be of the required structured OBJECT type for iceberg
-   * tables.
-   *
-   * @param tableName iceberg table name
-   */
-  void initializeMetadataColumnTypeForIceberg(String tableName);
-
-  /**
-   * Add the RECORD_METADATA column to the iceberg table if it does not exist.
-   *
-   * @param tableName iceberg table name
-   */
-  void addMetadataColumnForIcebergIfNotExists(String tableName);
-
   /**
    * Calls describe table statement and returns all columns and corresponding types.
    *

src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java

Lines changed: 0 additions & 75 deletions
@@ -34,30 +34,13 @@ public enum SnowflakeErrors {
       "0003",
       "Missing required parameter",
       "one or multiple required parameters haven't be provided"),
-  ERROR_0004("0004", "Empty Stage name", "Input Stage name is empty string or null"),
   ERROR_0005("0005", "Empty Table name", "Input Table name is empty string or null"),
   ERROR_0006("0006", "Empty Pipe name", "Input Pipe name is empty String or null"),
   ERROR_0007(
       "0007",
       "Invalid Snowflake URL",
       "Snowflake URL format: 'https://<account_name>.<region_name>"
           + ".snowflakecomputing.com:443', 'https://' and ':443' are optional."),
-  ERROR_0008(
-      "0008",
-      "Invalid staging file name",
-      "File name format: <app_name>/<table_name>/<partition_number"
-          + ">/<start_offset>_<end_offset>_<timestamp>.json.gz"),
-  ERROR_0009(
-      "0009",
-      "Invalid value converter",
-      "Only support Snowflake Converters (e.g. SnowflakeJsonConverter, "
-          + "SnowflakeAvroConverter)"),
-  ERROR_0010("0010", "Invalid input record", "Input record value can't be parsed"),
-  ERROR_0011("0011", "Failed to load schema from Schema Registry", "Schema ID doesn't exist"),
-  ERROR_0012(
-      "0012",
-      "Failed to connect schema registry service",
-      "Schema registry service is not available"),
   ERROR_0013(
       "0013",
       "Missed private key in connector config",
@@ -105,10 +88,6 @@ public enum SnowflakeErrors {
       "0023",
       "Invalid proxy username or password",
       "Both username and password need to be provided if one of them is provided"),
-  ERROR_0024(
-      "0024",
-      "Reader schema invalid",
-      "A reader schema is provided but can not be parsed as an Avro schema"),
   ERROR_0030(
       "0030",
       String.format(
@@ -121,18 +100,6 @@ public enum SnowflakeErrors {
       "0031",
       "Failed to combine JDBC properties",
       "One of snowflake.jdbc.map property overrides other jdbc property"),
-  ERROR_0032(
-      "0032",
-      "Iceberg table does not exist or is in invalid format",
-      "Check Snowflake Kafka Connector docs for details"),
-  ERROR_0033(
-      "0033",
-      "Invalid OAuth URL",
-      "OAuth URL format: '[http://|https://]<oauth_server>[:<port>][/<path>]'. Protocol defaults"
-          + " to 'https://'. Port defaults to 443 for https and 80 for http. Path may contain"
-          + " alphanumeric characters, dots, hyphens, and forward slashes (e.g.,"
-          + " 'login.example.com/oauth2/v2.0/token')."),
-  // Snowflake connection issues 1---
   ERROR_1001(
       "1001",
       "Failed to connect to Snowflake Server",
@@ -141,8 +108,6 @@ public enum SnowflakeErrors {
       "1003",
       "Snowflake connection is closed",
       "Either the current connection is closed or hasn't connect to snowflake" + " server"),
-  ERROR_1004(
-      "1004", "Fetching OAuth token fail", "Fail to get OAuth token from authorization server"),
   ERROR_1005(
       "1005",
       "Task failed due to authorization error",
@@ -158,35 +123,6 @@ public enum SnowflakeErrors {
       "2007",
       "Failed to create table",
       "Failed to create table on Snowflake, please check that you have permission to do so."),
-  ERROR_2010("2010", "Max retry exceeded", "Api retry exceeded the max retry limit"),
-  ERROR_2012(
-      "2012",
-      "Failed to append RECORD_METADATA column",
-      "Failed to append RECORD_METADATA column due to an existing RECORD_METADATA column with"
-          + " non-VARIANT type."),
-  ERROR_2013(
-      "2013",
-      "Failed to append RECORD_METADATA column",
-      "Failed to append RECORD_METADATA column, please check that you have permission to do so."),
-  ERROR_2014(
-      "2014", "Table not exists", "Table not exists. It might have been deleted externally."),
-  ERROR_2015(
-      "2015", "Failed to append columns", "Failed to append columns during schema evolution"),
-  ERROR_2017(
-      "2017",
-      "Failed to check schema evolution permission",
-      "Failed to check schema evolution permission"),
-
-  ERROR_2018(
-      "2018",
-      "Failed to alter RECORD_METADATA column type for iceberg",
-      "Failed to alter RECORD_METADATA column type to required format for iceberg."),
-  ERROR_2019(
-      "2019",
-      "Failed to add RECORD_METADATA column for iceberg",
-      "Failed to add RECORD_METADATA column with required format for iceberg."),
-
-  ERROR_5003("5003", "Incompatible table", "Table doesn't have a compatible schema"),
   ERROR_5007(
       "5007",
       "SnowflakeStreamingSinkConnector timeout",
@@ -197,9 +133,6 @@ public enum SnowflakeErrors {
       "5010",
       "Connection is null or closed",
       "Connection is closed or null when starting sink service"),
-  ERROR_5011(
-      "5011", "Data is not broken", "Tried to access broken data but the record is not broken"),
-  ERROR_5012("5012", "Data is broken", "Failed to access record data because it is broken"),
   ERROR_5013(
       "5013",
       "Failed to initialize SinkTask",
@@ -211,14 +144,6 @@ public enum SnowflakeErrors {
   ERROR_5015(
       "5015", "Invalid SinkRecord received", "Error parsing SinkRecord value or SinkRecord header"),
   ERROR_5020("5020", "Failed to register MBean in MbeanServer", "Object Name is invalid"),
-  ERROR_5022("5022", "Invalid column name", "Failed to find column in the schema"),
-
-  ERROR_5025(
-      "5025", "Unexpected data type", "Unexpected data type encountered during schema evolution."),
-  ERROR_5026(
-      "5026",
-      "Invalid SinkRecord received",
-      "Cannot infer type from null or empty object/list during schema evolution."),
   ERROR_5027(
       "5027",
       "Data verification failed",

src/main/java/com/snowflake/kafka/connector/internal/StandardSnowflakeConnectionService.java

Lines changed: 0 additions & 45 deletions
@@ -1,7 +1,6 @@
 package com.snowflake.kafka.connector.internal;

 import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;
-import static com.snowflake.kafka.connector.streaming.iceberg.IcebergDDLTypes.ICEBERG_METADATA_OBJECT_SCHEMA;

 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
@@ -101,50 +100,6 @@ public void createTableWithOnlyMetadataColumn(final String tableName) {
     LOGGER.info("Created table {} with only RECORD_METADATA column", tableName);
   }

-  @Override
-  public void addMetadataColumnForIcebergIfNotExists(String tableName) {
-    checkConnection();
-    InternalUtils.assertNotEmpty("tableName", tableName);
-    String query =
-        "ALTER ICEBERG TABLE identifier(?) ADD COLUMN IF NOT EXISTS RECORD_METADATA "
-            + ICEBERG_METADATA_OBJECT_SCHEMA;
-    try {
-      PreparedStatement stmt = conn.prepareStatement(query);
-      stmt.setString(1, tableName);
-      stmt.execute();
-      stmt.close();
-    } catch (SQLException e) {
-      LOGGER.error(
-          "Couldn't alter table {} add RECORD_METADATA column to align with iceberg format",
-          tableName);
-      throw SnowflakeErrors.ERROR_2019.getException(e);
-    }
-    LOGGER.info(
-        "alter table {} add RECORD_METADATA column to align with iceberg format", tableName);
-  }
-
-  @Override
-  public void initializeMetadataColumnTypeForIceberg(String tableName) {
-    checkConnection();
-    InternalUtils.assertNotEmpty("tableName", tableName);
-    String query =
-        "ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE "
-            + ICEBERG_METADATA_OBJECT_SCHEMA;
-    try {
-      PreparedStatement stmt = conn.prepareStatement(query);
-      stmt.setString(1, tableName);
-      stmt.execute();
-      stmt.close();
-    } catch (SQLException e) {
-      LOGGER.error(
-          "Couldn't alter table {} RECORD_METADATA column type to align with iceberg format",
-          tableName);
-      throw SnowflakeErrors.ERROR_2018.getException(e);
-    }
-    LOGGER.info(
-        "alter table {} RECORD_METADATA column type to align with iceberg format", tableName);
-  }
-
   @Override
   public boolean tableExist(final String tableName) {
     return describeTable(tableName).isPresent();
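One detail worth noting in the deleted DDL methods: the table name is bound through Snowflake's identifier(?) function rather than concatenated into the statement, which keeps the DDL safe from injection via the table name. A minimal sketch of the same pattern (try-with-resources instead of the manual close() the removed code used; metadataObjectSchema is a placeholder for the removed ICEBERG_METADATA_OBJECT_SCHEMA constant):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class IdentifierBindingSketch {
  static void addMetadataColumn(Connection conn, String tableName, String metadataObjectSchema)
      throws SQLException {
    String query =
        "ALTER ICEBERG TABLE identifier(?) ADD COLUMN IF NOT EXISTS RECORD_METADATA "
            + metadataObjectSchema;
    // identifier(?) lets the table name arrive as a bind variable, not string concatenation.
    try (PreparedStatement stmt = conn.prepareStatement(query)) {
      stmt.setString(1, tableName);
      stmt.execute();
    }
  }
}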

src/main/java/com/snowflake/kafka/connector/internal/streaming/DefaultStreamingConfigValidator.java

Lines changed: 0 additions & 9 deletions
@@ -3,10 +3,8 @@
 import static com.snowflake.kafka.connector.ConnectorConfigTools.BOOLEAN_VALIDATOR;
 import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.ERRORS_LOG_ENABLE_CONFIG;
 import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.ERRORS_TOLERANCE_CONFIG;
-import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_ICEBERG_ENABLED;
 import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_MAX_CLIENT_LAG;
 import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.VALUE_CONVERTER;
-import static com.snowflake.kafka.connector.Utils.isIcebergEnabled;

 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
@@ -27,13 +25,6 @@ public class DefaultStreamingConfigValidator implements StreamingConfigValidator
   public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {
     Map<String, String> invalidParams = new HashMap<>();

-    // Validate Iceberg config
-    if (isIcebergEnabled(inputConfig)) {
-      invalidParams.put(
-          SNOWFLAKE_STREAMING_ICEBERG_ENABLED,
-          "Ingestion to Iceberg table is currently unsupported.");
-    }
-
     validateRole(inputConfig)
         .ifPresent(errorEntry -> invalidParams.put(errorEntry.getKey(), errorEntry.getValue()));
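The validator follows an accumulate-then-freeze pattern: every check contributes a parameterName-to-errorMessage entry into a mutable map, and the caller receives an immutable snapshot. A minimal sketch of that shape, using the removed Iceberg check as the example rule (hypothetical class name, Guava assumed):

import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;

public class StreamingConfigValidatorSketch {
  public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {
    Map<String, String> invalidParams = new HashMap<>();

    // The removed rule, inlined: flag the parameter itself as invalid when set.
    if (Boolean.parseBoolean(inputConfig.get("snowflake.streaming.iceberg.enabled"))) {
      invalidParams.put(
          "snowflake.streaming.iceberg.enabled",
          "Ingestion to Iceberg table is currently unsupported.");
    }

    // ...further checks would add more entries here...
    return ImmutableMap.copyOf(invalidParams);
  }
}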

src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryService.java

Lines changed: 0 additions & 8 deletions
@@ -253,14 +253,6 @@ private void addKafkaConnectBuiltInParameters(

   private void addConnectorSpecificParameters(
       final Map<String, String> userProvidedConfig, final ObjectNode dataObjectNode) {
-    // Iceberg configuration
-    dataObjectNode.put(
-        KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_ICEBERG_ENABLED,
-        userProvidedConfig.getOrDefault(
-            KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_ICEBERG_ENABLED,
-            String.valueOf(
-                KafkaConnectorConfigParams.SNOWFLAKE_STREAMING_ICEBERG_ENABLED_DEFAULT)));
-
     // Streaming configuration
     addConfigIfPresent(
         userProvidedConfig,
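The deleted block used Map.getOrDefault so telemetry always reported an effective value for the flag, whether or not the user set it. A runnable sketch of that reporting pattern with Jackson (hypothetical class name):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Map;

public class TelemetryFlagSketch {
  static void reportFlag(Map<String, String> userProvidedConfig, ObjectNode dataObjectNode) {
    // Report what the user set, or the compiled-in default when the key is absent.
    dataObjectNode.put(
        "snowflake.streaming.iceberg.enabled",
        userProvidedConfig.getOrDefault("snowflake.streaming.iceberg.enabled", "false"));
  }

  public static void main(String[] args) {
    ObjectNode node = new ObjectMapper().createObjectNode();
    reportFlag(Map.of(), node);
    System.out.println(node); // {"snowflake.streaming.iceberg.enabled":"false"}
  }
}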
