Commit 9da8278

FLOW-7882 Adapt FDN schema evolution tests to the implementation from SSV2 server side (#1243)
1 parent 01cec9e commit 9da8278

File tree: 39 files changed, +571 −1975 lines

src/main/java/com/snowflake/kafka/connector/internal/CachingSnowflakeConnectionService.java

Lines changed: 0 additions & 5 deletions

@@ -217,11 +217,6 @@ public void executeQueryWithParameters(String query, String... parameters) {
     tableExistsCache.invalidateAll();
   }
 
-  @Override
-  public void appendMetaColIfNotExist(String tableName) {
-    delegate.appendMetaColIfNotExist(tableName);
-  }
-
   private void logStatsIfNeeded() {
     final long now = System.currentTimeMillis();
     final long lastLogged = lastStatsLogTimestamp.get();

src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java

Lines changed: 0 additions & 9 deletions

@@ -75,15 +75,6 @@ public interface SnowflakeConnectionService {
   /** @return the raw jdbc connection */
   Connection getConnection();
 
-  /**
-   * Append a VARIANT type column "RECORD_METADATA" to the table if it is not present.
-   *
-   * <p>This method is only called when schematization is enabled
-   *
-   * @param tableName table name
-   */
-  void appendMetaColIfNotExist(String tableName);
-
   /**
    * Create a table with only the RECORD_METADATA column. The rest of the columns might be added
    * through schema evolution

src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceFactory.java

Lines changed: 6 additions & 0 deletions

@@ -57,5 +57,11 @@ public SnowflakeConnectionService build() {
       CachingConfig cachingConfig = CachingConfig.fromConfig(config);
       return new CachingSnowflakeConnectionService(baseService, cachingConfig);
     }
+
+    public SnowflakeConnectionServiceBuilder noCaching() {
+      config.put(KafkaConnectorConfigParams.CACHE_TABLE_EXISTS, "false");
+      config.put(KafkaConnectorConfigParams.CACHE_PIPE_EXISTS, "false");
+      return this;
+    }
   }
 }
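
A note on the new builder method: noCaching() gives tests a way to construct a connection service with both existence caches disabled. A minimal usage sketch, assuming the factory's builder() entry point and a setProperties(...) step that are not part of this diff:

    // Hypothetical test wiring; builder() and setProperties(...) are assumptions, only
    // noCaching() comes from this commit. Disabling both caches makes every
    // tableExist()/pipeExist() check go to Snowflake directly.
    SnowflakeConnectionService conn =
        SnowflakeConnectionServiceFactory.builder()
            .setProperties(connectorConfig) // connector config map with credentials (assumed)
            .noCaching() // puts CACHE_TABLE_EXISTS and CACHE_PIPE_EXISTS = "false" into the config
            .build();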

src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java

Lines changed: 0 additions & 1 deletion

@@ -172,7 +172,6 @@ public enum SnowflakeErrors {
       "2014", "Table not exists", "Table not exists. It might have been deleted externally."),
   ERROR_2015(
       "2015", "Failed to append columns", "Failed to append columns during schema evolution"),
-  ERROR_2016("2016", "Failed to drop NOT NULL", "Failed to drop NOT NULL during schema evolution"),
   ERROR_2017(
       "2017",
       "Failed to check schema evolution permission",

src/main/java/com/snowflake/kafka/connector/internal/StandardSnowflakeConnectionService.java

Lines changed: 0 additions & 42 deletions

@@ -231,48 +231,6 @@ public boolean isTableCompatible(final String tableName) {
     return compatible;
   }
 
-  @Override
-  public void appendMetaColIfNotExist(final String tableName) {
-    checkConnection();
-    InternalUtils.assertNotEmpty("tableName", tableName);
-    String query = "desc table identifier(?)";
-    PreparedStatement stmt = null;
-    ResultSet result = null;
-    boolean hasMeta = false;
-    boolean isVariant = false;
-    try {
-      stmt = conn.prepareStatement(query);
-      stmt.setString(1, tableName);
-      result = stmt.executeQuery();
-      while (result.next()) {
-        // The result schema is row idx | column name | data type | kind | null? | ...
-        if (result.getString(1).equals(TABLE_COLUMN_METADATA)) {
-          hasMeta = true;
-          if (result.getString(2).equals("VARIANT")) {
-            isVariant = true;
-          }
-          break;
-        }
-      }
-    } catch (SQLException e) {
-      throw SnowflakeErrors.ERROR_2014.getException("table name: " + tableName);
-    }
-    try {
-      if (!hasMeta) {
-        String metaQuery = "alter table identifier(?) add RECORD_METADATA VARIANT";
-        stmt = conn.prepareStatement(metaQuery);
-        stmt.setString(1, tableName);
-        stmt.executeQuery();
-      } else {
-        if (!isVariant) {
-          throw SnowflakeErrors.ERROR_2012.getException("table name: " + tableName);
-        }
-      }
-    } catch (SQLException e) {
-      throw SnowflakeErrors.ERROR_2013.getException("table name: " + tableName);
-    }
-  }
-
   @Override
   public void databaseExists(String databaseName) {
     checkConnection();

src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java

Lines changed: 0 additions & 1 deletion

@@ -457,7 +457,6 @@ public Optional<MetricRegistry> getMetricRegistry(String partitionChannelKey) {
         : Optional.empty();
   }
 
-  // ------ Streaming Ingest Related Functions ------ //
   private void createTableIfNotExists(final String tableName) {
     if (this.conn.tableExist(tableName)) {
       LOGGER.info("Using existing table {}.", tableName);

src/test/java/com/snowflake/kafka/connector/ConnectClusterBaseIT.java

Lines changed: 15 additions & 2 deletions

@@ -31,6 +31,9 @@ public abstract class ConnectClusterBaseIT {
   public void beforeAll() {
     Map<String, String> workerConfig = new HashMap<>();
     workerConfig.put("plugin.discovery", "hybrid_warn");
+    // this parameter decides how often preCommit is called on the task
+    workerConfig.put("offset.flush.interval.ms", "5000");
+
     connectCluster =
         new EmbeddedConnectCluster.Builder()
            .name("kafka-push-connector-connect-cluster")
@@ -50,7 +53,7 @@ public void afterAll() {
 
   protected FakeSnowflakeStreamingIngestClient getOpenedFakeIngestClient(String connectorName) {
     await("channelsCreated")
-        .atMost(Duration.ofSeconds(30))
+        .atMost(Duration.ofSeconds(60))
         .ignoreExceptions()
         .until(
             () ->
@@ -92,7 +95,7 @@ protected final void waitForConnectorRunning(String connectorName) {
     }
   }
 
-  protected final void waitForConnectorStopped(String connectorName) {
+  protected final void waitForConnectorDoesNotExist(String connectorName) {
     try {
       connectCluster
           .assertions()
@@ -102,6 +105,16 @@ protected final void waitForConnectorStopped(String connectorName) {
     }
   }
 
+  protected final void waitForConnectorStopped(String connectorName) {
+    try {
+      connectCluster
+          .assertions()
+          .assertConnectorIsStopped(connectorName, "Connector should be stopped");
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("The connector is not running");
+    }
+  }
+
   private FakeSnowflakeStreamingIngestClient getFakeSnowflakeStreamingIngestClient(
       String connectorName) {
     return fakeClientSupplier.getFakeIngestClients().stream()
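
The rename separates two end states: waitForConnectorDoesNotExist() now covers deletion, while the new waitForConnectorStopped() waits for the STOPPED state. A short usage sketch from a subclass test; stopConnector(...) on the embedded cluster is an assumption and not part of this diff:

    // Hypothetical test flow: request the STOPPED state, then block until the
    // cluster's REST assertions confirm it.
    connectCluster.stopConnector(connectorName); // assumed to exist on EmbeddedConnectCluster
    waitForConnectorStopped(connectorName);

    // Deleting the connector instead pairs with the renamed helper:
    // connectCluster.deleteConnector(connectorName);
    // waitForConnectorDoesNotExist(connectorName);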

SchemaEvolutionAvroSrIT.java (new file)

Lines changed: 192 additions & 0 deletions

@@ -0,0 +1,192 @@
+package com.snowflake.kafka.connector;
+
+import static com.snowflake.kafka.connector.internal.TestUtils.assertTableColumnCount;
+import static com.snowflake.kafka.connector.internal.TestUtils.assertWithRetry;
+
+import com.snowflake.kafka.connector.internal.TestUtils;
+import io.confluent.connect.avro.AvroConverter;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration test for schema evolution using Avro with Schema Registry. Tests that the table is
+ * updated with correct column types when records with different Avro schemas are sent from multiple
+ * topics.
+ */
+class SchemaEvolutionAvroSrIT extends SchemaEvolutionBase {
+
+  private static final String MOCK_SCHEMA_REGISTRY_URL = "mock://test-schema-registry";
+
+  private static final String PERFORMANCE_STRING = "PERFORMANCE_STRING";
+  private static final String PERFORMANCE_CHAR = "PERFORMANCE_CHAR";
+  private static final String RATING_INT = "RATING_INT";
+  private static final String RATING_DOUBLE = "RATING_DOUBLE";
+  private static final String APPROVAL = "APPROVAL";
+  private static final String TIME_MILLIS = "TIME_MILLIS";
+  private static final String TIMESTAMP_MILLIS = "TIMESTAMP_MILLIS";
+  private static final String DATE = "DATE";
+  private static final String DECIMAL = "DECIMAL";
+  private static final String SOME_FLOAT_NAN = "SOME_FLOAT_NAN";
+  private static final String RECORD_METADATA = "RECORD_METADATA";
+
+  private static final Map<String, String> EXPECTED_SCHEMA = new HashMap<>();
+
+  static {
+    EXPECTED_SCHEMA.put(PERFORMANCE_STRING, "VARCHAR");
+    EXPECTED_SCHEMA.put(PERFORMANCE_CHAR, "VARCHAR");
+    EXPECTED_SCHEMA.put(RATING_INT, "NUMBER");
+    EXPECTED_SCHEMA.put(
+        RATING_DOUBLE, "NUMBER"); // no floats anymore in server side SSV2 schema evo
+    EXPECTED_SCHEMA.put(APPROVAL, "BOOLEAN");
+    EXPECTED_SCHEMA.put(
+        SOME_FLOAT_NAN, "VARCHAR"); // no floats anymore in server side SSV2 schema evo
+    EXPECTED_SCHEMA.put(TIME_MILLIS, "TIME");
+    EXPECTED_SCHEMA.put(TIMESTAMP_MILLIS, "VARCHAR");
+    EXPECTED_SCHEMA.put(DATE, "TIME");
+    EXPECTED_SCHEMA.put(DECIMAL, "NUMBER");
+    EXPECTED_SCHEMA.put(RECORD_METADATA, "VARIANT");
+  }
+
+  private static final String VALUE_SCHEMA_0 =
+      "{\"type\": \"record\",\"name\": \"value_schema_0\",\"fields\": [ {\"name\":"
+          + " \"PERFORMANCE_CHAR\", \"type\": \"string\"}, {\"name\": \"PERFORMANCE_STRING\","
+          + " \"type\": \"string\"},"
+          + " {\"name\":\"TIME_MILLIS\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},"
+          + "{\"name\":\"DATE\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},{\"name\":\"DECIMAL\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\","
+          + " \"precision\":4, \"scale\":2}},"
+          + "{\"name\":\"TIMESTAMP_MILLIS\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},"
+          + " {\"name\": \"RATING_INT\", \"type\": \"int\"}]}";
+
+  private static final String VALUE_SCHEMA_1 =
+      "{"
+          + "\"type\": \"record\","
+          + "\"name\": \"value_schema_1\","
+          + "\"fields\": ["
+          + " {\"name\": \"RATING_DOUBLE\", \"type\": \"float\"},"
+          + " {\"name\": \"PERFORMANCE_STRING\", \"type\": \"string\"},"
+          + " {\"name\": \"APPROVAL\", \"type\": \"boolean\"},"
+          + " {\"name\": \"SOME_FLOAT_NAN\", \"type\": \"float\"}"
+          + "]"
+          + "}";
+
+  private static final String SCHEMA_REGISTRY_SCOPE = "test-schema-registry";
+  private static final int COL_NUM = 11;
+
+  private KafkaProducer<String, Object> avroProducer;
+
+  @BeforeEach
+  void beforeEach() {
+    avroProducer = createAvroProducer();
+  }
+
+  @AfterEach
+  void afterEach() {
+    if (avroProducer != null) {
+      avroProducer.close();
+    }
+    MockSchemaRegistry.dropScope(SCHEMA_REGISTRY_SCOPE);
+  }
+
+  @Test
+  void testSchemaEvolutionWithMultipleTopicsAndAvroSr() throws Exception {
+    // given
+    final Map<String, String> config = createConnectorConfig();
+    config.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, AvroConverter.class.getName());
+    config.put("value.converter.schema.registry.url", MOCK_SCHEMA_REGISTRY_URL);
+    connectCluster.configureConnector(connectorName, config);
+    waitForConnectorRunning(connectorName);
+
+    // when
+    sendRecordsToTopic0();
+    sendRecordsToTopic1();
+
+    // then
+    final int expectedTotalRecords = TOPIC_COUNT * RECORD_COUNT;
+    assertWithRetry(() -> snowflake.tableExist(tableName));
+    assertWithRetry(() -> TestUtils.getNumberOfRows(tableName) == expectedTotalRecords);
+    assertTableColumnCount(tableName, COL_NUM);
+    TestUtils.checkTableSchema(tableName, EXPECTED_SCHEMA);
+  }
+
+  private KafkaProducer<String, Object> createAvroProducer() {
+    final Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connectCluster.kafka().bootstrapServers());
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
+    props.put("schema.registry.url", MOCK_SCHEMA_REGISTRY_URL);
+    return new KafkaProducer<>(props, new StringSerializer(), createAvroSerializer());
+  }
+
+  private KafkaAvroSerializer createAvroSerializer() {
+    final SchemaRegistryClient schemaRegistryClient =
+        MockSchemaRegistry.getClientForScope(SCHEMA_REGISTRY_SCOPE);
+    final KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistryClient);
+    serializer.configure(Map.of("schema.registry.url", MOCK_SCHEMA_REGISTRY_URL), false);
+    return serializer;
+  }
+
+  private void sendRecordsToTopic0() {
+    final Schema schema = new Schema.Parser().parse(VALUE_SCHEMA_0);
+    for (int i = 0; i < RECORD_COUNT; i++) {
+      final GenericRecord record = createTopic0Record(schema);
+      avroProducer.send(new ProducerRecord<>(topic0, "key-" + i, record));
+    }
+    avroProducer.flush();
+  }
+
+  private void sendRecordsToTopic1() {
+    final Schema schema = new Schema.Parser().parse(VALUE_SCHEMA_1);
+    for (int i = 0; i < RECORD_COUNT; i++) {
+      final GenericRecord record = createTopic1Record(schema);
+      avroProducer.send(new ProducerRecord<>(topic1, "key-" + i, record));
+    }
+    avroProducer.flush();
+  }
+
+  private GenericRecord createTopic0Record(final Schema schema) {
+    Schema decimalSchema = schema.getField(DECIMAL).schema();
+    LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) decimalSchema.getLogicalType();
+    BigDecimal value = new BigDecimal("0.03125");
+    BigDecimal scaledValue = value.setScale(decimalType.getScale(), BigDecimal.ROUND_HALF_UP);
+    ByteBuffer byteBuffer =
+        new Conversions.DecimalConversion().toBytes(scaledValue, decimalSchema, decimalType);
+
+    final GenericRecord record = new GenericData.Record(schema);
+    record.put(PERFORMANCE_STRING, "Excellent");
+    record.put(PERFORMANCE_CHAR, "A");
+    record.put(RATING_INT, 100);
+    record.put(TIME_MILLIS, 10);
+    record.put(TIMESTAMP_MILLIS, 12);
+    record.put(DECIMAL, byteBuffer);
+    record.put(DATE, 11);
+    return record;
+  }
+
+  private GenericRecord createTopic1Record(final Schema schema) {
+    final GenericRecord record = new GenericData.Record(schema);
+    record.put(PERFORMANCE_STRING, "Excellent");
+    record.put(RATING_DOUBLE, 0.99f);
+    record.put(APPROVAL, true);
+    record.put(SOME_FLOAT_NAN, Float.NaN);
+    return record;
+  }
+}
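
The final assertion compares column names against Snowflake data types. TestUtils.checkTableSchema is not shown in this commit; below is a minimal, hypothetical sketch of how such a check could be written against DESC TABLE output (the JDBC Connection, the class name, and the prefix-matching semantics are all assumptions), reusing the "desc table identifier(?)" query seen in the removed appendMetaColIfNotExist method above:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical helper, not the actual TestUtils implementation.
    final class TableSchemaCheckSketch {
      static void checkTableSchema(Connection conn, String tableName, Map<String, String> expected)
          throws SQLException {
        Map<String, String> actual = new HashMap<>();
        try (PreparedStatement stmt = conn.prepareStatement("desc table identifier(?)")) {
          stmt.setString(1, tableName);
          try (ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
              // DESC TABLE rows: column name in position 1, data type (e.g. VARCHAR(16777216)) in position 2
              actual.put(rs.getString(1), rs.getString(2));
            }
          }
        }
        for (Map.Entry<String, String> e : expected.entrySet()) {
          String actualType = actual.get(e.getKey());
          if (actualType == null || !actualType.startsWith(e.getValue())) {
            throw new AssertionError(
                "Column " + e.getKey() + ": expected type " + e.getValue() + " but was " + actualType);
          }
        }
      }
    }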
