Skip to content

Commit a34681e

Browse files
CONN-10463 Revert role validation + telemetry improvements + Java 11 fixes (#1133)
1 parent ac678f6 commit a34681e

File tree

14 files changed

+42
-92
lines changed

14 files changed

+42
-92
lines changed

src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static com.snowflake.kafka.connector.Utils.isSnowpipeStreamingIngestion;
2020

2121
import com.google.common.base.Strings;
22-
import com.google.common.collect.ImmutableSet;
2322
import com.snowflake.kafka.connector.internal.KCLogger;
2423
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
2524
import java.util.Arrays;
@@ -251,7 +250,7 @@ public class SnowflakeSinkConnectorConfig {
251250
"value.converter.schema.registry.url";
252251

253252
public static final Set<String> CUSTOM_SNOWFLAKE_CONVERTERS =
254-
ImmutableSet.of(
253+
Set.of(
255254
"com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
256255
"com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry",
257256
"com.snowflake.kafka.connector.records.SnowflakeAvroConverter");

src/main/java/com/snowflake/kafka/connector/internal/streaming/DefaultStreamingConfigValidator.java

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -101,35 +101,13 @@ public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {
101101
}
102102

103103
private static Optional<Map.Entry<String, String>> validateRole(Map<String, String> inputConfig) {
104-
if (Utils.isSnowpipeStreamingV2Enabled(inputConfig)) {
105-
return validateRoleForSSv2(inputConfig);
106-
} else {
107-
return validateRoleForSSv1(inputConfig);
108-
}
109-
}
110-
111-
private static Optional<Map.Entry<String, String>> validateRoleForSSv1(
112-
Map<String, String> inputConfig) {
113104
if (!inputConfig.containsKey(Utils.SF_ROLE)
114105
|| Strings.isNullOrEmpty(inputConfig.get(Utils.SF_ROLE))) {
115-
String roleMissingForSSv1 =
106+
String missingRole =
116107
String.format(
117108
"Config:%s should be present if ingestionMethod is:%s",
118109
Utils.SF_ROLE, inputConfig.get(INGESTION_METHOD_OPT));
119-
return Optional.of(Map.entry(Utils.SF_ROLE, roleMissingForSSv1));
120-
}
121-
return Optional.empty();
122-
}
123-
124-
private static Optional<Map.Entry<String, String>> validateRoleForSSv2(
125-
Map<String, String> inputConfig) {
126-
if (inputConfig.containsKey(Utils.SF_ROLE)) {
127-
String rolePresentForSSv2 =
128-
String.format(
129-
"The default role is used when '%s' is enabled. Delete '%s' parameter from the"
130-
+ " connector config and make sure that default role is set properly.",
131-
SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_V2_ENABLED, Utils.SF_ROLE);
132-
return Optional.of(Map.entry(Utils.SF_ROLE, rolePresentForSSv2));
110+
return Optional.of(Map.entry(Utils.SF_ROLE, missingRole));
133111
}
134112
return Optional.empty();
135113
}

src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/InsertErrorMapper.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.snowflake.kafka.connector.internal.streaming.schemaevolution;
22

3-
import com.google.common.collect.ImmutableList;
43
import com.google.common.collect.Iterables;
54
import com.google.common.collect.Lists;
65
import java.util.List;
@@ -21,10 +20,10 @@ public SchemaEvolutionTargetItems mapToSchemaEvolutionItems(
2120
extraColNames);
2221
}
2322

24-
private List<String> joinNullableLists(List<String> list1, List<String> list2) {
23+
private static List<String> joinNullableLists(List<String> list1, List<String> list2) {
2524
return Lists.newArrayList(
2625
Iterables.concat(
27-
Optional.ofNullable(list1).orElse(ImmutableList.of()),
28-
Optional.ofNullable(list2).orElse(ImmutableList.of())));
26+
Optional.ofNullable(list1).orElse(List.of()),
27+
Optional.ofNullable(list2).orElse(List.of())));
2928
}
3029
}

src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
22

33
import com.fasterxml.jackson.databind.JsonNode;
4-
import com.google.common.collect.ImmutableList;
54
import com.google.common.collect.Streams;
65
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
76
import com.snowflake.kafka.connector.internal.streaming.common.ColumnProperties;
@@ -43,7 +42,7 @@ public List<IcebergColumnTree> resolveIcebergSchemaFromChannel(
4342
public List<IcebergColumnTree> resolveIcebergSchemaFromRecord(
4443
SinkRecord record, Set<String> columnsToEvolve) {
4544
if (columnsToEvolve == null || columnsToEvolve.isEmpty()) {
46-
return ImmutableList.of();
45+
return List.of();
4746
}
4847
if (hasSchema(record)) {
4948
LOGGER.debug(

src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryService.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,17 @@ protected void addUserConnectorPropertiesToDataNode(
263263
userProvidedConfig.getOrDefault(
264264
ENABLE_SCHEMATIZATION_CONFIG, ENABLE_SCHEMATIZATION_DEFAULT));
265265

266+
dataObjectNode.put(
267+
SNOWPIPE_STREAMING_V2_ENABLED,
268+
userProvidedConfig.getOrDefault(
269+
SNOWPIPE_STREAMING_V2_ENABLED,
270+
String.valueOf(SNOWPIPE_STREAMING_V2_ENABLED_DEFAULT_VALUE)));
271+
272+
dataObjectNode.put(
273+
ICEBERG_ENABLED,
274+
userProvidedConfig.getOrDefault(
275+
ICEBERG_ENABLED, String.valueOf(ICEBERG_ENABLED_DEFAULT_VALUE)));
276+
266277
// Record whether streaming client optimization is enabled
267278
dataObjectNode.put(
268279
ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG,

src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -924,19 +924,18 @@ public void shouldValidateSSv2Config() {
924924
Map<String, String> config =
925925
SnowflakeSinkConnectorConfigBuilder.streamingConfig()
926926
.withSnowpipeStreamingV2Enabled()
927-
.withoutRole()
928927
.build();
929928

930929
assertThatCode(() -> connectorConfigValidator.validateConfig(config))
931930
.doesNotThrowAnyException();
932931
}
933932

934933
@Test
935-
public void shouldThrowExceptionWhenRoleDefinedForSSv2() {
934+
public void shouldThrowExceptionWhenRoleNotDefinedForSSv2() {
936935
Map<String, String> config =
937936
SnowflakeSinkConnectorConfigBuilder.streamingConfig()
938937
.withSnowpipeStreamingV2Enabled()
939-
.withRole("someRole")
938+
.withoutRole()
940939
.build();
941940

942941
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))

src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
44
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_V2_ENABLED;
55

6-
import com.google.common.collect.ImmutableMap;
76
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
87
import com.snowflake.kafka.connector.internal.streaming.StreamingConfigValidator;
98
import java.util.Map;
@@ -21,7 +20,7 @@ public class IcebergConfigValidationTest {
2120
@MethodSource("validConfigs")
2221
public void shouldValidateCorrectConfig(Map<String, String> config) {
2322
// when
24-
ImmutableMap<String, String> invalidParameters = validator.validate(config);
23+
Map<String, String> invalidParameters = validator.validate(config);
2524

2625
// then
2726
Assertions.assertTrue(invalidParameters.isEmpty());
@@ -31,7 +30,7 @@ public void shouldValidateCorrectConfig(Map<String, String> config) {
3130
@MethodSource("invalidConfigs")
3231
public void shouldReturnErrorOnInvalidConfig(Map<String, String> config, String errorKey) {
3332
// when
34-
ImmutableMap<String, String> invalidParameters = validator.validate(config);
33+
Map<String, String> invalidParameters = validator.validate(config);
3534

3635
// then
3736
Assertions.assertTrue(invalidParameters.containsKey(errorKey));

src/test/java/com/snowflake/kafka/connector/internal/OAuthURLTest.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import static org.assertj.core.api.Assertions.assertThat;
44
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
55

6-
import com.google.common.collect.ImmutableList;
76
import java.util.List;
87
import java.util.stream.Stream;
98
import org.junit.jupiter.api.Test;
@@ -17,11 +16,10 @@ static Stream<Arguments> correctUrls() {
1716
return Stream.of(
1817
Arguments.of(
1918
"https://localhost:8085/push/token",
20-
ImmutableList.of("https", "localhost:8085", "/push/token", true)),
21-
Arguments.of("localhost:8085", ImmutableList.of("https", "localhost:8085", "", true)),
22-
Arguments.of(
23-
"http://localhost:8085", ImmutableList.of("http", "localhost:8085", "", false)),
24-
Arguments.of("localhost", ImmutableList.of("https", "localhost:443", "", true)));
19+
List.of("https", "localhost:8085", "/push/token", true)),
20+
Arguments.of("localhost:8085", List.of("https", "localhost:8085", "", true)),
21+
Arguments.of("http://localhost:8085", List.of("http", "localhost:8085", "", false)),
22+
Arguments.of("localhost", List.of("https", "localhost:443", "", true)));
2523
}
2624

2725
@ParameterizedTest(name = "url: {0}, parsed: {1}")

src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@
3131

3232
import com.fasterxml.jackson.databind.JsonNode;
3333
import com.fasterxml.jackson.databind.ObjectMapper;
34-
import com.google.common.collect.ImmutableSet;
35-
import com.google.common.collect.Sets;
3634
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
3735
import com.snowflake.kafka.connector.Utils;
3836
import com.snowflake.kafka.connector.config.SnowflakeSinkConnectorConfigBuilder;
@@ -61,9 +59,6 @@
6159
import java.util.function.Function;
6260
import java.util.regex.Matcher;
6361
import java.util.regex.Pattern;
64-
import java.util.stream.Collectors;
65-
import java.util.stream.IntStream;
66-
import java.util.stream.Stream;
6762
import net.snowflake.client.jdbc.internal.apache.http.HttpHeaders;
6863
import net.snowflake.client.jdbc.internal.apache.http.client.methods.CloseableHttpResponse;
6964
import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpPost;
@@ -81,7 +76,6 @@
8176
import org.apache.kafka.connect.data.Struct;
8277
import org.apache.kafka.connect.json.JsonConverter;
8378
import org.apache.kafka.connect.sink.SinkRecord;
84-
import org.junit.jupiter.params.provider.Arguments;
8579

8680
public class TestUtils {
8781
// test profile properties
@@ -1029,14 +1023,4 @@ private static StringEntity buildStringEntity(String payload) {
10291023
throw new RuntimeException(e);
10301024
}
10311025
}
1032-
1033-
public static Stream<Arguments> nBooleanProduct(int n) {
1034-
return Sets.cartesianProduct(
1035-
IntStream.range(0, n)
1036-
.mapToObj(i -> ImmutableSet.of(false, true))
1037-
.collect(Collectors.toList()))
1038-
.stream()
1039-
.map(List::toArray)
1040-
.map(Arguments::of);
1041-
}
10421026
}

src/test/java/com/snowflake/kafka/connector/internal/TombstoneRecordIngestionIT.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
import static com.snowflake.kafka.connector.ConnectorConfigValidatorTest.COMMUNITY_CONVERTER_SUBSET;
44
import static com.snowflake.kafka.connector.ConnectorConfigValidatorTest.CUSTOM_SNOWFLAKE_CONVERTERS;
55

6-
import com.google.common.collect.ImmutableSet;
7-
import com.google.common.collect.Sets;
86
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
97
import com.snowflake.kafka.connector.Utils;
108
import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter;
@@ -20,7 +18,6 @@
2018
import java.util.HashMap;
2119
import java.util.List;
2220
import java.util.Map;
23-
import java.util.stream.Stream;
2421
import org.apache.kafka.common.TopicPartition;
2522
import org.apache.kafka.connect.data.Schema;
2623
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -30,9 +27,7 @@
3027
import org.junit.jupiter.api.AfterEach;
3128
import org.junit.jupiter.api.BeforeEach;
3229
import org.junit.jupiter.params.ParameterizedTest;
33-
import org.junit.jupiter.params.provider.Arguments;
3430
import org.junit.jupiter.params.provider.EnumSource;
35-
import org.junit.jupiter.params.provider.MethodSource;
3631

3732
public class TombstoneRecordIngestionIT {
3833
private final int partition = 0;
@@ -56,16 +51,8 @@ public void afterEach() {
5651
TestUtils.dropTable(table);
5752
}
5853

59-
private static Stream<Arguments> behaviorAndSingleBufferParameters() {
60-
return Sets.cartesianProduct(
61-
ImmutableSet.copyOf(SnowflakeSinkConnectorConfig.BehaviorOnNullValues.values()))
62-
.stream()
63-
.map(List::toArray)
64-
.map(Arguments::of);
65-
}
66-
6754
@ParameterizedTest(name = "behavior: {0}")
68-
@MethodSource("behaviorAndSingleBufferParameters")
55+
@EnumSource(SnowflakeSinkConnectorConfig.BehaviorOnNullValues.class)
6956
public void testStreamingTombstoneBehavior(
7057
SnowflakeSinkConnectorConfig.BehaviorOnNullValues behavior) throws Exception {
7158
// setup
@@ -93,7 +80,7 @@ public void testStreamingTombstoneBehavior(
9380
}
9481

9582
@ParameterizedTest(name = "behavior: {0}")
96-
@MethodSource("behaviorAndSingleBufferParameters")
83+
@EnumSource(SnowflakeSinkConnectorConfig.BehaviorOnNullValues.class)
9784
public void testStreamingTombstoneBehaviorWithSchematization(
9885
SnowflakeSinkConnectorConfig.BehaviorOnNullValues behavior) throws Exception {
9986
// setup

0 commit comments

Comments
 (0)